微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

confluent-kafka-python 库:每个消费者组每个主题的读取偏移量

如何解决confluent-kafka-python 库:每个消费者组每个主题的读取偏移量

由于 pykafka EOL,我们正在迁移到 confluent-kafka-python。对于 pykafka,我们编写了一个精心设计的脚本,以以下格式生成输出

主题 消费者群体 偏移
topic_alpha total_messages 100
topic_alpha consumer_a 10
topic_alpha consumer_b 25

我想知道是否有 Python 代码知道如何为 confluent-kafka-python 做类似的事情?

小字:有一个关于如何read offsets per given consumer_group的部分示例。但是,我很难在不手动解析 consumer_group 的情况下获得每个主题__consumer_offsets 列表。

解决方法

使用 admin_client.list_groups() 获取组列表,使用 admin_client.list_topics() 获取集群中的所有主题和分区,使用 client.get_watermark_offsets() 获取给定主题。

然后为每个消费者组实例化一个具有相应group.id的新消费者,创建一个TopicPartition列表来查询已提交的偏移量,然后调用c.committed()来检索已提交的偏移量。 从高水印中减去承诺的偏移量得到 th

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。