微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

消费一下kafka的__consumer_offsets

__consumer_offsets

consumer认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

__consumer_offsets 为kafka中的topic, 那就可以通过消费者进行消费.

大概思路:

 

 

 1.先启动一个生产者:

 

offset_Producer
package Look_offset;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/*简单一个生产者 给offset_Consumer提供数据消费的*/
public class offset_Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig, "zzz01:9092");
        properties.put(ProducerConfig.BATCH_SIZE_CONfig, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONfig, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONfig, 33554432);
        //
        properties.put(ProducerConfig.ACKS_CONfig, "all");

        properties.put("retries", 3);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig, "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        for (int i = 0; i < 1000; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("otto", "害羞小向晚" + i + "次");
            //回调函数  acks设置为all  等所有follower落盘完成之后返回一个回执消息
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata Metadata, Exception exception) {

                    if (exception != null) {
                        exception.printstacktrace();
                    } else {
                        System.out.println(Metadata.topic() + "   数据:" + producerRecord.value() + " " + "分区: " + Metadata.partition() + " "
                                + "offset:" + Metadata.offset());

                    }
                }
            });
            //同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
            //由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
            Thread.sleep(5);
        }


        kafkaProducer.close();

    }
}

 2. 在kafka上启动脚本

消费__consumer_offsets的脚本:

#将结果输出文件 方便查看
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  Ava01:9092 --consumer.config config/consumer.properties  --formatter "kafka.coordinator.group.GroupMetadataManager\$Offsetsmessageformatter" --from-beginning >>kafka_offset.txt

3.启动消费者

又是消费者又是生产者 产生的offset放进去__consumer_offset要被在kafaka中用脚本启动的消费者消费
offset_Consumer
package Look_offset;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/*
* 又是消费者又是生产者 产生的offset放进去__consumer_offset要被offset_Consumer2消费*/
public class offset_Consumer {
    public static void main(String[] args) {
        // 1. 创建配置对象
        Properties properties = new Properties();

        // 2. 给配置对象添加参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig, "zzz01:9092");

        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONfig, "Ava");

        // 修改分区分配策略
//        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONfig, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        // 不排除内部offset,不然看不到__consumer_offsets
        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONfig, "false");

        //3. 创建kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //4. 设置消费主题  形参是列表
        ArrayList<String> arrayList = new ArrayList<>();
        // 更换主题
        arrayList.add("otto");
        consumer.subscribe(arrayList);

        //5. 消费数据
        while (true) {
            // 读取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            // 输出消息
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value() + " "+ "offset: "+consumerRecord.offset());
            }
        }

    }


}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐