如何解决有没有办法通过 API 重置 Kafka 消费者组的偏移量?
我有一个用例,其中有一个消费组正在消费消息。我想构建一个 API 来修改它的偏移量。因此,当使用偏移量调用端点时,我必须更改使用者组的偏移量。我正在使用 SpringBoot,而消费者是使用 Spring Kafka 构建的。 提前致谢。
解决方法
这是一个通过 CLI 的解决方案:
列出群组订阅的主题:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe
注意“CURRENT-OFFSET”和“LOG-END-OFFSET”下的值。 “CURRENT-OFFSET”是这个消费者组当前在每个分区中的偏移量。
重置主题的消费者偏移量(预览):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
这将打印重置的预期结果,但不会实际运行它。
重置主题的消费者偏移量(执行):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
这将执行重置并将指定主题的消费者组偏移量重置回 0。
重复1检查是否重置成功
,Spring for Apache Kafka 为在应用程序初始化期间或之后的任何时间执行查找提供了一些便利的机制。
最简单的方法是让您的侦听器扩展 AbstractConsumerSeekAware
或实现 ConsumerSeekAware
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。