微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

同步kafka生产者发送仍然有0秒超时?

如何解决同步kafka生产者发送仍然有0秒超时?

我在Python脚本中有以下代码

from kafka import KafkaProducer

kafka_producer = KafkaProducer(....)

kafka_producer.send(topic,value=message)
kafka_producer.flush()

logger.info('Done!') # this message is displayed

但是,我仍然看到以下消息。消息似乎已成功发送。为什么即使调用flush()也会显示“ 0秒超时”消息?

INFO:root:完成!

INFO:kafka.producer.kafka:以0秒超时关闭Kafka生产者。

INFO:kafka.producer.kafka:由于无法在超时0内完成挂起的请求,因此强制关闭生产者。

INFO:kafka.conn::正在关闭连接。

解决方法

在这种情况下,我认为错误消息不正确。相关代码-source on github

        if timeout > 0:
            if invoked_from_callback:
                log.warning("Overriding close timeout %s secs to 0 in order to"
                            " prevent useless blocking due to self-join. This"
                            " means you have incorrectly invoked close with a"
                            " non-zero timeout from the producer call-back.",timeout)
            else:
                # Try to close gracefully.
                if self._sender is not None:
                    self._sender.initiate_close()
                    self._sender.join(timeout)

        if self._sender is not None and self._sender.is_alive():
            log.info("Proceeding to force close the producer since pending"
                     " requests could not be completed within timeout %s.",timeout)
            self._sender.force_close()

我对代码的解释: 如果timeout0,我们将跳过优美的关闭代码,直接进入强制关闭。记录之前在_sender上检查的唯一条件是它存在并且is_alive()。当然存在,并且它还活着,因为它从未被告知要关闭。

如果timeout0,它从不检查是否可以完成任何事情。因此,在这种情况下,日志记录是不正确的。

timeout > 0的情况下,日志记录才有意义。 initiate_close()join()一样被调用,后者表示您只能稍后再检查is_alive()才能知道加入是否成功。如果尝试join()后它仍然存在,则将其强制关闭,并且无法在超时时间内完成请求。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。