微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

FastAPI websocket 连接导致 docker 容器内的 CPU 峰值达到 100%

如何解决FastAPI websocket 连接导致 docker 容器内的 CPU 峰值达到 100%

我正在为两个或更多用户开发私人聊天功能,以便相互交流。我有一个 websocket 连接的端点,只有经过身份验证的用户才能在客户端和服务器之间进行握手

接受websocket连接时出现问题。消费者处理程序在无限循环中平稳运行,等待来自客户端的消息实际执行某些请求的特定任务,但另一端的生产者在无限循环中挂起并导致 cpu 峰值 高达 100%

显然,我需要一个特定 redis 频道的监听器,我可以在其中实时获取用户的所有消息,我应该以某种方式监听它,而循环可以做到这一点,但由于 cpu 峰值显然这不是一个好的解决方案。

# api.py

async def consumer_handler(service):
    """Messages received on the websocket connection Consumer - (Publisher)"""
    try:
        while True:
            received_data = await service.websocket.receive_json()
            if received_data['event_type'] == "online.users":
                await service.get_online_user_status(received_data['role_id'])
            elif received_data['event_type'] == "message.user":
                await service.send_message(received_data['user_id'],received_data['content'])
            elif received_data['event_type'] == "info":
                await service.get_info()
    except WebSocketdisconnect:
        logger.debug("WebSocketdisconnect - consumer handler disconnected")

async def producer_handler(service):
    """Messages generated at the backend to send to the websocket Producer - (Subscriber)"""
    try:
        while True:
            if service.pubsub.subscribed:
                message = await service.pubsub.get_message(ignore_subscribe_messages=True)
                if message:
                    await service.websocket.send_json(message['data'].decode())
    except (ConnectionClosedOK,aioredis.exceptions.ConnectionError) as e:
        logger.debug(f"{e.__class__}","producer handler disconnected")


@chat_app.websocket("/")
async def websocket_endpoint(websocket: WebSocket,current_user: User = Depends(is_authenticated_ws)):
    if not current_user:
        return

    async with ConnectionContextManager(user_id=current_user.id,websocket=websocket) as service:
        producer_task = asyncio.ensure_future(producer_handler(service))
        consumer_task = asyncio.ensure_future(consumer_handler(service))
        done,pending = await asyncio.wait(
            [consumer_task,producer_task],return_when=asyncio.FirsT_COMPLETED
        )
        for task in pending:
            task.cancel()

此端点处理 websockets documentation

中描述的生产者/订阅者逻辑

#websocket_utils.py

class WebsocketService:
    """
    This acts like a service for websocket,is returned within the context manager
    this class is used to not interact with consumer directly,instead interact it with the manager
    """

    def __init__(self,*,user_id: UUID4,websocket: WebSocket,pubsub: PubSub):
        self.user_id = user_id
        self.websocket = websocket
        self.pubsub = pubsub

    async def get_online_user_status(self,role_id):
        await consumer.online_user_status_per_role(role_id,self.websocket)

    async def send_message(self,content: str):
        await consumer.send_message_to_user(user_id=user_id,message=content,websocket=self.websocket)

    async def get_info(self):
        await consumer.fetch_info(self.websocket)


class ConnectionContextManager:
    """
    This context manager handles the websocket connection
    on enter,it returns a controller for the websocket events
    """

    websocket_service: WebsocketService

    def __init__(self,websocket: WebSocket):
        self.websocket_service = WebsocketService(user_id=user_id,websocket=websocket,pubsub=websocket.app.redis.pubsub())

    async def __aenter__(self):
        logger.debug("Context manager enter")
        await consumer.connect(
            user_id=self.websocket_service.user_id,websocket=self.websocket_service.websocket,pubsub=self.websocket_service.pubsub
        )
        return self.websocket_service

    async def __aexit__(self,exc_type,exc_val,exc_tb) -> None:
        await consumer.disconnect(
            user_id=self.websocket_service.user_id,pubsub=self.websocket_service.pubsub,)
        logger.debug("Context manager exit")

这个上下文管理器确保每个用户都有自己的发布订阅频道,并为实际的消费者创建一个控制器,这样我就不必在需要特定资源时一直传递 user_id 和其他方便的参数。


class ConnectionConsumer:
    __redis: aioredis.Redis

    def __init__(self):
        self.__redis = aioredis.from_url(settings.ws_redis_url,encoding='utf-8',decode_responses=True)

    async def __send_json(self,obj: dict,websocket: WebSocket):
        await websocket.send_json(obj)

    async def connect(self,pubsub: PubSub):
        # Accept connection if authorization is successful,set the user online and subscribe to its channel layer
        await websocket.accept()
        await self.__redis.set(f"status:{user_id}","1")  # status:UUID4 (means online)
        await pubsub.subscribe(f"channel:{user_id}")  # subscribe to itself's channel

    async def disconnect(self,pubsub: PubSub):
        # Gracefully disconnect from the websocket and remove the channel layer from pubsub
        await self.__redis.delete(f"status:{user_id}")
        await pubsub.unsubscribe(f"channel:{user_id}")
        await pubsub.close()
        await self.__redis.close()
        await websocket.close()

这是从上下文管理器返回的服务调用的实际使用者。

CONTAINER ID   NAME           cpu %     MEM USAGE / LIMIT     MEM %     NET I/O           BLOCK I/O   PIDS
4ed80g7fb093   s_be   1.77%     76.09MiB / 15.29GiB   0.49%     37.3kB / 21.1kB   0B / 0B     7

这是仅处理消费者时容器的 docker stats

CONTAINER ID   NAME           cpu %     MEM USAGE / LIMIT     MEM %     NET I/O           BLOCK I/O   PIDS
4ed80g7fb093   s_be   100.36%   76.08MiB / 15.29GiB   0.49%     42.9kB / 25.7kB   0B / 0B     7

这是当生产者和消费者处理程序都在运行时容器的 docker stats

我也尝试过拆分连接,但我遇到了同样的问题。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。