如何解决Kafka消费者偏移量自动重置等参数
我有一个 KafkaConsumer,它需要订阅两个主题 topicA
和 topicB
。但是我需要一些不同的参数。例如。如果我需要 auto.offset.reset
topicA
是 earilest
而 topicB
应该是 latest
。我认为没有简单的方法可以做到这一点。一种选择是运行两个消费者,但在这种情况下,我需要两个轮询线程,因此应该处理多线程。有没有更简单的方法?
解决方法
创建两个(或更多)线程是正确的。
消费者不是线程安全的,无论如何都应该与其他进程隔离和分离。
您可以使用更高级别的 Kafka 库(例如 Vert.x / Spring)来简化此操作。
,如果您需要使用一个消费者,那么我认为您可以将 auto.reset.offset
设置为 latest
,然后手动移动 topicA
的偏移量(如果需要)。为此,您可以在订阅和轮询循环之间:
- 获取分配给消费者的分区(方法 assignment)
- 通过过滤前一点的
topicA
分区(方法 committed)来检查主题topicA
的提交偏移量。如果结果为null
,则调用 seekToBeginning 方法。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。