微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

1 项异步队列 - 这是一些标准的东西吗?

如何解决1 项异步队列 - 这是一些标准的东西吗?

在我的一个 asyncio 项目中,我经常使用一种同步方法,并且想知道它是否是某种带有名称的标准工具,我可以给 google 以了解更多信息。我使用术语“1 项队列”只是因为我没有更好的名字。这是一个降级队列,与 Queue(maxsize=1) 无关。

#  [controller] ---- commands ---> [worker]

控制器向工作线程发送命令(queue.put,实际上是put_Nowait),工作线程等待它们(queue.get)并执行它们,但特殊规则是唯一的last 命令很重要,它会立即替换所有先前未完成的命令。因此,队列中等待执行的命令永远不会超过 1 个。

为了实现这一点,控制器在放置之前清除队列。没有 queue.clear,因此它必须丢弃(使用 get_Nowait)等待项(如果有)。 (queue.clear 的缺失引发了我的怀疑,从而导致了这个问题。)

在工作人员方面,如果命令执行需要休眠,则将其替换为带有超时的 newcmd=queue.get。当超时发生时,它是一个睡眠;当 get 成功时,当前工作被中止并开始执行 newcmd

解决方法

您使用的队列类型不是标准的 - 存在一次性队列之类的东西,但完全不同。

队列并不真正适合您的用例,尽管您付出了一些努力。你真的不需要任何类型的排队,你需要一个包含单个对象(可以替换)和唤醒机制的插槽。 asyncio.Event 可用于唤醒,您可以将负载对象(命令)附加到事件的属性。例如:

async def worker(evt):
    while True:
        await evt.wait()
        evt.clear()
        if evt.last_command is None:
            continue
        last_command = evt.last_command
        evt.last_command = None
        # execute last_command,possibly with timeout
        print(last_command)

async def main():
    evt = asyncio.Event()
    workers = [asyncio.create_task(worker(evt)) for _ in range(5)]
    for i in itertools.count():
        await asyncio.sleep(1)
        evt.last_command = f"foo {i}"
        evt.set()

asyncio.run(main())

这与基于队列的方法之间的一个区别是,设置事件将唤醒所有工作人员(如果有多个工作人员),即使第一个工作人员立即调用 evt.clear()。另一方面,队列项将保证移交给 queue.get() 的单个等待者。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。