微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka-Python,生产者发送记录但消费者未收到

如何解决Kafka-Python,生产者发送记录但消费者未收到

我在将 kafka 用于我的 python 代码时遇到问题。 我使用 python 2.7.5,以及包 kafka-python

我想通过 kafka 主题发送 csv(300000 行,每行 20 个字段)。在此之前,我将每个序列化 排成一个 json 文件,直到这里,一切正常。 My Producer 发送文件的每一行,然后关闭。但 另一方面,我的消费者不消费任何东西...

就 kafka 而言,我有一个带有单个分区的主题。 我的 kafka 和 zookeeper 实例包含在 docker 容器中,但不包含在我的消费者或生产者中。

这是我的生产者代码: ...

def producer(path) :
    producer = KafkaProducer(bootstrap_servers="localhost:9092",retries = 5)

    with open(path,newline = '',encoding='utf-8-sig') as csvFile :
        reader = csv.DictReader(csvFile,fieldnames = dataElements)
        for row in reader :
            log = process_row(row)
            producer.send(topic = TOPIC,value = json.dumps(log).encode())
    producer.flush()
    producer.close()
    print('processing done')

这是我的消费者代码

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(TOPIC)
for message in consumer:
    log = json.loads(message.value.decode())
    print(log)
consumer.close()

在运行我的生产者后,我得到了“处理完成”。当我运行我的消费者时,我什么也没得到。 (我先运行我的消费者)。

我阅读了文档,它可能来自生产者配置。我仍然不确定我应该修改哪些参数(linger_ms、batch_size...?)。在我看来,认值适用于我的情况。

有什么想法吗?

解决方法

我使用以下内容找到了它:https://www.kaaproject.org/blog/kafka-docker https://github.com/wurstmeister/kafka-docker/wiki/Connectivity

需要在 docker-compose.yml 中添加一些环境变量,如 KAFKA_ADVERTISED_HOST,以便客户端可以从 docker 主机外部连接到 kafka broker。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。