微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何正确处理与多个生产者的 AMQP 连接和 api

如何解决如何正确处理与多个生产者的 AMQP 连接和 api

我正在开发一个 api,它使用 RabbitMQ 主题与来自事件架构的其他服务进行通信。 我的 API 的几个路由将发布事件,我希望在我的 API 中始终有一个实时连接。这样,对于每个新请求,我只创建一个新通道,并且只保留一个连接(我在阅读了 amqp 0-9-2 连接的成本很高后决定这样做)。

现在我有这样的事情:

class Singleton:
    def __init__(self,target):
        self.target = target

    def __call__(self,*args,**kwargs) -> Any:
        try:
            return self._instance
        except AttributeError:
            self._instance = self.target(*args,**kwargs)
            return self._instance


@Singleton
class RabbitConnection(pika.BlockingConnection):
    def __init__(self):
        ssl_options = None

        if settings.RABBIT_SSL:
            context = ssl.create_default_context()
            ssl_options = pika.SSLOptions(context)

        credentials = pika.credentials.PlainCredentials(
            username=settings.RABBIT_USER,password=str(settings.RABBIT_PASSWORD),)
        parameters = pika.ConnectionParameters(
            host=settings.RABBIT_SERVER,port=settings.RABBIT_PORT,virtual_host="/",credentials=credentials,ssl_options=ssl_options,heartbeat=0
        )
        super().__init__(parameters=parameters)


class RabbitChannelProvider:
    _channel = None

    def __init__(self):
        self._connection = RabbitConnection()

    def __enter__(self) -> BlockingChannel:
        if not self._channel:
            self._channel = self._connection.channel()
            self._channel.exchange_declare(
                exchange=settings.RABBIT_EXCHANGE,exchange_type=ExchangeType.topic,passive=False,durable=True,auto_delete=False,)

        return self._channel

    def __exit__(self,exc_type,exc_value,tb) -> None:
        self._channel.close()
        self._channel = None


class MessagePublisher(SingletonCreateMixin,PublisherMessageBackend):
    id = "publisher_rabbitmq"

    def publish(self,routing_key: str,body: Any) -> None:
        try:
            message = build_message(body=body)
            logger.info(
                event="message_broker",event_type=LogEventType.SUCCESS,location=LogLocation.BACKEND,body=body,message="Sending message",)
            with RabbitChannelProvider() as channel:
                channel.basic_publish(
                    exchange=settings.RABBIT_EXCHANGE,routing_key=routing_key,body=message,properties=pika.BasicProperties(
                        content_type="application/json"
                    ),)
        except Exception as err:
            logger.error(
                event="message_broker",event_type=LogEventType.ERROR,error=err,)
            raise MessagebrokerException(message=err)

这是在api进程中只维护一个连接的正确方法吗?我这样做对吗?

解决方法

形成官方pika documentation

Pika 线程安全吗?

Pika 在代码中没有任何线程的概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建。跨线程共享一个 Pika 连接是不安全的,只有一个例外:您可以从另一个线程调用连接方法 add_callback_threadsafe 以在活动的 pika 连接内安排回调。

因此您的解决方案可以使用单个线程

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。