一、基本概念
- Topic:一组消息数据的标记符;
- Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
- Consumer:消费者,获取数据,可消费指定的Topic;
- Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取;
- Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。同一分区只能被一个消费者订阅。
二、本地安装与启动(基于Docker)
1. 下载zookeeper镜像与kafka镜像:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
2. 本地启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
3. 本地启动kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=localhost \
--env KAFKA_ADVERTISED_PORT=9092 \
wurstmeister/kafka:latest
注意:上述代码,将kafka启动在9092端口
4. 进入kafka bash
docker exec -it kafka bash
cd /opt/kafka/bin
5. 创建Topic,分区为2,Topic name为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo
6. 查看当前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list
7. 安装kafka-python
pip install kafka-python
三、生产者(Producer)与消费者(Consumer)
个人封装
生产者和消费者的简易Demo,这里一起演示:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
from kafka import KafkaConsumer,KafkaProducer,TopicPartition
"""
kafka 生产者
"""
class KProducer(object):
    """Thin wrapper around kafka-python's KafkaProducer that JSON-serializes payloads."""

    def __init__(self, bootstrap_servers):
        """
        :param bootstrap_servers: broker address(es), e.g. "host:9092" or a list of them
        """
        # Every payload is serialized to JSON before being sent.
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda m: json.dumps(m).encode("ascii"),
            # compression_type="gzip",  # enable to compress outgoing messages
        )

    def sync_producer(self, topic, data):
        """
        Send one message synchronously: block until the broker acknowledges it.

        :param topic: target topic name
        :param data: JSON-serializable payload
        :raises kafka.errors.KafkaTimeoutError: if no ack within 10 seconds
        """
        future = self.producer.send(topic, data)
        record_metadata = future.get(timeout=10)  # block for the broker ack
        partition = record_metadata.partition  # partition the record landed in
        offset = record_metadata.offset  # offset within that partition
        print("save success,partition: {},offset: {}".format(partition, offset))

    def asyn_producer(self, topic, data):
        """
        Send one message asynchronously (fire-and-forget).

        BUG FIX: the original signature omitted ``topic``, so the body
        referenced an undefined name and raised NameError when called.

        :param topic: target topic name
        :param data: JSON-serializable payload
        """
        self.producer.send(topic, data)
        self.producer.flush()  # push any batched records out now

    def asyn_producer_callback(self, topic, data):
        """
        Send one message asynchronously with success/error callbacks.

        BUG FIX: the original signature omitted ``topic``; the __main__
        section already passes it as a keyword argument.

        :param topic: target topic name
        :param data: JSON-serializable payload
        """
        future = self.producer.send(topic, data)
        future.add_callback(self.send_success)
        future.add_errback(self.send_error)
        self.producer.flush()  # push any batched records out now

    def send_success(self, *args, **kwargs):
        """Callback invoked when an async send is acknowledged."""
        print('save success')

    def send_error(self, *args, **kwargs):
        """Errback invoked when an async send fails.

        BUG FIX: kafka-python delivers the exception as a positional
        argument, so the original ``(self, **kwargs)`` signature would
        itself raise TypeError when the errback fired.
        """
        print('save error')

    def close_producer(self):
        """Close the producer; best-effort, shutdown must never raise."""
        try:
            self.producer.close()
        except Exception:
            # Deliberately swallowed: failures while tearing down are not actionable.
            pass
"""
kafka 消费商
"""
class PConsumers(object):
    """Thin wrapper around kafka-python's KafkaConsumer for JSON payloads."""

    def __init__(self, bootstrap_servers, group_id):
        """
        :param bootstrap_servers: broker address(es), e.g. "host:9092" or a list
        :param group_id: consumer group id (one message per group, see notes above)
        """
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id

    def get_message(self, topic, count=1):
        """
        Consume up to ``count`` messages from ``topic`` and return their values.

        BUG FIX: the original signature omitted ``topic`` (the body and the
        caller in __main__ both use it), which raised NameError when called.

        :param topic: topic to subscribe to
        :param count: number of messages to read before returning
        :return: list of deserialized message values, or None on error
        """
        counter = 0
        msg = []
        try:
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                # Deserialize each message body from JSON bytes.
                value_deserializer=lambda m: json.loads(m.decode("ascii")),
                auto_offset_reset="earliest",
            )
            for message in consumer:
                print(
                    "%s:%d:%d: key=%s value=%s header=%s" % (
                        message.topic, message.partition, message.offset,
                        message.key, message.value, message.headers
                    )
                )
                msg.append(message.value)
                counter += 1
                if count == counter:
                    break
            consumer.close()
        except Exception:
            # format_exc() returns the traceback text; the original passed
            # print_exc() (which returns None) into the format string.
            print(traceback.format_exc())
            return None
        return msg

    def get_count(self, topic):
        """
        Return the total lag (unconsumed messages) for this group on ``topic``.

        BUG FIXES: the original built the consumer without
        ``bootstrap_servers`` (silently defaulting to localhost), and its
        except-branch print had an unterminated string literal (SyntaxError).

        :param topic: topic to inspect
        :return: sum over partitions of (end offset - committed offset), or None on error
        """
        try:
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
            )
            partitions = [TopicPartition(topic, p)
                          for p in consumer.partitions_for_topic(topic)]
            # End (latest) offsets per partition.
            toff = consumer.end_offsets(partitions)
            toff = [(key.partition, toff[key]) for key in toff.keys()]
            toff.sort()
            # Last committed offsets per partition (None if never committed).
            coff = [(x.partition, consumer.committed(x)) for x in partitions]
            coff.sort()
            # Lag = total produced - total committed.
            toff_sum = sum([x[1] for x in toff])
            cur_sum = sum([x[1] for x in coff if x[1] is not None])
            left_sum = toff_sum - cur_sum
            consumer.close()
        except Exception:
            print(traceback.format_exc())
            return None
        return left_sum
if __name__ == "__main__":
send_data_li = {"test": 1}
#kp = KProducer(topic="test", bootstrap_servers='127.0.0.1:9001,127.0.0.1:9002')
kp = KProducer(bootstrap_servers="1.1.1.1:9092")
# 同步发送
#kp.sync_producer(send_data_li)
# 异步发送
# kp.asyn_producer(send_data_li)
# 异步+回调
kp.asyn_producer_callback(topic="test",data=send_data_li)
#kp.close_producer()
#cp = PConsumers(bootstrap_servers="1.1.1.1:9092",topic="detect-file")
cp = PConsumers(bootstrap_servers="1.1.1.1:9092",group_id = "Boxer")
#cp = PConsumers(bootstrap_servers="1.1.1.1:9092",topic="custom-event")
#print(cp.get_count(topic="test"))
print(cp.get_message(topic="test"))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。