微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka 笔记

1.Kafka的安装与运行

安装

下载地址 http://kafka.apache.org/downloads

更改配置 打开 config/server.properties 文件 修改以下

log.dirs = C:\sunlin\kafka\kafka_2.11-2.1.1\log

运行

kafka运行依赖于zookeeper,所以先启动zookeeper服务。 kafka bin 目录里自带zookeeper-server-start.bat 用来启动zookeeper 参数为 config/zookeeper.properties

然后就可以运行 bin/windows/kafka-server-start.bat 参数为 config/server.properties

 

2. kafka 命令

添加一个topic

kafka-topics.bat  --create --zookeeper localhost:2181 --replication-factor   1 --partitions 1 --topic labelwall_516_test

查看所有topic

kafka-topics.bat --list --zookeeper localhost:2181

启动producer

kafka-console-producer.bat --broker-list localhost:9092 --topic labelwall_516_test

启动consumer

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic labelwall_516_test --from-beginning

 

3.kafka producer的三种消息

private static KafkaProducer<String, String> producer;

@Before
public void init() {
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "127.0.0.1:9092");
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<String, String>(prop);
}

只发送不管结果

@Test
public void sendMessageForgetResult() {
    ProducerRecord<String, String> record = 
                  new ProducerRecord<String, String>("labelwall_516_test", "name", "forgetResult");
    producer.send(record);
    producer.close();
}

同步发送

@Test
public void sendMessageSync() throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = 
            new ProducerRecord<String, String>("labelwall_516_test", "name", "sync");
    RecordMetadata result = producer.send(record).get();
    System.out.println(result.topic());
    // 分区
    System.out.println(result.partition());
    // 偏移量
    System.out.println(result.offset());
    producer.close();
}

异步发送

public static class MyProducerCallback implements Callback {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printstacktrace();
            return;
        }
        System.out.println(recordMetadata.topic());
        // 分区
        System.out.println(recordMetadata.partition());
        // 偏移量
        System.out.println(recordMetadata.offset());
        System.out.println("coming in MyProducerCallback ... ");
    }
}

@Test
public void sendMessageCallback() throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("labelwall_516_test", "name", "Callback");
    producer.send(record, new MyProducerCallback());
    producer.close();
}

4.自定义分区分配器


public class CustomPartitioner implements Partitioner {

    /**
     * 自定义消息分区
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int size = partitionInfoList.size();
        if(keyBytes == null || !(key instanceof  String)) {
            throw new InvalidRecordException("kafka message must be have key !!!");
        }
        if(size == 1){
            return 0;
        }
        if(key.equals("name")){
            return size -1 ;
        }
        return Math.abs(Utils.murmur2(keyBytes)) & (size -1);
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

 

5. kafka的consumer

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐