微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 pika 和 connexion 初始化 RabbitMQ 消费者

如何解决如何使用 pika 和 connexion 初始化 RabbitMQ 消费者

我正在尝试设置通过 RabbitMQ 接收消息的 Python 微服务,同时具有用于 Kubernetes 运行状况检查的 /health REST 端点。我将 pika 用于 RabbitMQ 使用者,connexion 用于 REST 端点。

但是,当我在 main() 中启动 RabbitMQ 使用者时,连接应用程序不会启动。

python-app.py

#!/usr/bin/env python
import pika,sys,os,connexion
from flask import Flask,request,jsonify

app = connexion.FlaskApp(__name__,specification_dir='./')

def main():
    # Connection
    ...
    # Exchange and queues
    ...

    def callback(ch,method,properties,body):
        ...

    channel.basic_consume(queue='pg-python',on_message_callback=callback,auto_ack=True)

    print(' [*] Waiting for messages.')
    channel.start_consuming()
    app.run(port=8080,use_reloader=False)

@app.route('/api/v1/health',methods=['GET'])
def return_health():
    message = {'status':'Healthy! <3'}
    return jsonify(message)

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

输出

[*] Waiting for messages.

如何正确初始化两个组件?我需要使用线程吗?

解决方法

我已经通过在单独的线程中初始化 RabbitMQ 消费者来解决这个问题:

#!/usr/bin/env python
import pika,sys,os,threading
from flask import Flask,request,jsonify

app = Flask(__name__)

def start_rmq_connection():
    # Connection
    ...
    # Exchange and queues
    ...

    def callback(ch,method,properties,body):
        ...

    channel.basic_consume(queue='pg-python',on_message_callback=callback,auto_ack=True)
    print(' [*] Waiting for messages.')
    channel.start_consuming()

@app.route('/api/v1/health',methods=['GET'])
def return_health():
    message = {'status':'Healthy! <3'}
    return jsonify(message)

if __name__ == '__main__':
    try:
        thread_1 = threading.Thread(target=start_rmq_connection)
        thread_1.start()
        thread_1.join(0)
        app.run()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?