微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python Asyncio 生产者-消费者工作流拥塞/增长队列

如何解决Python Asyncio 生产者-消费者工作流拥塞/增长队列

我一直在写一个 Python 应用程序,其中:

  • 一个 async 函数 producer,它通过 websocket 侦听传入的 items,并将这些 items 放入 queue = asyncio.Queue()
  • 一个 async 函数 consumer 执行 queue.get(),并通过不同的 websocket 连接查询 item_details

问题:将传入的items放入queue的平均速度远高于consumerget的速度来自 queue 的项目,因此 queue 会在一段时间后堆积起来。

问题:consumer 无需多处理不限制传入连接的情况下扩展的正确方法是什么? ?我还不是很精通asynciothreading。我想在单独的工作人员中运行 consumer,但据我了解 asyncio 的 run_in_executor 不能用于 async 函数,还有 asyncio.Queue() 不是线程安全。

解决方法

如果消费者执行 IO 绑定工作,您可以扩展其数量。而且你不关心多线程,因为 asyncio 基于非阻塞 IO 的思想,并且设计为在单线程中工作。如果没有本机异步替代方案,例如,您可以甚至必须使用线程来处理阻塞 IO。用于文件 IO,但这是一个单独的故事。

这里是一个简单的例子,用于说明生产者创建任务的速度比单个消费者处理它们的速度快的情况。我使用 asyncio.sleep 模拟 IO 工作负载。

import asyncio
import itertools

async def producer(queue: asyncio.Queue):
    """producer emulator,creates ~ 10 tasks per second"""
    sleep_seconds=0.1
    counter = itertools.count(1)
    while True:
        await queue.put(next(counter))
        await asyncio.sleep(sleep_seconds)


async def consumer(queue: asyncio.Queue,index):
    """slow io-bound consumer emulator,process ~ 5 tasks per second"""
    sleep_seconds=0.2
    while True:
        task = await queue.get()
        print(f"consumer={index},task={task},queue_size={queue.qsize()}")
        await asyncio.sleep(sleep_seconds)


async def main():
    q = asyncio.Queue()
    concurrency = 2  # consumers count
    tasks = [asyncio.create_task(consumer(q,i)) for i in range(concurrency)]
    tasks += [asyncio.create_task(producer(q))]
    await asyncio.wait(tasks)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

单个消费者的输出,队列大小不断增长

consumer=0,task=1,queue_size=0
consumer=0,task=2,task=3,queue_size=1
consumer=0,task=4,queue_size=2
consumer=0,task=5,queue_size=3
consumer=0,task=6,queue_size=4
consumer=0,task=7,queue_size=5
consumer=0,task=8,queue_size=6
consumer=0,task=9,queue_size=7
consumer=0,task=10,queue_size=8

两个消费者的输出,队列为空

consumer=0,queue_size=0
consumer=1,queue_size=0

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。