微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

组中的消费者不会从主题中的所有分区中消费

如何解决组中的消费者不会从主题中的所有分区中消费

我正在使用 Kafka 和 Java 11。我创建了名为 first-application主题,其中包含 3 个分区。下面我显示 cli 结果:

kafka-topics.sh --bootstrap-server localhost:9092 --topic first-topic --describe
Topic: first-topic  TopicId: hJ0PxwbWRQObfbdNVsIv9A PartitionCount: 3   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: first-topic  Partition: 0    leader: 0   Replicas: 0 Isr: 0
    Topic: first-topic  Partition: 1    leader: 0   Replicas: 0 Isr: 0
    Topic: first-topic  Partition: 2    leader: 0   Replicas: 0 Isr: 0

下面我展示了我的消费者和生产者代码

生产者代码

package com.github.pirx1988.kafka.tutorial1;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerDemoKeys {

    public static void main(String[] args) throws ExecutionException,InterruptedException {

        Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        // create Producer properties:

        Properties consumerProps = new Properties();
        consumerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,"localhost:9092");
        consumerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class.getName());
        consumerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(consumerProps);

        for (int i = 0; i < 10; i++) {
            String topic = "first-topic";
            String value = "hello world" + Integer.toString(i);
            String key = "id_" + Integer.toString(i);
            // create a producer record:
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,key,value);

            logger.info("Key: " + key); // log the key

            // send data
            producer.send(record,new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata,Exception e) {
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new Metadata: \n" +
                                "Topic: " + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Offset: " + recordMetadata.offset() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp()
                        );
                    } else {
                        logger.error("Error while producing",e);
                    }
                }
            }).get();
        }
        // required because send is async action
        producer.flush();
        producer.close();
    }
}

消费者代码

package com.github.pirx1988.kafka.tutorial1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemogroups {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        String bootstrapServer = "localhost:9092";
        String groupId = "third-app";
        String topic = "first-topic";

        // create consumer configs
        Properties consumerProps = new Properties();
        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServer);
        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class.getName());
        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class.getName());
        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONfig,groupId);
        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");

        // create consumer
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(consumerProps);

        // subscribe consumer to our topic(s)
        consumer.subscribe(Collections.singletonList(topic));

        while(true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach((record) -> {
                logger.info("Key: " + record.key() + ",Value: " + record.value());
                logger.info("Partition: " +  record.partition());
                logger.info("Offset: " + record.offset());
            });

        }
    }
}

我陈述了与主题 first-topic 相关的同一组 第三个应用的前 2 个消费者。然后我在 for 循环中生成了 10 条消息(就像在代码中一样)并期望我会消耗来自 3 个分区的所有 10 条消息,但我只消耗了 5 条。似乎没有从分区 2 消耗数据。

下面我显示来自一个消费者的日志:

enter image description here

第二消费者:

enter image description here

总而言之,缺少 5 条消息。我确定我没有启动 ConsumerDemogroups3 服务(看屏幕)。我多次重复此操作,每次都丢失 5 条消息。我的消费者或生产者设置有问题吗?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。