微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在python中正确实现生产者消费者

如何解决如何在python中正确实现生产者消费者

我在生产者消费者模式中有两个线程。当消费者收到数据时,它会调用一个耗时的函数 expensive(),然后进入一个 for 循环。

但是如果消费者正在处理新数据到达时,它应该中止当前工作,(退出循环)并从新数据开始。

我尝试使用 queue.Queue 这样的东西:

q = queue.Queue()

def producer():
    while True:
        ...
        q.put(d)
      
def consumer():
    while True:
        d = q.get()
        expensive(d)
        for i in range(10000):
            ...
            if not q.empty():
                break
    

但是这段代码的问题是,如果生产者放入数据太快,并且队列中有很多项目,消费者将执行 expensive(d) 调用加上一次循环迭代,然后对每个项目中止,这很耗时。代码应该可以工作,但没有优化。

解决方法

在不修改 expensive 中的代码的情况下,一种解决方案可能是将其作为单独的进程运行,这将使您能够提前终止它。但是,由于没有提及 expensive 运行多长时间,因此这可能会或可能不会提高时间效率。

import multiprocessing as mp

q = queue.Queue()


def producer():
    while True:
        ...
        q.put(d)
  
def consumer():
    while True:
        d = q.get()
        exp = mp.Thread(target=expensive,args=(d,))
        for i in range(10000):
            ...
            if not q.empty():
                exp.terminate() # or exp.kill()
                break
,

嗯,一种方法是使用队列设计,可以保留等待和工作线程的内部列表。然后,您可以创建多个消费者线程来等待队列,并在工作到达时设置一个已知的消费者线程来完成工作。当线程完成后,它调用队列将自己从工作列表中删除,并将自己添加到等待列表中。

每个消费者线程都有一个“中止”原子,可以通知线程提前结束。线程执行内部循环时会有一些延迟,但这无关紧要....

如果新工作从生产者到达队列,并且工作队列不为空,则可以设置工作线程的“中止”布尔值,并将它们的优先级设置为尽可能低。然后可以将新工作分派到池中的一个等待线程上,从而将其设置为工作。

等待线程将需要一个“开始”函数,该函数向等待线程……好吧……等待的事件/sema/condvar 发出信号。这允许提供工作的生产者设置该特定线程运行,而不是池中的任何线程都可以获取工作的“通常”做法。

这样的设计允许“立即”开始新的工作,通过降低前一个工作线程的优先级使其变得无关紧要,并避免线程/进程终止的开销。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。