微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

当队列已满时,带有 rdkafka 的 Pykafka 不会阻塞生产者

如何解决当队列已满时,带有 rdkafka 的 Pykafka 不会阻塞生产者

我正在运行一个使用 pykafka 生成消息的简单场景,但是当我同时启用 rdkafka 消息大小相对较大时出现异常。

我的代码与示例非常相似here

with topic.get_producer(use_rdkafka=True,delivery_reports=True,block_on_queue_full=True) as p:
    for i in range(40000):
        print(i)
        p._protocol_version = 1
        p.produce(str.encode(msg_str),partition_key=str(f'{i}').encode(),timestamp=datetime.datetime.Now())
        while True:
            try:
                msg,exc = p.get_delivery_report(block=False,timeout=.1)
                if exc is not None:
                    print('Failed to deliver msg {}: {}'.format(msg.partition_key,repr(exc)))
                else:
                    print(f'Successfully delivered msg {msg.partition_key}')
            except queue.Empty:
                break

msg_str 只是一个短字符串时,一切正常。但是当长度大约为 30000 个字符时,生产在大约 38847 次迭代后停止,一段时间后我得到一个异常 pykafka.exceptions.ProducerQueueFullError

看起来我添加block_on_queue_full=True 标志是明确的 ignored

    if rdkafka and use_rdkafka:
        Cls = rdkafka.RdKafkaProducer
        kwargs.pop('block_on_queue_full',None)

在我可以继续生产之前启用等待的正确顺序是什么?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。