如何解决Python stomp.py 连接断开并且侦听器停止工作 Dockerfilesupervisor.conf
我正在使用 python stomp 库编写 python 脚本来连接和订阅 ActiveMQ 消息队列。
我的代码与文档“Dealing with disconnects”中的示例非常相似,其中添加了用于长时间运行的侦听器的循环中的计时器。
侦听器类正在接收和处理消息。但是几分钟后,连接断开,然后侦听器停止接收消息。
问题:
运行 connect_and_subscribe() 方法的 on_disconnected 方法正在被调用,但是在发生这种情况后,侦听器似乎停止工作。也许侦听器需要重新初始化?再次运行脚本后,重新创建侦听器,它再次开始接收消息,但是定期再次运行脚本是不切实际的。
问题 1:如何设置以自动重新连接和重新创建侦听器?
问题 2:有没有比 the timeout loop 更好的初始化长时间运行的监听器的方法?
import os,time,datetime,stomp
_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
_queue = os.getenv('QUEUE_NAME')
# Subscription id is unique to the subscription in this case there is only one subscription per connection
sub_id = 1
def connect_and_subscribe(conn):
conn.connect(_user,_password,wait=True)
conn.subscribe(destination=_queue,id=sub_id,ack='client-individual')
print('connect_and_subscribe connecting {} to with connection id {}'.format(_queue,sub_id),flush=True)
class MqListener(stomp.ConnectionListener):
def __init__(self,conn):
self.conn = conn
self.sub_id = sub_id
print('MqListener init')
def on_error(self,frame):
print('received an error "%s"' % frame.body)
def on_message(self,headers,body):
print('received a message headers "%s"' % headers)
print('message body "%s"' % body)
time.sleep(1)
print('processed message')
print('AckNowledging')
self.conn.ack(headers['message-id'],self.sub_id)
def on_disconnected(self):
print('disconnected! reconnecting...')
connect_and_subscribe(self.conn)
def initialize_mqlistener():
conn = stomp.Connection([(_host,_port)],heartbeats=(4000,4000))
conn.set_listener('',MqListener(conn))
connect_and_subscribe(conn)
# https://github.com/jasonrbriggs/stomp.py/issues/206
while conn.is_connected():
time.sleep(2)
conn.disconnect()
if __name__ == '__main__':
initialize_mqlistener()
解决方法
我能够通过重构重试尝试循环和 on_error 处理程序来解决这个问题。另外,我已经在 docker 容器中安装并配置了 supervisor 来运行和管理监听器进程。这样,如果监听程序停止,它会被主管进程管理器自动重新启动。
更新了 python stomp 监听器脚本
init_listener.py
import os,json,time,datetime,stomp
_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
# The listener will listen for messages that are relevant to this specific worker
# Queue name must match the 'worker_type' in job tracker file
_queue = os.getenv('QUEUE_NAME')
# Subscription id is unique to the subscription in this case there is only one subscription per connection
_sub_id = 1
_reconnect_attempts = 0
_max_attempts = 1000
def connect_and_subscribe(conn):
global _reconnect_attempts
_reconnect_attempts = _reconnect_attempts + 1
if _reconnect_attempts <= _max_attempts:
try:
conn.connect(_user,_password,wait=True)
print('connect_and_subscribe connecting {} to with connection id {} reconnect attempts: {}'.format(_queue,_sub_id,_reconnect_attempts),flush=True)
except Exception as e:
print('Exception on disconnect. reconnecting...')
print(e)
connect_and_subscribe(conn)
else:
conn.subscribe(destination=_queue,id=_sub_id,ack='client-individual')
_reconnect_attempts = 0
else:
print('Maximum reconnect attempts reached for this connection. reconnect attempts: {}'.format(_reconnect_attempts),flush=True)
class MqListener(stomp.ConnectionListener):
def __init__(self,conn):
self.conn = conn
self._sub_id = _sub_id
print('MqListener init')
def on_error(self,headers,body):
print('received an error "%s"' % body)
def on_message(self,body):
print('received a message headers "%s"' % headers)
print('message body "%s"' % body)
message_id = headers.get('message-id')
message_data = json.loads(body)
task_name = message_data.get('task_name')
prev_status = message_data.get('previous_step_status')
if prev_status == "success":
print('CALLING DO TASK')
resp = True
else:
print('CALLING REVERT TASK')
resp = True
if (resp):
print('Ack message_id {}'.format(message_id))
self.conn.ack(message_id,self._sub_id)
else:
print('NON Ack message_id {}'.format(message_id))
self.conn.nack(message_id,self._sub_id)
print('processed message message_id {}'.format(message_id))
def on_disconnected(self):
print('disconnected! reconnecting...')
connect_and_subscribe(self.conn)
def initialize_mqlistener():
conn = stomp.Connection([(_host,_port)],heartbeats=(4000,4000))
conn.set_listener('',MqListener(conn))
connect_and_subscribe(conn)
# https://github.com/jasonrbriggs/stomp.py/issues/206
while True:
time.sleep(2)
if not conn.is_connected():
print('Disconnected in loop,reconnecting')
connect_and_subscribe(conn)
if __name__ == '__main__':
initialize_mqlistener()
Supervisor 安装和配置
Dockerfile
为简洁起见删除了一些细节
# Install supervisor
RUN apt-get update && apt-get install -y supervisor
# Add the supervisor configuration file
ADD supervisord.conf /etc/supervisor/conf.d/supervisord.conf
# Start supervisor with the configuration file
CMD ["/usr/bin/supervisord","-c","/etc/supervisor/conf.d/supervisord.conf"]
supervisor.conf
[supervisord]
nodaemon=true
logfile=/home/exampleuser/logs/supervisord.log
[program:mqutils]
command=python3 init_listener.py
directory=/home/exampleuser/mqutils
user=exampleuser
autostart=true
autorestart=true
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。