微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在kafka主题中创建多个分区,并使用kafka-node将消息发布到所有分区

如何解决在kafka主题中创建多个分区,并使用kafka-node将消息发布到所有分区

我是kafka的新手,并使用kafka-node在nodeJS中实现了它。我想在一个主题中创建3个分区,并同时将消息发布到所有主题。我尝试了以下代码,但是这里仅创建一个分区,所有消息都将发送到该分区。谁能告诉我我要去哪里错了。非常感谢。

Abc.abcData = async() => {
    try 
    {
        var client = new kafka.KafkaClient();
        var topic = 'newTopic';
        var topicsToCreate = [
        {
            topic: topic,partitions: 3,replicationFactor: 2,replicaAssignment: [
            {
              partition: 0,replicas: [0]
            },{
                partition: 1,replicas: [1]
            },{
                partition: 2,replicas: [2]
            }
          ]
        },]
        client.createtopics(topicsToCreate,(error,result) => {
        console.log(result);
        });
        var HighLevelProducer = kafka.HighLevelProducer;
        var producer = new HighLevelProducer(client);
        var payloads = [
        { topic: topic,messages: 'this is partition 1!!',partitions: 0},{ topic: topic,messages: 'this is partition 2!!',partitions: 1},messages: 'this is partition 3!!',partitions: 2}
        ];
        producer.on('ready',function () {
            producer.send(payloads,function (err,result) {
                if (err)
                    console.log(err);
                console.log(result);

            });
        });
        
    } 
    catch (err) 
    {
        console.error(err.message);
    }
};

我得到如下回应-

[ { topic: 'newTopic',error: "Topic 'newTopic' already exists." } ]
{"newTopic":{"0":6}}

解决方法

在这里您在kafka服务器上使用了 createTopics(),只有在Kafka服务器上的 auto.create.topics.enable 设置为true时,它才起作用。客户端只需将元数据请求发送到服务器即可自动创建主题。当async设置为false时,此方法在创建所有主题之前不会返回,否则将立即返回。 因此,这里默认创建一个带有一个分区的主题。要创建多个分区或对其进行自定义,您必须在 server.property 文件-

中添加以下行
auto.create.topics.enable=false

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。