微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

获取使用 StreamBridge 处理 kafka 消息的分区和偏移量

如何解决获取使用 StreamBridge 处理 kafka 消息的分区和偏移量

我需要打印/记录/存储处理我的消息的 kafka 分区和偏移量。 我怎样才能做到这一点? 我正在使用 StreamBridge 从生产者发送消息,并使用功能性 spring kafka 流方法

Public delegatetosupplier(String id,Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY,id.getBytes()).build();
streamBridge.send("out-topic",message);
}

解决方法

记录元数据可通过元数据通道(异步)获得:

@SpringBootApplication
public class So66436499Application {

    public static void main(String[] args) {
        SpringApplication.run(So66436499Application.class,args);
    }

    @Autowired
    StreamBridge bridge;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            this.bridge.send("myBinding","test");
            Thread.sleep(5000);
        };
    }

    @ServiceActivator(inputChannel = "meta")
    void meta(Message<?> sent) {
        System.out.println("Sent: " + sent.getHeaders().get(KafkaHeaders.RECORD_METADATA,RecordMetadata.class));
    }

}
spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
Sent: foo-0@5

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。