微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用python从kafka的最新偏移量中读取

如何解决使用python从kafka的最新偏移量中读取

我正在使用 confluent-kafka Python 库来读取 kafka。 我正在使用以下消费者设置

Consumer ={
"bootstrap.servers" : kafka_server,"group_id" : "testing","auto.offset.reset" : "latest"}

我的目标是确保我始终阅读 kafka 中的最新消息。只要程序继续运行,上述方法就可以工作。但是如果程序由于某种原因崩溃,它会从它上次消费的消息而不是主题中的最后一条消息开始读取。

我不介意丢失一些消息,但我始终阅读最新消息是绝对必要的。看起来消费者记住了偏移量并从它开始而不是从最新的偏移量开始。

我尝试将 enable.auto.commit 参数设置为 False 但得到相同的结果。

解决方法

enable.auto.commit 应该是 true,如果你想实现这种情况。

由于您有 enable.auto.commit='false',这意味着您的代码(消费者)有责任提交偏移量。在崩溃的情况下,它可能无法保证提交偏移量,这会导致您的应用程序从最后一条消费消息开始。

配置“最新”并不意味着消费者会跳过消息并处理最新消息。

,

如果您想从 latest 中读取消息,请始终使用唯一的 group_id for consumer always 并确保 auto.offset.reset 是最新的。

您可以始终使用 uuid 生成随机 ID

 Consumer ={ "bootstrap.servers" : kafka_server,"group_id" : uuid.uuid4(),"auto.offset.reset" : "latest"}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。