如何解决具有大块大小的多处理 pool.imap 正在跳过处理可迭代中的某些记录
我正在使用 multiprocessing pool.imap() 处理字符串列表并传递 chunksize。我的列表长度是 1821,imap 中的进程是 4。我试图为每个进程提供几乎相等数量的块大小,因此将块大小设置为 455。也尝试使用 500。但这使我的 imap 跳过一些记录。 跳过也不是那么随机,因为它是有序列表。一旦我将块大小更改为 200,imap 就开始将所有记录发送到我的目标函数。 有人可以解释为什么 chunksize > 450 在这里引起问题,而根据文档,理想情况下应该在每个进程中划分 1821/4 = 455 或 456 rec。 另请注意,在我的函数中,我使用该字符串并运行一些步骤,每个步骤需要几秒钟。在进行测试时,我尝试仅在目标函数内的文件中写入字符串,即使这样它也会跳过一些记录。
def process_init(self,l):
global process_lock
process_lock = l
def _run_multiprocess(self,num_process,input_list,target_func,chunk):
l = mp.Lock()
with mp.Pool(processes=(num_process),initializer=self.process_init,initargs=(l,)) as p:
start = time.time()
async_result = p.imap(target_func,chunksize =chunk)
p.close()
p.join()
print("Completed the multiprocess")
end = time.time()
print('total time (s)= ' + str(end-start))
chunksize = 500
self._run_multiprocess(4,iterator_source,self._process_data,chunksize)
def _process_data(self,well_name):
with open("MultiProcessMethod_RecordReceived.csv","a") as openfile:
process_lock.acquire()
openfile.write( "\t" +well_name.upper() + "\n")
openfile.flush()
process_lock.release()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。