How to fix a TypeError in Python multiprocessing when the function's argument is an iterator
```python
import multiprocessing
from itertools import product, imap, ifilter

def test(it):
    for x in it:
        print x
    return None

mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
it = imap(lambda x: ifilter(lambda y: x+y > 10, xrange(10)), xrange(10))
result = mp_pool.map(test, it)
```
I get the following error:
```
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib64/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
TypeError: ifilter expected 2 arguments, got 0
```
Can multiprocessing not be used with a function whose argument is an iterator? Thanks!
Solution
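Every item that your iterator `it` yields is pickled and sent to a worker process, so each item must be a single, picklable value (it can be "complex", e.g. a tuple or a list). Right now we have: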
```
>>> it
<itertools.imap object at 0x000000000283DB70>
>>> list(it)
[<itertools.ifilter object at 0x000000000283DC50>, <itertools.ifilter object at 0x000000000283DF98>, <itertools.ifilter object at 0x000000000283DBE0>, <itertools.ifilter object at 0x000000000283DF60>, <itertools.ifilter object at 0x000000000283DB00>, <itertools.ifilter object at 0x000000000283DCC0>, <itertools.ifilter object at 0x000000000283DD30>, <itertools.ifilter object at 0x000000000283DDA0>, <itertools.ifilter object at 0x000000000283DE80>, <itertools.ifilter object at 0x000000000284F080>]
```
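Each iteration of `it` yields another iterator (an `ifilter` object), and that is the root of the problem. `ifilter` objects do not support pickling, so the worker cannot rebuild the task it receives: the `TypeError` is raised inside `task = get()`, while the task is being unpickled.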
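To check whether an item can actually be shipped to a worker, you can try pickling it yourself, since that is what `Pool` does with every item it sends. The sketch below assumes Python 3, where the lazy built-ins `map`/`filter` replace `itertools.imap`/`ifilter`; the helper name `is_picklable` is hypothetical. The exception raised for an unpicklable object differs between Python versions (it is not the Python 2 `TypeError`), so the helper catches any exception.

```python
import pickle


def is_picklable(obj):
    """Return True if obj survives a pickle round trip.

    multiprocessing.Pool pickles every item it sends to a worker, so an
    item that fails here cannot be passed through Pool.map either.
    """
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:  # PicklingError, AttributeError or TypeError, depending on the object
        return False
    return True


# Python 3 spelling of the question's iterator (hypothetical port).
it = map(lambda x: filter(lambda y: x + y > 10, range(10)), range(10))

first = next(it)                  # a lazy filter object for x = 0, not a value
print(is_picklable(first))        # False: it wraps an unpicklable lambda
print(is_picklable(list(first)))  # True: materialised values pickle fine
```

Materialising each inner iterator into a list (or iterating it in the parent, as below) is what makes the items safe to send.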
So you have to "iterate your iterator":
```python
import multiprocessing
from itertools import imap, ifilter

def test(t):
    return 't = ' + str(t) # return value rather than printing

if __name__ == '__main__': # required for Windows
    mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
    it = imap(lambda x: ifilter(lambda y: x+y > 10, xrange(10)), xrange(10))
    for the_iterator in it:
        # map() consumes the_iterator in the parent, so only ints are sent to the workers
        result = mp_pool.map(test, the_iterator)
        print result
    mp_pool.close() # needed to ensure all processes terminate
    mp_pool.join() # needed to ensure all processes terminate
```
With `it` defined as in your question, the printed result is:
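```
[]
[]
['t = 9']
['t = 8', 't = 9']
['t = 7', 't = 8', 't = 9']
['t = 6', 't = 7', 't = 8', 't = 9']
['t = 5', 't = 6', 't = 7', 't = 8', 't = 9']
['t = 4', 't = 5', 't = 6', 't = 7', 't = 8', 't = 9']
['t = 3', 't = 4', 't = 5', 't = 6', 't = 7', 't = 8', 't = 9']
['t = 2', 't = 3', 't = 4', 't = 5', 't = 6', 't = 7', 't = 8', 't = 9']
```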
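For readers on Python 3, where `itertools.imap`/`ifilter` no longer exist and the built-in `map`/`filter` are lazy, the same "iterate your iterator" approach might look like the sketch below. The answer's `test` is renamed `label`, and `make_outer_iterator` is a hypothetical helper; neither name comes from the original answer.

```python
import multiprocessing


def label(t):
    # The answer's test(), renamed; it returns a value instead of printing.
    return 't = ' + str(t)


def make_outer_iterator():
    # Hypothetical helper: the question's iterator in Python 3 spelling,
    # with the lazy built-ins map/filter replacing imap/ifilter.
    return map(lambda x: filter(lambda y: x + y > 10, range(10)), range(10))


if __name__ == '__main__':  # required by the spawn and forkserver start methods
    with multiprocessing.Pool() as pool:
        for inner in make_outer_iterator():
            # Pool.map turns `inner` into a list in the parent process,
            # so only plain ints are pickled and sent to the workers.
            print(pool.map(label, inner))
```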
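But if you want to take full advantage of multiprocessing (assuming you have enough processors), you can use `map_async` so that all the jobs are submitted at once: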
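```python
import multiprocessing
from itertools import imap, ifilter

def test(t):
    return 't = ' + str(t) # return value rather than printing

if __name__ == '__main__': # required for Windows
    mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
    it = imap(lambda x: ifilter(lambda y: x+y > 10, xrange(10)), xrange(10))
    results = [mp_pool.map_async(test, the_iterator) for the_iterator in it]
    for result in results:
        print result.get()
    mp_pool.close() # needed to ensure all processes terminate
    mp_pool.join() # needed to ensure all processes terminate
```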
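A Python 3 sketch of the same submit-everything-first pattern, under the same assumptions as before (built-in `map`/`filter` instead of `imap`/`ifilter`, the answer's `test` renamed `label`, `make_outer_iterator` a hypothetical helper). Each `map_async` call returns an `AsyncResult` immediately, and `get()` blocks until that job's list is ready.

```python
import multiprocessing


def label(t):
    # The answer's test(), renamed; it returns a value instead of printing.
    return 't = ' + str(t)


def make_outer_iterator():
    # Hypothetical helper: the question's iterator in Python 3 spelling.
    return map(lambda x: filter(lambda y: x + y > 10, range(10)), range(10))


if __name__ == '__main__':  # required by the spawn and forkserver start methods
    with multiprocessing.Pool() as pool:
        # Submit every job up front; map_async lists each inner iterator
        # in the parent and returns without waiting for the workers.
        pending = [pool.map_async(label, inner) for inner in make_outer_iterator()]
        # Then block on each result, in submission order.
        for async_result in pending:
            print(async_result.get())
```

All `get()` calls finish inside the `with` block, which matters because leaving it calls `terminate()` on the pool.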
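Alternatively, you could consider `mp_pool.imap`. Unlike `mp_pool.map_async`, it does not first convert the iterable argument to a list in order to compute a suitable `chunksize` for submitting the jobs (see the documentation, which is not very clear on this point). Instead it uses a default `chunksize` of 1, which is usually undesirable for very large iterables; `imap` also accepts an explicit `chunksize` argument: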
```python
results = [mp_pool.imap(test, the_iterator) for the_iterator in it]
for result in results:
    print list(result) # to get a comparable printout as when using map_async
```
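To make the `chunksize` remark concrete: when `chunksize` is left as `None`, CPython's `Pool.map`/`map_async` split the listed iterable into roughly four chunks per worker. The function below mirrors that internal heuristic from `multiprocessing/pool.py` (an implementation detail, not a public API, and it may change); `default_map_chunksize` is a hypothetical name. If the input size is known up front, a comparable value can be passed to `imap` explicitly, as in the Python 3 usage under the main guard.

```python
import multiprocessing


def default_map_chunksize(n_items, n_workers):
    """Mirror the chunksize CPython's Pool.map/map_async pick when
    chunksize is None (internal detail of multiprocessing/pool.py)."""
    if n_items == 0:
        return 0
    # About four chunks per worker, rounded up.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def label(t):
    # The answer's test(), renamed; it returns a value instead of printing.
    return 't = ' + str(t)


if __name__ == '__main__':  # required by the spawn and forkserver start methods
    n_workers = multiprocessing.cpu_count()
    data = range(100_000)  # hypothetical large input
    chunk = default_map_chunksize(len(data), n_workers)
    with multiprocessing.Pool(n_workers) as pool:
        # imap stays lazy and yields results in input order; a larger
        # chunksize than the default 1 sends tasks to workers in batches.
        results = list(pool.imap(label, data, chunksize=chunk))
    print(chunk, results[-1])
```

The trade-off is latency versus overhead: larger chunks mean fewer round trips between processes, but the first result only arrives after a whole chunk is done.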
Update: generating the lists with multiprocessing
```python
import multiprocessing
from itertools import imap, ifilter

def test(t):
    return 't = ' + str(t) # return value rather than printing

def generate_lists(x):
    return list(ifilter(lambda y: x+y > 10, xrange(10)))

if __name__ == '__main__': # required for Windows
    mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
    lists = mp_pool.imap(generate_lists, xrange(10))
    # lists, returned by mp_pool.imap, is an iterable;
    # as each element of lists becomes available it is passed to test:
    results = mp_pool.imap(test, lists)
    # print each result as it becomes available
    for result in results:
        print result
    mp_pool.close() # needed to ensure all processes terminate
    mp_pool.join() # needed to ensure all processes terminate
```
Prints:
```
t = []
t = []
t = [9]
t = [8, 9]
t = [7, 8, 9]
t = [6, 7, 8, 9]
t = [5, 6, 7, 8, 9]
t = [4, 5, 6, 7, 8, 9]
t = [3, 4, 5, 6, 7, 8, 9]
t = [2, 3, 4, 5, 6, 7, 8, 9]
```
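A Python 3 sketch of the update, assuming the same substitutions as the earlier sketches (built-in `filter` for `ifilter`, `range` for `xrange`, the answer's `test` renamed `label`). The key point is unchanged: the lambda is only ever created inside a worker, so nothing unpicklable crosses a process boundary.

```python
import multiprocessing


def label(t):
    # The answer's test(), renamed; it returns a value instead of printing.
    return 't = ' + str(t)


def generate_lists(x):
    # Runs inside a worker: only the int x is pickled on the way in and a
    # plain list on the way out, so the lambda never leaves this process.
    return list(filter(lambda y: x + y > 10, range(10)))


if __name__ == '__main__':  # required by the spawn and forkserver start methods
    with multiprocessing.Pool() as pool:
        lists = pool.imap(generate_lists, range(10))
        # `lists` is itself a lazy iterator; the second imap hands each
        # list to label() as soon as a worker has produced it.
        for result in pool.imap(label, lists):
            print(result)
```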