微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在线程中运行的异步代码的正确销毁过程

如何解决在线程中运行的异步代码的正确销毁过程

以下是通用 websocket 流媒体的(工作)代码

它创建一个守护线程,从中执行 asyncio.run(...)

asyncio 代码产生 2 个任务,它们永远不会完成。

如何正确销毁这个对象?

其中一项任务是执行保活“ping”,因此我可以使用标志轻松退出该循环。但是另一个阻塞了来自 websocket 的消息。

import json
import aiohttp
import asyncio
import gzip

import asyncio
from threading import Thread

class WebSocket:
    KEEPALIVE_INTERVAL_S = 10

    def __init__(self,url,on_connect,on_msg):
        self.url = url
        self.on_connect = on_connect
        self.on_msg = on_msg

        self.streams = {}
        self.worker_thread = Thread(name='WebSocket',target=self.thread_func,daemon=True).start()

    def thread_func(self):
        asyncio.run(self.aio_run())

    async def aio_run(self):
        async with aiohttp.ClientSession() as session:

            self.ws = await session.ws_connect(self.url)

            await self.on_connect(self)

            async def ping():
                while True:
                    print('KEEPALIVE')
                    await self.ws.ping()
                    await asyncio.sleep(WebSocket.KEEPALIVE_INTERVAL_S)

            async def main_loop():
                async for msg in self.ws:
                    def extract_data(msg):
                        if msg.type == aiohttp.WSMsgType.BINARY:
                            as_bytes = gzip.decompress(msg.data)
                            as_string = as_bytes.decode('utf8')
                            as_json = json.loads(as_string)
                            return as_json

                        elif msg.type == aiohttp.WSMsgType.TEXT:
                            return json.loads(msg.data)

                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            print('⛔️ aiohttp.WSMsgType.ERROR')

                        return msg.data

                    data = extract_data(msg)

                    self.on_msg(data)

            # May want this approach if we want to handle graceful shutdown
            # W.task_ping = asyncio.create_task(ping())
            # W.task_main_loop = asyncio.create_task(main_loop())

            await asyncio.gather(
                ping(),main_loop()
            )

    async def send_json(self,J):
        await self.ws.send_json(J)

解决方法

我建议使用 asyncio.run_coroutine_threadsafe 而不是 asyncio.run。它返回一个您可以取消的 concurrent.futures.Future 对象:

def thread_func(self):
    self.future = asyncio.run_coroutine_threadsafe(
        self.aio_run(),asyncio.get_event_loop()
    )

# somewhere else
self.future.cancel()

另一种方法是将 pingmain_loop 设为任务,并在必要时取消它们:

# inside `aio_run`
self.task_ping = asyncio.create_task(ping())
self.main_loop_task = asyncio.create_task(main_loop())

await asyncio.gather(
    self.task_ping,self.main_loop_task
    return_exceptions=True
)


# somewhere else
self.task_ping.cancel()
self.main_loop_task.cancel()

这不会改变 aio_run 也应该与 asyncio.run_coroutine_threadsafe 一起调用的事实。 asyncio.run 应该用作 asyncio 程序的主要入口点,并且应该只调用一次。

,

我想建议解决方案的另一种变体。在完成协程(任务)时,我更喜欢尽量减少 cancel() 的使用(但不排除),因为有时它会使调试业务逻辑变得困难(请记住,asyncio.CancelledError 不继承自Exception).

在您的情况下,代码可能如下所示(仅更改):

class WebSocket:
    KEEPALIVE_INTERVAL_S = 10

    def __init__(self,url,on_connect,on_msg):
        # ...      
        self.worker_thread = Thread(name='WebSocket',target=self.thread_func)
        self.worker_thread.start()

    async def aio_run(self):
        self._loop = asyncio.get_event_loop()
        # ...
 
        self._ping_task = asyncio.create_task(ping())
        self._main_task = asyncio.create_task(main_loop())

        await asyncio.gather(
            self._ping_task,self._main_task,return_exceptions=True
        )
        # ...

    async def stop_ping(self):
        self._ping_task.cancel()
        try:
            await self._ping_task
        except asyncio.CancelledError:
            pass

    async def _stop(self):
        # wait ping end before socket closing
        await self.stop_ping()
        # lead to correct exit from `async for msg in self.ws`
        await self.ws.close()

    def stop(self):
        # wait stopping ping and closing socket
        asyncio.run_coroutine_threadsafe(
            self._stop(),self._loop
        ).result() 
        self.worker_thread.join()  # wait thread finish

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。