如何解决FastAPI websocket 连接导致 docker 容器内的 CPU 峰值达到 100%
我正在为两个或更多用户开发私人聊天功能,以便相互交流。我有一个 websocket 连接的端点,只有经过身份验证的用户才能在客户端和服务器之间进行握手
接受websocket连接时出现问题。消费者处理程序在无限循环中平稳运行,等待来自客户端的消息实际执行某些请求的特定任务,但另一端的生产者在无限循环中挂起并导致 cpu 峰值 高达 100%
显然,我需要一个特定 redis 频道的监听器,我可以在其中实时获取用户的所有消息,我应该以某种方式监听它,而循环可以做到这一点,但由于 cpu 峰值显然这不是一个好的解决方案。
# api.py
async def consumer_handler(service):
"""Messages received on the websocket connection Consumer - (Publisher)"""
try:
while True:
received_data = await service.websocket.receive_json()
if received_data['event_type'] == "online.users":
await service.get_online_user_status(received_data['role_id'])
elif received_data['event_type'] == "message.user":
await service.send_message(received_data['user_id'],received_data['content'])
elif received_data['event_type'] == "info":
await service.get_info()
except WebSocketdisconnect:
logger.debug("WebSocketdisconnect - consumer handler disconnected")
async def producer_handler(service):
"""Messages generated at the backend to send to the websocket Producer - (Subscriber)"""
try:
while True:
if service.pubsub.subscribed:
message = await service.pubsub.get_message(ignore_subscribe_messages=True)
if message:
await service.websocket.send_json(message['data'].decode())
except (ConnectionClosedOK,aioredis.exceptions.ConnectionError) as e:
logger.debug(f"{e.__class__}","producer handler disconnected")
@chat_app.websocket("/")
async def websocket_endpoint(websocket: WebSocket,current_user: User = Depends(is_authenticated_ws)):
if not current_user:
return
async with ConnectionContextManager(user_id=current_user.id,websocket=websocket) as service:
producer_task = asyncio.ensure_future(producer_handler(service))
consumer_task = asyncio.ensure_future(consumer_handler(service))
done,pending = await asyncio.wait(
[consumer_task,producer_task],return_when=asyncio.FirsT_COMPLETED
)
for task in pending:
task.cancel()
此端点处理 websockets documentation
中描述的生产者/订阅者逻辑
#websocket_utils.py
class WebsocketService:
"""
This acts like a service for websocket,is returned within the context manager
this class is used to not interact with consumer directly,instead interact it with the manager
"""
def __init__(self,*,user_id: UUID4,websocket: WebSocket,pubsub: PubSub):
self.user_id = user_id
self.websocket = websocket
self.pubsub = pubsub
async def get_online_user_status(self,role_id):
await consumer.online_user_status_per_role(role_id,self.websocket)
async def send_message(self,content: str):
await consumer.send_message_to_user(user_id=user_id,message=content,websocket=self.websocket)
async def get_info(self):
await consumer.fetch_info(self.websocket)
class ConnectionContextManager:
"""
This context manager handles the websocket connection
on enter,it returns a controller for the websocket events
"""
websocket_service: WebsocketService
def __init__(self,websocket: WebSocket):
self.websocket_service = WebsocketService(user_id=user_id,websocket=websocket,pubsub=websocket.app.redis.pubsub())
async def __aenter__(self):
logger.debug("Context manager enter")
await consumer.connect(
user_id=self.websocket_service.user_id,websocket=self.websocket_service.websocket,pubsub=self.websocket_service.pubsub
)
return self.websocket_service
async def __aexit__(self,exc_type,exc_val,exc_tb) -> None:
await consumer.disconnect(
user_id=self.websocket_service.user_id,pubsub=self.websocket_service.pubsub,)
logger.debug("Context manager exit")
这个上下文管理器确保每个用户都有自己的发布订阅频道,并为实际的消费者创建一个控制器,这样我就不必在需要特定资源时一直传递 user_id 和其他方便的参数。
class ConnectionConsumer:
__redis: aioredis.Redis
def __init__(self):
self.__redis = aioredis.from_url(settings.ws_redis_url,encoding='utf-8',decode_responses=True)
async def __send_json(self,obj: dict,websocket: WebSocket):
await websocket.send_json(obj)
async def connect(self,pubsub: PubSub):
# Accept connection if authorization is successful,set the user online and subscribe to its channel layer
await websocket.accept()
await self.__redis.set(f"status:{user_id}","1") # status:UUID4 (means online)
await pubsub.subscribe(f"channel:{user_id}") # subscribe to itself's channel
async def disconnect(self,pubsub: PubSub):
# Gracefully disconnect from the websocket and remove the channel layer from pubsub
await self.__redis.delete(f"status:{user_id}")
await pubsub.unsubscribe(f"channel:{user_id}")
await pubsub.close()
await self.__redis.close()
await websocket.close()
这是从上下文管理器返回的服务调用的实际使用者。
CONTAINER ID NAME cpu % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS
4ed80g7fb093 s_be 1.77% 76.09MiB / 15.29GiB 0.49% 37.3kB / 21.1kB 0B / 0B 7
这是仅处理消费者时容器的 docker stats
CONTAINER ID NAME cpu % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS
4ed80g7fb093 s_be 100.36% 76.08MiB / 15.29GiB 0.49% 42.9kB / 25.7kB 0B / 0B 7
这是当生产者和消费者处理程序都在运行时容器的 docker stats
我也尝试过拆分连接,但我遇到了同样的问题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。