微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka了解何时消耗了相关消息

如何解决Kafka了解何时消耗了相关消息

一旦消耗了几条相关的消息,在卡夫卡中有什么办法产生消息? (无需手动在应用程序代码处对其进行控制...)

用例是选择一个巨大的文件,将其分成几个块,为一个主题中的每个块发布一条消息,一旦所有这些消息被消耗,就会产生另一条消息,以通知一个主题的结果。 / p>

我们可以使用数据库或REdis来控制状态,但是我想知道是否存在仅利用Kafka生态系统的更高层次的方法

解决方法

方法可以如下:

  1. 使用完每个块应用程序后,应生成状态(已消耗和块编号)的消息
  2. 第二个应用程序(一次Kafka Streams)应聚合结果,并且当所有块的处理消息产生最终消息时,该文件将被处理。
,

您可以使用ConsumerGroupCommand检查特定使用者组是否已完成对特定主题的所有消息的处理:

  1. $ kafka-consumer-groups --bootstrap-server broker_host:port --describe --group chunk_consumer

OR

  1. $ kafka-run-class kafka.admin.ConsumerGroupCommand ...

每个分区的零延迟将指示消息已被成功使用,并且由使用者提交了偏移量。

或者,您可以选择订阅__consumer_offsets主题并自己处理来自该主题的消息,但是使用ConsumerGroupCommand似乎是一种更简单的解决方案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。