如何解决如何正确处理与多个生产者的 AMQP 连接和 api
我正在开发一个 api,它使用 RabbitMQ 主题与来自事件架构的其他服务进行通信。 我的 API 的几个路由将发布事件,我希望在我的 API 中始终有一个实时连接。这样,对于每个新请求,我只创建一个新通道,并且只保留一个连接(我在阅读了 amqp 0-9-2 连接的成本很高后决定这样做)。
现在我有这样的事情:
class Singleton:
def __init__(self,target):
self.target = target
def __call__(self,*args,**kwargs) -> Any:
try:
return self._instance
except AttributeError:
self._instance = self.target(*args,**kwargs)
return self._instance
@Singleton
class RabbitConnection(pika.BlockingConnection):
def __init__(self):
ssl_options = None
if settings.RABBIT_SSL:
context = ssl.create_default_context()
ssl_options = pika.SSLOptions(context)
credentials = pika.credentials.PlainCredentials(
username=settings.RABBIT_USER,password=str(settings.RABBIT_PASSWORD),)
parameters = pika.ConnectionParameters(
host=settings.RABBIT_SERVER,port=settings.RABBIT_PORT,virtual_host="/",credentials=credentials,ssl_options=ssl_options,heartbeat=0
)
super().__init__(parameters=parameters)
class RabbitChannelProvider:
_channel = None
def __init__(self):
self._connection = RabbitConnection()
def __enter__(self) -> BlockingChannel:
if not self._channel:
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=settings.RABBIT_EXCHANGE,exchange_type=ExchangeType.topic,passive=False,durable=True,auto_delete=False,)
return self._channel
def __exit__(self,exc_type,exc_value,tb) -> None:
self._channel.close()
self._channel = None
class MessagePublisher(SingletonCreateMixin,PublisherMessageBackend):
id = "publisher_rabbitmq"
def publish(self,routing_key: str,body: Any) -> None:
try:
message = build_message(body=body)
logger.info(
event="message_broker",event_type=LogEventType.SUCCESS,location=LogLocation.BACKEND,body=body,message="Sending message",)
with RabbitChannelProvider() as channel:
channel.basic_publish(
exchange=settings.RABBIT_EXCHANGE,routing_key=routing_key,body=message,properties=pika.BasicProperties(
content_type="application/json"
),)
except Exception as err:
logger.error(
event="message_broker",event_type=LogEventType.ERROR,error=err,)
raise MessagebrokerException(message=err)
这是在api进程中只维护一个连接的正确方法吗?我这样做对吗?
解决方法
Pika 线程安全吗?
Pika 在代码中没有任何线程的概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建。跨线程共享一个 Pika 连接是不安全的,只有一个例外:您可以从另一个线程调用连接方法 add_callback_threadsafe 以在活动的 pika 连接内安排回调。
因此您的解决方案可以使用单个线程
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。