微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

node.js – kafka-node使用者收到offsetOutOfRange错误

我正在使用kafka-node(kafka的节点客户端),使用使用者来检索有关主题的消息.不幸的是,我收到了“offsetoutOfRange”条件(调用了offsetoutOfRange回调).我的应用程序运行良好,直到消费者显着落后于生产者,在最早和最新的偏移之间留下了一个很大的差距.此时我(也许是错误的)假设消费者能够继续接收消息(并希望能够接收到生产者).

我的kafka消费者客户端代码如下:

:
:
var kafka = require('kafka-node');

var zookeeper = "10.0.1.201:2181";
var id = "embClient";

var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper,id);
var consumer = new Consumer( client,[ { topic: "test",partition: 0 } ],{ autoCommit: false } );

consumer.on('error',[error callback...]);

consumer.on('offsetoutOfRange',[offset error callback...]);

consumer.on('message',[message callback...]);
:
:

我做错了什么,还是我错过了什么?

如果没有,我有几个问题:

(a)是否有一种公认的“最佳”方式来写客户端以优雅地处理这种情况?

(b)为何会提出这个条件? (我假设一个客户应该能够继续阅读它停止的消息,最终(理想情况下)赶上……)

(c)我是否需要编写代码/逻辑来处理这种情况,并明确地重新定位消费者偏移量以进行读取? (这看起来有点麻烦)……

任何帮助表示赞赏.

解决方法

我相信该应用程序可能会尝试读取Kafka中不再提供的消息. Kafka根据log.retention.*属性删除旧消息.假设您已向Kafka发送1000条消息.由于保留,Kafka删除了前500条消息.如果您的应用尝试阅读消息350,它将失败并且它将引发offsetoutOfRange错误.之所以会发生这种情况,是因为您的消费者非常缓慢,以至于Kafka经纪人已经在消费者处理消息之前删除了消息.或者您的消费者崩溃了,但上次处理的消息的偏移量已保存在某处.

您可以使用Offset class检索最新/最早的可用偏移量(请参阅方法提取)并更新消费者的偏移量.我们使用这种方法.

一般来说,当这种情况发生时应该知道应该做什么并不容易,因为显然有些事情是非常错误的.

希望能帮助到你,卢卡斯

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐