1. Installing and running Kafka
Installation
Download: http://kafka.apache.org/downloads
Configuration: open config/server.properties and change the following setting
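# A single backslash is an escape character in .properties files, so write Windows paths with forward slashes (or doubled backslashes)
log.dirs=C:/sunlin/kafka/kafka_2.11-2.1.1/log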
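One thing to watch for with Windows paths in server.properties: the file is parsed with the rules of java.util.Properties, where a backslash starts an escape sequence. The JDK-only sketch below shows what the parser makes of three spellings of the same path. The class name LogDirsEscapeDemo and the helper logDirs are made up for this illustration, and the path is shortened.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class LogDirsEscapeDemo {
    /** Parses one line of .properties text and returns the value of log.dirs. */
    static String logDirs(String propertiesLine) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesLine));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props.getProperty("log.dirs");
    }

    public static void main(String[] args) {
        // Java string literals double each backslash, so the first "file" line
        // contains single backslashes, exactly as typed in a text editor.
        System.out.println(logDirs("log.dirs = C:\\sunlin\\kafka\\log"));       // C:sunlinkafkalog
        // Doubled backslashes in the file survive as single backslashes.
        System.out.println(logDirs("log.dirs = C:\\\\sunlin\\\\kafka\\\\log")); // C:\sunlin\kafka\log
        // Forward slashes are passed through untouched.
        System.out.println(logDirs("log.dirs = C:/sunlin/kafka/log"));          // C:/sunlin/kafka/log
    }
}
```

With single backslashes the separators are silently dropped, so the log directory ends up somewhere other than intended. Forward slashes are the least error-prone spelling.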
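Running
Kafka depends on ZooKeeper, so start the ZooKeeper service first. The Kafka distribution ships with zookeeper-server-start.bat (under bin/windows) for this; pass config/zookeeper.properties as its argument.
Then run bin/windows/kafka-server-start.bat with config/server.properties as its argument.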
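Because the broker needs ZooKeeper to be up first, it can help to check that the ZooKeeper port accepts connections before launching Kafka. The following is a minimal JDK-only sketch, assuming ZooKeeper listens on localhost:2181 (the address used by the commands in this post). PortProbe, isOpen and waitFor are hypothetical names; nothing here is part of Kafka or ZooKeeper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unresolved host
        }
    }

    /** Polls the port until it opens or the attempts run out. */
    static boolean waitFor(String host, int port, int attempts, long pauseMs)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            if (isOpen(host, port, 500)) {
                return true;
            }
            Thread.sleep(pauseMs);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed address: the default ZooKeeper client port on the local machine.
        boolean up = waitFor("localhost", 2181, 20, 1000);
        System.out.println(up
                ? "ZooKeeper port is open, Kafka can be started"
                : "ZooKeeper is not reachable on port 2181");
    }
}
```

An open port only shows that something is listening; it does not prove that ZooKeeper is healthy, so treat this as a convenience check rather than a guarantee.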
2. Kafka commands
Create a topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic labelwall_516_test
List all topics
kafka-topics.bat --list --zookeeper localhost:2181
Start a console producer
kafka-console-producer.bat --broker-list localhost:9092 --topic labelwall_516_test
Start a console consumer (--from-beginning reads the topic from the earliest available message)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic labelwall_516_test --from-beginning
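3. Three ways to send messages with the Kafka producer
The snippets in this section are members of one JUnit 4 test class and share the producer below.
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Before;
import org.junit.Test;

private static KafkaProducer<String, String> producer;

// Runs before every test, so each test gets a fresh producer and closes it when done
@Before
public void init() {
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "127.0.0.1:9092");
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<String, String>(prop);
}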
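Fire and forget (send without checking the result)
@Test
public void sendMessageForgetResult() {
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("labelwall_516_test", "name", "forgetResult");
    // The returned Future is ignored, so a failed send goes unnoticed
    producer.send(record);
    // close() blocks until previously sent records have completed
    producer.close();
}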
Synchronous send
@Test
public void sendMessageSync() throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("labelwall_516_test", "name", "sync");
    // get() blocks until the broker acknowledges the record, or throws if the send failed
    RecordMetadata result = producer.send(record).get();
    System.out.println(result.topic());
    // partition
    System.out.println(result.partition());
    // offset
    System.out.println(result.offset());
    producer.close();
}
Asynchronous send
public static class MyProducerCallback implements Callback {
    // Called once the send completes; exactly one of the two arguments carries the outcome
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
            return;
        }
        System.out.println(recordMetadata.topic());
        // partition
        System.out.println(recordMetadata.partition());
        // offset
        System.out.println(recordMetadata.offset());
        System.out.println("coming in MyProducerCallback ... ");
    }
}

@Test
public void sendMessageCallback() {
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("labelwall_516_test", "name", "Callback");
    // send() returns immediately; the callback runs when the broker responds
    producer.send(record, new MyProducerCallback());
    producer.close();
}
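The three styles differ only in what the caller does with the asynchronous result of a send. The sketch below is a JDK-only analogy that runs without a broker. The hypothetical fakeSend stands in for producer.send and returns a CompletableFuture carrying a made-up offset, so it illustrates the control flow and is not the Kafka API itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class SendStylesDemo {
    /** Hypothetical stand-in for producer.send(record): yields a fake offset or fails. */
    static CompletableFuture<Long> fakeSend(String value, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("broker unavailable (simulated)");
            }
            return (long) value.length(); // pretend the offset is the value's length
        });
    }

    /** Fire and forget: the future is dropped, so a failure is never observed. */
    static void fireAndForget(String value) {
        fakeSend(value, false);
    }

    /** Synchronous: block on get(); a failure surfaces as ExecutionException. */
    static String sync(String value, boolean fail) {
        try {
            return "offset=" + fakeSend(value, fail).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** Asynchronous: the handler plays the role of Callback.onCompletion(metadata, exception). */
    static CompletableFuture<String> async(String value, boolean fail) {
        return fakeSend(value, fail).handle((offset, error) ->
                error != null ? "callback saw failure" : "callback saw offset=" + offset);
    }

    public static void main(String[] args) {
        fireAndForget("forgetResult");
        System.out.println(sync("sync", false));              // offset=4
        System.out.println(sync("sync", true));               // failed: broker unavailable (simulated)
        System.out.println(async("Callback", false).join());  // callback saw offset=8
        System.out.println(async("Callback", true).join());   // callback saw failure
    }
}
```

The synchronous style is the simplest way to see errors but waits for every record. The callback style keeps the sender moving and still reports failures, which is why it is usually preferred when throughput matters.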
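4. Custom partitioner
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException; // package as of Kafka 2.1.x
import org.apache.kafka.common.utils.Utils;

// Enable it on the producer with prop.put("partitioner.class", CustomPartitioner.class.getName())
public class CustomPartitioner implements Partitioner {
    /**
     * Custom message partitioning
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int size = partitionInfoList.size();
        if (keyBytes == null || !(key instanceof String)) {
            throw new InvalidRecordException("Kafka message must have a String key");
        }
        if (size == 1) {
            return 0;
        }
        // Records keyed "name" always go to the last partition
        if (key.equals("name")) {
            return size - 1;
        }
        // Every other key is hashed over the remaining partitions. toPositive avoids the
        // negative value Math.abs returns for Integer.MIN_VALUE, and % works for any
        // partition count, whereas & (size - 1) is only correct for powers of two.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % (size - 1);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}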
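When mapping a hash to a partition, a bit mask of the form hash & (size - 1) only behaves like a modulo when size is a power of two. The JDK-only sketch below compares the mask with a positive modulo over the first size - 1 partitions. Plain integers stand in for murmur2 hashes, and byMask, byModulo and reachable are hypothetical helper names.

```java
import java.util.TreeSet;

class PartitionMappingDemo {
    /** Bit-mask mapping: equivalent to a modulo only when size is a power of two. */
    static int byMask(int hash, int size) {
        return Math.abs(hash) & (size - 1);
    }

    /** Positive modulo over partitions 0 .. size - 2, leaving the last one reserved. */
    static int byModulo(int hash, int size) {
        return (hash & 0x7fffffff) % (size - 1);
    }

    /** Collects every partition reached by the hashes 0 .. 999. */
    static TreeSet<Integer> reachable(boolean useMask, int size) {
        TreeSet<Integer> seen = new TreeSet<>();
        for (int hash = 0; hash < 1000; hash++) {
            seen.add(useMask ? byMask(hash, size) : byModulo(hash, size));
        }
        return seen;
    }

    public static void main(String[] args) {
        // With 3 partitions the mask is binary 10: partition 1 is never used,
        // and ordinary keys land on partition 2, which was meant to be reserved.
        System.out.println(reachable(true, 3));           // [0, 2]
        // The modulo spreads ordinary keys over partitions 0 and 1 only.
        System.out.println(reachable(false, 3));          // [0, 1]
        // Math.abs cannot make Integer.MIN_VALUE positive.
        System.out.println(Math.abs(Integer.MIN_VALUE));  // -2147483648
    }
}
```

The same effect appears with any partition count that is not a power of two; with 6 partitions, for example, the mask reaches only partitions 0, 1, 4 and 5.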
5. Kafka consumer