如何解决当队列已满时,带有 rdkafka 的 Pykafka 不会阻塞生产者
我正在运行一个使用 pykafka
生成消息的简单场景,但是当我同时启用 rdkafka
和消息大小相对较大时出现异常。
with topic.get_producer(use_rdkafka=True,delivery_reports=True,block_on_queue_full=True) as p:
for i in range(40000):
print(i)
p._protocol_version = 1
p.produce(str.encode(msg_str),partition_key=str(f'{i}').encode(),timestamp=datetime.datetime.Now())
while True:
try:
msg,exc = p.get_delivery_report(block=False,timeout=.1)
if exc is not None:
print('Failed to deliver msg {}: {}'.format(msg.partition_key,repr(exc)))
else:
print(f'Successfully delivered msg {msg.partition_key}')
except queue.Empty:
break
当 msg_str
只是一个短字符串时,一切正常。但是当长度大约为 30000 个字符时,生产在大约 38847 次迭代后停止,一段时间后我得到一个异常 pykafka.exceptions.ProducerQueueFullError
。
看起来我添加的 block_on_queue_full=True
标志是明确的 ignored
if rdkafka and use_rdkafka:
Cls = rdkafka.RdKafkaProducer
kwargs.pop('block_on_queue_full',None)
在我可以继续生产之前启用等待的正确顺序是什么?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。