如何解决使用两个队列作为生产者-消费者模式的 Python 多处理可能出现死锁?
我想知道以下代码中是否存在某种死锁。我必须读取数据库的每个元素(大约 100 万个项目),对其进行处理,然后将结果收集到一个唯一的文件中。
我使用两个队列和三种类型的进程通过多处理并行执行:
- Reader:读取数据库并将读取的项目添加到task_queue中的主进程
- Worker:进程池。每个工作人员从 task_queue 中获取一个项目,处理该项目,将结果保存在存储在 item_name/item_name.txt 中的中间文件中,并将 item_name 放入 Completed_queue
- Writer:进程从completed_queue获取item_name,从item_name/item_name.txt获取中间结果,写入results.txt
from multiprocessing import Pool,Process,Queue
class computation():
def __init__(self,K):
self.task_queue = Queue()
self.completed_queue = Queue()
self.n_cpus = K
def reader(self,):
with open(db,"r") as db:
... # Read an item
self.task_queue.put(item)
def worker(self,):
while True:
item = self.task_queue.get(True)
if item == "STOP":
break
self.process_item(item)
def writer_process(self,):
while True:
f = self.completed_queue.get(True)
if f == "DONE":
break
self.write_f(f)
def run(self,):
pool = Pool(n_cpus,self.worker,args=())
writer = Process(target=self.writer_process,args=())
writer.start()
self.reader()
pool.close()
pool.join()
self.completed_queue.put("DONE")
writer.join()
代码有效,但似乎有时编写器或池停止工作(或者它们非常慢)。在这种情况下是否可能出现死锁?
解决方法
您的代码存在一些问题。首先,通过按原样使用队列,您实际上是在创建自己的进程池,而根本不需要使用 multiprocessing.Pool
类。您正在使用池初始值设定项作为实际的池工作者,这有点滥用此类;您最好只使用常规的 Process
实例(无论如何,我的意见)。
其次,尽管您将消息 DONE
发送到 writer_process
以通知它终止是很好的,但您没有对 self.n_cpus
{{1} } 进程,这些进程正在寻找“STOP”消息,因此 worker
函数需要将 reader
self.n_cpus
消息放入任务队列:
STOP
就个人而言,我不会使用“STOP”和“DONE”作为哨兵m>消息,而是使用 from multiprocessing import Process,Queue
class Computation():
def __init__(self,K):
self.task_queue = Queue()
self.completed_queue = Queue()
self.n_cpus = K
def reader(self,):
with open(db,"r") as db:
... # Read an item
self.task_queue.put(item)
# signal to the worker processes to terminate:
for _ in range(self.n_cpus):
self.task_queue.put('STOP')
def worker(self,):
while True:
item = self.task_queue.get(True)
if item == "STOP":
break
self.process_item(item)
def writer_process(self,):
while True:
f = self.completed_queue.get(True)
if f == "DONE":
break
self.write_f(f)
def run(self):
processes = [Process(target=self.worker) for _ in range(self.n_cpus)]
for p in processes:
p.start()
writer = Process(target=self.writer_process,args=())
writer.start()
self.reader()
for p in processes:
p.join()
self.completed_queue.put("DONE")
writer.join()
来代替,假设这不是有效的实际消息。我已经测试了上面的代码,其中 None
只是处理列表中的字符串,reader
只是简单地将 'done' 附加到这些字符串中的每一个,并将修改后的字符串放在 self.process_item(item)
上并替换了 { {1}} 在带有 completed_queue
调用的 self.write_f
中。我没有看到代码有任何问题。
更新以使用托管队列
免责声明:我没有使用 mpi4py 的经验,也不知道队列代理如何分布在不同的计算机上。上面的代码可能不够充分,如以下文章 How to share mutliprocessing queue object between multiple computers 所建议的那样。 但是,该代码正在创建 Queue.Queue 的实例(该代码是 Python 2 代码),不是 multiprocessing.SyncManager 返回的代理。 这方面的文档很差。尝试上面的更改,看看它是否效果更好(会更慢)。
因为writer_process
返回的代理,我不得不重新整理一下代码;队列现在作为参数显式传递给进程函数:
print
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。