微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka消费者偏移量自动重置等参数

如何解决Kafka消费者偏移量自动重置等参数

我有一个 KafkaConsumer,它需要订阅两个主题 topicAtopicB。但是我需要一些不同的参数。例如。如果我需要 auto.offset.reset topicAearilesttopicB 应该是 latest。我认为没有简单的方法可以做到这一点。一种选择是运行两个消费者,但在这种情况下,我需要两个轮询线程,因此应该处理多线程。有没有更简单的方法

解决方法

创建两个(或更多)线程是正确的。

消费者不是线程安全的,无论如何都应该与其他进程隔离和分离。

您可以使用更高级别的 Kafka 库(例如 Vert.x / Spring)来简化此操作。

,

如果您需要使用一个消费者,那么我认为您可以将 auto.reset.offset 设置为 latest,然后手动移动 topicA 的偏移量(如果需要)。为此,您可以在订阅和轮询循环之间:

  1. 获取分配给消费者的分区(方法 assignment
  2. 通过过滤前一点的 topicA 分区(方法 committed)来检查主题 topicA 的提交偏移量。如果结果为 null,则调用 seekToBeginning 方法。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。