使用 pika 异步扭曲的更简单方法?

如何解决使用 pika 异步扭曲的更简单方法?

这是我使用 rabbitmq 的第一个项目,我完全迷失了,因为我不确定解决问题的最佳方法是什么。

该程序相当简单,它只是侦听警报事件,然后将这些事件放入rabbitmq 队列中,但我在程序的架构上苦苦挣扎。

如果我为每个事件打开、发布然后关闭连接,我会增加很多延迟,并且会传输不必要的包(甚至比平时更多,因为我使用的是 TLS)...

如果我保持连接打开,并创建一个发布消息的函数(我只使用一个队列,非常基本),我最终会遇到问题,因为多个事件可能同时发生,我的程序将如果与rabbitmq 代理的连接结束,不知道该怎么办。

阅读他们的文档,该解决方案似乎使用了他们的“连接适配器”之一,这对我来说就像手套一样适合我,因为我只是将基本套接字中的所有连接内容重写为使用 Twisted(我真的很喜欢他们的高级方法)。但有一个问题。对于几乎不认为自己是“中级”的人来说,他们的 "basic example" 相当复杂。

在一个完美的世界中,我将能够在与“警报服务器”相同的反应器中运行该服务并调用一个方法来发布消息。但我很难理解代码。有没有和鼠兔一起工作过的人可以给我指出更好的方向,甚至告诉我是否有更简单的方法?

解决方法

好吧,我会发布对我有用的内容。可能不是最好的选择,但也许它可以帮助遇到同样问题的人。

首先我决定放弃 Twisted 并使用 Asyncio(没什么个人的,我只是想使用它,因为它已经在 python 中),即使 pika 有一个很好的使用异步的例子,我尝试并发现使用 aio_pika 更容易.

我最终得到了 2 个主要功能。一个用于发布者,另一个用于订阅者。 波纹管是我的代码对我有用...

# -*- coding: utf-8 -*-

import asyncio
import aio_pika
from myapp import conf

QUEUE_SEND = []


def add_queue_send(msg):
    """Add MSG to QUEUE

    Args:
        msg (string): JSON
    """
    QUEUE_SEND.append(msg)


def build_url(amqp_user,amqp_pass,virtual_host):
    """Build Auth URL

    Args:
        amqp_user (str): User name
        amqp_pass (str): Password
        virtual_host (str): Virtual Host

    Returns:
        str: AMQP URL
    """
    return ''.join(['amqps://',amqp_user,':','@',conf.get('amqp_host'),'/',virtual_host,'?cafile=',conf.get('ca_cert'),'&keyfile=',conf.get('client_key'),'&certfile=',conf.get('client_cert'),'&no_verify_ssl=0'])


async def process_message(message: aio_pika.IncomingMessage):
    """Read a new message

    Args:
        message (aio_pika.IncomingMessage): Mensagem
    """
    async with message.process():
        #   TODO: Do something with the new message
        await asyncio.sleep(1)


async def consumer(url):
    """Keep listening to a MQTT queue

    Args:
        url (str): URL

    Returns:
        aio_pika.Connection: Conn?
    """
    connection = await aio_pika.connect_robust(url=url)
    # Channel
    channel = await connection.channel()
    # Max concurrent messages?
    await channel.set_qos(prefetch_count=100)
    # Queue
    queue = await channel.declare_queue(conf.get('amqp_queue_client'))
    #   What call when a new message is received
    await queue.consume(process_message)
    #   Returns the connection?
    return connection


async def publisher(url):
    """Send messages from the queue.

    Args:
        url (str): URL de autenticação
    """
    connection = await aio_pika.connect_robust(url=url)
    # Channel
    channel = await connection.channel()
    while True:
        if QUEUE_SEND:
            #   If the list (my queue) is not empty
            msg = aio_pika.Message(body=QUEUE_SEND.pop().encode())
            await channel.default_exchange.publish(msg,routing_key='queue')
        else:
            #   Just wait
            await asyncio.sleep(1)
    await connection.close()

我开始使用``loop.create_task```。

正如我所说。它对我有用(即使我的代码的另一部分仍然有问题),但我不想让这个问题悬而未决,因为大多数人都会遇到同样的问题。

如果您知道更好的方法或更优雅的方法,请分享。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res