微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

node.js – 我可以限制kafka-node使用者的消费吗?

看起来像我的kafka节点消费者:

var kafka = require('kafka-node');
var consumer = new Consumer(client,[],{
     ...
    });

在某些情况下,获取的消息太多了.
有没有办法限制它(例如,每秒接受不超过1000条消息,可能使用暂停api?)

>我正在使用kafka-node,与Java版本相比,它似乎具有有限的api

解决方法

我有类似的情况,我正在消费来自Kafka的消息,不得不限制消费,因为我的消费者服务依赖于有自己约束的第三方API.

我使用async / queue以及async / cargo的包装程序asyncTimedCargo进行批处理.
货物从kafka-consumer获取所有消息,并在达到大小限制batch_config.batch_size或超时batch_config.batch_timeout时将其发送到队列.
async / queue提供饱和和不饱和的回调,如果队列任务工作者忙,可以使用它来停止使用.这将阻止货物填满,您的应用程序不会耗尽内存.消费将在不满足时恢复.

//cargo-service.js
module.exports = function(key){
    return new asyncTimedCargo(function(tasks,callback) {
        var length = tasks.length;
        var postBody = [];
        for(var i=0;i<length;i++){
            var message ={};
            var task = JSON.parse(tasks[i].value);
            message = task;
            postBody.push(message);
        }
        var postJson = {
            "json": {"request":postBody}
        };
        sms_queue.push(postJson);
        callback();
    },batch_config.batch_size,batch_config.batch_timeout)
};

//kafka-consumer.js
cargo = cargo-service()
consumer.on('message',function (message) {
    if(message && message.value && utils.isValidJsonString(message.value)) {
        var msgObject = JSON.parse(message.value);        
        cargo.push(message);
    }
    else {
        logger.error('Invalid JSON Message');
    }
});

// sms-queue.js
var sms_queue = queue(
retryable({
    times: queue_config.num_retries,errorFilter: function (err) {
        logger.info("inside retry");
        console.log(err);
        if (err) {
            return true;
        }
        else {
            return false;
        }
    }
},function (task,callback) {
// your worker task for queue
  callback()
}),queue_config.queue_worker_threads);

sms_queue.saturated = function() {
    consumer.pause();
    logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
    consumer.resume();
    logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐