微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何正确寻找Kafka中的最后一个偏移量?

如何解决如何正确寻找Kafka中的最后一个偏移量?

我正在尝试从上次偏移读取 Kafka 主题,但无法正确读取:

属性文件中我设置了这些:

group.id=my_group
client.id=my_client
enable.auto.commit=true
auto.offset.reset=最早的
isolation.level=read_committed

然后是代码

consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());

即使我在主题中有一些项目,我也可以看到如何从 0 打印偏移量。

在日志文件中我可以看到这个(缩短):

[Consumer clientId=my_client,groupId=my_group] 分配完成 对于第 1 代的组: {my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=分配(partitions=[mytopic-0])}

[Consumer clientId=my_client,groupId=my_group] 加入成功 第 1 代 [Consumer clientId=my_client,groupId=my_group] 通知转让人有关新的 分配(partitions=[mytopic-0])

[Consumer clientId=my_client,groupId=my_group] 添加新分配的 分区:mytopic-0

[Consumer clientId=my_client,groupId=my_group] 发现没有提交 分区 mytopic-0 的偏移量

[Consumer clientId=my_client,> groupId=my_group] 重置偏移量 将 mytopic-0 分区到偏移量 0。

知道我做错了什么吗?我用 seekToBeginning() + poll() + commitSync() + seekToEnd() 尝试了一些魔法,它以某种方式起作用,但我认为认情况下这应该起作用。

解决方法

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。