微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

kafka 的使用回顾

基本上照着官网操作,一步一步的就能操作出来。 

kafka分producer  kafka集群    consumer  三个角色

kafka是个消息队列,

数据的来源是producer产生的

消费是由consumer消费的

照着官网我也走一遍吧

Step 1: Download the code (下载)

Step 2: Start the server (启动服务)
做这一步需要先配置环境变量 :
export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0
export PATH=$KAFKA_HOME/bin:$PATH
第二步修改server.properties 文件,这个文件在config 文件夹下

broker.id=0
listeners=PLAINTEXT://:9092
host.name=localhost
log.dirs=/home/hadoop/app/tmp/kafka
zookeeper.connect=localhost:2181

上面这5个参数优先注意下。需要改就改一下。

启动:  kafka-server-start.sh config/server.properties

Step 3: Create a topic  (创建一个主题)

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test   Step 4: Send some messages(启动生产者,发送消息) >  kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message ……  

Step 5: Start a consumer  (启动消费者,消费消息)

>  kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message   注意Step5  这里的--bootstrap-server   一般都是zookeeper , 这个和生产者的bootstrap-server 不一样。 可以写成这样:kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning --from-beginning  的意思是从头开始消费,也就是kafka集群里面存有的数据都会给你消费一次。  如果不加上,就是从当前开始计算,producer产生一个,消费一个   代码:



public class KafkaProperties {
public static final String ZK = "ip:2181";
  public static final String TOPIC = "test_topic001";
public static final String BROKERLIST = "ip1:port1,ip2:port2,ip3:port3";
public static final String GROUP_ID = "topic_group_id_1";
}


public class KafkaProducer extends Thread{
private String topic;
private Producer<Integer, String> producer;

public KafkaProducer(String topic){
this.topic = topic;

Properties properties = new Properties();
properties.put("metadata.broker.list", KafkaProperties.BROKERLIST);
properties.put("serializer.class","kafka.serializer.StringEncoder");
properties.put("request.required.acks","1");

producer = new Producer<Integer, String>(new ProducerConfig(properties));
}
@Override
public void run() {
int messageNo = 1;

while(true) {
String message = "message: " + messageNo;
producer.send(new KeyedMessage<Integer, String>(topic, message));
System.out.println("Sent: " + message);
messageNo ++ ;
try{
Thread.sleep(2000);
} catch (Exception e){
e.printStackTrace();
}
}
}
}

public class KafkaConsumer extends Thread{
private String topic ;
private Consumer consumer;
public KafkaConsumer(String topic){
this.topic = topic;
}
/**
* kafka 的consumer
* @return
*/
private ConsumerConnector consumerConnector(){
Properties properties = new Properties();
properties.put("zookeeper.connect",KafkaProperties.ZK);
properties.put("group.id",KafkaProperties.GROUP_ID);
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}

@Override
public void run() {
//建立连接
ConsumerConnector consumer = consumerConnector();
Map<String,Integer> topicMapCount = new HashMap<String, Integer>();
topicMapCount.put(topic , 1);
//consumer.createMessageStreams 建立连接 返回类型为 Map<String, List<KafkaStream<byte[], byte[]>>>
//String : topic
//List<KafkaStream<byte[], byte[]>> 数据流
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicMapCount);
KafkaStream<byte[], byte[]> messageAndMetadata = messageStreams.get(topic).get(0);// get(0)获取每次接受到的数据
//获取数据后需要迭代一下
ConsumerIterator<byte[], byte[]> iterator = messageAndMetadata.iterator();
while(iterator.hasNext()){
// byte[] message = iterator.next().message();
//数据默认是byte的需要转换成String
String message = new String(iterator.next().message());
System.out.println(message);
}
}
}

/**
* kafka api 测试类
*/
public class KafkaClientApp {
public static void main(String[] args) {
// new KafkaProducer(KafkaProperties.TOPIC).start();
new KafkaConsumer(KafkaProperties.TOPIC).start();
}
}

 

     

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐