如何解决pika 和 rabbitMQ 每次从队列中获取一条消息,但在 python 中不断调用回调函数
我希望进程正常退出,直到其当前任务完成。 (如果它在回调函数中做某事)。逻辑很简单,一旦有 SIGINT,我就会引发 is_interrupt
,并且在我的无限 while 循环中,我不断检查队列中是否有消息。
如果队列中有消息,我会生成一个子进程来执行回调函数。在回调函数中,它以 sys.exit(0)
结尾,以确保一旦孩子完成回调,它就会退出。
目前我面临的问题是:
如果发送方将一些消息发送到队列中,然后接收方(下面的代码)启动。它可以处理那些存在的消息,然后继续进入回调函数。子进程不断生成并失败。 (我不明白是什么导致这种情况发生。一旦子进程完成它的任务,它就会消失,主进程将继续循环并等待消息。
如果接收者先启动,然后发送者发布一些消息,接收者将什么也收不到。
请帮帮我。谢谢!
import pika,sys,os,signal,requests
is_interrupt = 0
def signal_handler(sig,frame):
global is_interrupt
is_interrupt = 1
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
queue_state = channel.queue_declare(queue='task_queue',durable=True,passive=True)
# Register signal handler.
signal.signal(signal.SIGINT,signal_handler)
def callback(ch,method,properties,body):
# Do something here..
os._exit(0)
print('Worker waiting for messages. To exit press CTRL+C')
while(True):
if is_interrupt == 1:
print("\nExit")
os._exit(0)
queue_empty = queue_state.message_count == 0
#print(queue_empty)
if not queue_empty:
child_pid = os.fork()
if child_pid == 0:
method,body = channel.basic_get(queue='task_queue',auto_ack=True)
callback(channel,body)
else:
queue_empty = True
os.wait()
if __name__ == '__main__':
main()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。