微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在插槽可用时立即将新项目添加到 asyncio 队列?

如何解决如何在插槽可用时立即将新项目添加到 asyncio 队列?

这段代码是这样工作的:

名为“ids”的列表包含 ID 号。我通过 ID 号下载特定的消息。 'nDownload' 是列表索引。队列的大小值等于 5。

我从列表中选取项目,一次下载一条消息并将其添加到队列中。 当 nDownload 等于 6 时:

  1. 出现QueueFull 异常。
  2. 创建 5 个工作器。
  3. 工作人员从消息中提取元数据用于其他目的。
  4. await queue.join() 阻塞直到队列中的所有项目都被获取和处理。
  5. 结束 -> 删除工作人员。

代码有效,直到现在我都没有问题。

                nDownload = 0
                workers = []
                while (nDownload <= len(ids)):                        
                    try:
                        async for msg in get_messages(channel,ids=ids[nDownload]):
                           nDownload = nDownload + 1
                            try:   
                                queue.put_Nowait(msg)
                            except (asyncio.QueueFull,IndexError) as qErr:
                                nDownload = nDownload - 1 
                                workers = [asyncio.create_task(worker(queue)) for _ in range(5)] 
                                await queue.join() 
                                for cancel in workers:
                                    cancel.cancel()                                    
                    except IndexError as iErr:
                        break    

问题: 有时消息有不同的大小。例如:

消息 1 = 8 分钟内下载了 100MB

消息 2 = 5 秒内下载了 1MB

一旦它下载了最短的消息(消息 2),我就会在队列中获得一个空闲的“插槽”。 不幸的是,我必须等待消息 1,因为 queue.join()

此时如何将新项目添加到队列中?

为什么我要使用 queue.join() ? 因为我不知道如何将最多 5 条消息添加到队列中,等待下载并继续 我真的需要下载一组消息而不是一次全部 谢谢

编辑: 是的,我的工人是这样定义的(简化)

async def worker(queue):
while True:
    queue_msg = await queue.get()
    loop = asyncio.get_event_loop()
    try:
        task = loop.create_task(extract(queue_msg))
        await asyncio.wait_for(task,timeout=timeout)

    except errors.Fail:
  #Here I have to requeue the message when it fails,#so it requeues the ID in order to download the same msg later
     await queue.put(queue_msg.id)
    except asyncio.TimeoutError: 
     #requeue the msg etcc...

    finally:    
        queue.task_done()

你的回答很聪明,谢谢 但是,我选择队列“大小 > 1”,因为我需要在“提取”任务失败时重新获取消息。 (sry我没告诉你) 我不知道如果队列大小 = 1 会发生什么并且我尝试添加项目。这个有点难

解决方法

目前还不清楚您的约束是什么,但如果我理解正确的话:

  • 您最多想同时下载 5 个东西
  • 您不想浪费时间 - 工作人员处理完一件物品后,应该立即获得一件新物品

队列大小应该与您的目的无关,它仅在工作程序暂时比 get_messages 快时用作缓冲区。我什至会从队列大小 1 开始,并试验较大的值是否有助于提高性能。

QueueFull 上生成任务似乎很奇怪且没有必要。处理生产者-消费者模式的惯用方法是创建固定数量的消费者,并让他们在到达时处理多个项目。您没有显示 worker,因此不清楚每个工作人员是只处理一条消息还是多条消息。

我将循环重写为:

queue = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
for current in ids:
    async for msg in get_messages(channel,id=current):
        # enqueue msg,waiting (if needed) for a free slot in the queue
        await queue.put(msg)
# wait for the remaining enqueued items to be processed
await queue.join()
# cancel the now-idle workers,which wait for a new message
# that will never arrive
for w in workers:
    w.cancel()

一个worker的定义如下:

async def worker(queue):
    while True:
        msg = await queue.get()
        ... process msg ...
        queue.task_done()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。