如何解决Kafka-Python,生产者发送记录但消费者未收到
我在将 kafka 用于我的 python 代码时遇到问题。 我使用 python 2.7.5,以及包 kafka-python。
我想通过 kafka 主题发送 csv(300000 行,每行 20 个字段)。在此之前,我将每个序列化 排成一个 json 文件,直到这里,一切正常。 My Producer 发送文件的每一行,然后关闭。但 另一方面,我的消费者不消费任何东西...
就 kafka 而言,我有一个带有单个分区的主题。 我的 kafka 和 zookeeper 实例包含在 docker 容器中,但不包含在我的消费者或生产者中。
这是我的生产者代码: ...
def producer(path) :
producer = KafkaProducer(bootstrap_servers="localhost:9092",retries = 5)
with open(path,newline = '',encoding='utf-8-sig') as csvFile :
reader = csv.DictReader(csvFile,fieldnames = dataElements)
for row in reader :
log = process_row(row)
producer.send(topic = TOPIC,value = json.dumps(log).encode())
producer.flush()
producer.close()
print('processing done')
这是我的消费者代码:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(TOPIC)
for message in consumer:
log = json.loads(message.value.decode())
print(log)
consumer.close()
在运行我的生产者后,我得到了“处理完成”。当我运行我的消费者时,我什么也没得到。 (我先运行我的消费者)。
我阅读了文档,它可能来自生产者配置。我仍然不确定我应该修改哪些参数(linger_ms、batch_size...?)。在我看来,默认值适用于我的情况。
有什么想法吗?
解决方法
我使用以下内容找到了它:https://www.kaaproject.org/blog/kafka-docker https://github.com/wurstmeister/kafka-docker/wiki/Connectivity
需要在 docker-compose.yml 中添加一些环境变量,如 KAFKA_ADVERTISED_HOST,以便客户端可以从 docker 主机外部连接到 kafka broker。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。