微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

KafkaIO - 与 groupId 一起使用时,enable.auto.commit 的不同行为设置为 true 和 commitOffsetsInFinalize

如何解决KafkaIO - 与 groupId 一起使用时,enable.auto.commit 的不同行为设置为 true 和 commitOffsetsInFinalize

我们有一个 Apache Beam 管道,它从给定的 kafka 主题读取消息并进行进一步处理。我的管道使用 FlinkRunner,我描述了我们尝试过的三种不同情况:

案例 1:未指定组 ID:

Beam 为每次运行创建一个新的消费者,从而从最新的主题偏移量中读取。它读取消费者启动后产生的消息。在这种情况下,在管道停止和重新启动之间的时间间隔内可能会丢失数据

案例 2:指定组 ID 并将 enable.auto.commit 设置为 true Beam 从管道停止时开始重新处理消息,并开始读取未提交给给定 groupid 的 kafka 的消息。

新的组 ID 再次开始监听来自最新主题偏移量的消息并开始提交消息

.withConsumerConfigUpdates(ImmutableMap.of("enable.auto.commit",true))
.withConsumerConfigUpdates(ImmutableMap.of("group.id","testGroupId"))

案例 3:使用 commitOffsetsInFinalize() 指定的组 ID

理想情况下,我希望这里的行为与案例 2 相同,但我看到的行为类似于案例 1,其中在管道停止和重新启动之间存在潜在的数据丢失。

.withConsumerConfigUpdates(ImmutableMap.of("group.id","testGroupId"))
.commitOffsetsInFinalize()

从 KafkaIO 的文档中我确实看到,当检查点按以下方式完成时,偏移量会提交回 kafka:https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1098

我们想了解:

  1. 为什么案例 2 在停止和重新启动管道时表现不像案例 3?
  2. 在哪些情况下我们应该将 enable.auto.commit 设置为 true 与 commitOffsetsinFinalize

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。