目录
(1)concurrent.futures.ProcessPoolExecutor(),submit方法
(2)concurrent.futures.ProcessPoolExecutor(),map方法
concurrent.futures模块
1、基本概念
concurrent.futures模块也是一个即可实现多进程,也可用于多线程的模块。以下只介绍多进程,多线程的接口基本相同。
concurrent.futures模块的接口与multiprocessing差异比较大,要理解如何用它,需要搞清楚executor和future两个概念:
2、创建executor和future
(1)创建进程池Executor
concurrent.futures.ProcesspoolExecutor(max_workers=None) #建立进程池Executor。max_workers表示进程池的进程数量限制,缺省为None,表示与cpu数量相同。
concurrent.futures.ThreadPoolExecutor(max_workers=None) #多讲一个线程池Executor的创建。
(2)创建子进程Future
(a)直接提交单个子进程:
XXX.submit(fn,*args,**kwargs) #向进程池提交一个子进程(future对象),子进程直接开始运行,并返回该future对象。future对象对应函数fn(*args,**kwargs)。XXX为进程池Executor。
(b)map方式(同时提交多个子进程):
XXX.map(fn,*iterables,timeout=None,chunksize=1) #并发map函数,返回迭代器(与map(fn,*iterables)返回相同,只是并发执行)。注意这个并行与mulitprocessing模块中的并行也有差异,这里是同批次的子进程同步执行,直到该批次所有进程结束后,才开始下一批次执行。XXX为进程池Executor。
fn:被调函数,
iterables:个数与被调函数的参数个数相同(每个iterable对应一个参数)。
timeout:最大等待时间。缺省为None,表示无限等待。
chunksize:缺省为1,表示iterables中的元素将一次送出1个到进程池。如大于1,则一次送出chunksieze个元素到进程池。对于非常大的iterables,设置较大的chunksize,将显著加快执行速度(只对ProcesspoolExecutor有用。对ThreadPoolExecutor无用,该值忽略)。
(3)with上下文管理
如果直接使用executor和future,多进程执行完成后,最好手动释放资源:
XXX.shutdown() #所有任务完成后,清理并释放进程池Executor相关的资源。
更好的办法是采用with上下文管理:
with concurrent.futures.ProcesspoolExecutor() as executor:
res = executor.map(...)
pp模块
1、基本概念
pp模块(也即parallel python模块)是用纯Python编写的开源、跨平台、轻巧并行模块。
据了解,pp模块实现并行的方式与multiprocessing等不同,他是通过修改pyhton的GIL机制突破这个限制,而非开启多个解释器(未详细确认)。可能也是因为以上原因,pp模块调用子进程中的print函数可以正常输出到主进程标准输出,不像在multiprocessing、multiprocess、pathos.multiprocessing.Processpool、concurrent.futures等模块中还必须通过进程间通信传递给主进程操作。
要建立多进程,首先需通过Server类创建进程服务器(类似于进程池);然后,在这个服务中,创建具体的子进程任务Task。
2、建立多进程Server
pp.Server(ncpus='autodetect',ppservers=(),secret=None,restart=False,proto=2,socket_timeout=3600) #建立多进程服务器。ncpus为子进程数量限制。
ncpus:worker processes的数量。缺省为'autodetect',表示自动检测processors的数量。
ppservers:活动的parallel python execution servers的列表。
secret:用于网络连接的口令(passphrase),如果忽略,将使用默认口令。强烈建议使用自定义的口令。
restart:如为True,表示每项任务完成后,重启worker process。
proto:pickle模块的协议代码。
socket_timeout:秒数,表示远程任务执行的最大时间。如果需要运行时间较长的任务,则增大该值;如果远程ppservers经常掉线,则减小该值。
3、建立子进程任务Task
XXX.submit(func,args=(),depfuncs=(),modules=(),callback=None,callbackargs=(),group='default',globals=None) #向多进程服务器XXX提交任务,返回对应该任务的Task实例。submit方法是非阻塞的。
func: 被并行执行的函数
args: func的参数,以元组的形式传入
depfuncs: 被func调用的函数,以元组的形式传入
modules: 函数执行需要调用的模块,以元组的形式传入
callback:回调函数,其参数为(callbackargs,result),result为任务完成的返回值。
callbackargs:回调函数的额外参数。
group:任务分组,用于wait(group)语句的接口(按group阻塞等待完成)。
globals:包含所有模块、函数、类等的dict,比如globals()。
XXX(raw_result=False) #阻塞,直到获取子进程任务XXX的返回值。XXX为Task实例。
由于XXX()是阻塞的,其如果在向多进程Server添加Task过程中调用,将导致每个子进程任务阻塞,任务变为串行。因此,应在所有子进程任务添加完后,再统一调用XXX()获取子进程返回值。
实例(concurrent.futures模块)
(1)concurrent.futures.ProcesspoolExecutor(),submit方法
import concurrent.futures
import multiprocessing
import time
def f(x,conn,t0):
ans = 1
x0 = x
t = time.time() - t0
conn.send('factorial of %d: start@%.2fs' % (x0,t))
while x > 1:
ans *= x
time.sleep(0.5)
x -= 1
t = time.time() - t0
conn.send('factorial of %d: finish@%.2fs,res = %d' %(x0,t,ans))
return ans
def main():
res = []
var = (4,8,12,20,16)
p_conn,c_conn = multiprocessing.Pipe()
t0 = time.time()
with concurrent.futures.ProcesspoolExecutor() as executor:
futures = []
for i in var:
futures.append(executor.submit(f,i,c_conn,t0))
for fut in futures:
res.append(fut.result())
print('output:')
while p_conn.poll():
print(p_conn.recv())
t = time.time() - t0
print('factorial of %s@%.2fs: %s' % (var,res))
if __name__ == '__main__':
main()
结果:可以看出,第一批次4个子进程几乎同时开启;当一个子进程结束后,马上开启第5个子进程。
output:
factorial of 4: start@1.05s
factorial of 8: start@1.12s
factorial of 12: start@1.14s
factorial of 20: start@1.17s
factorial of 4: finish@2.55s,res = 24
factorial of 16: start@2.55s
factorial of 8: finish@4.62s,res = 40320
factorial of 12: finish@6.64s,res = 479001600
factorial of 16: finish@10.06s,res = 20922789888000
factorial of 20: finish@10.67s,res = 2432902008176640000
factorial of (4,16)@10.82s: [24,40320,479001600,2432902008176640000,20922789888000]
(2)concurrent.futures.ProcesspoolExecutor(),map方法
import concurrent.futures
import multiprocessing
import time
def f(x,ans))
return ans
def main():
var = (4,c_conn = multiprocessing.Pipe()
t0 = time.time()
conn_s = [c_conn] * len(var)
t0_s = [t0] * len(var)
with concurrent.futures.ProcesspoolExecutor() as executor:
res = list(executor.map(f,var,conn_s,t0_s))
print('output:')
while p_conn.poll():
print(p_conn.recv())
t = time.time() - t0
print('factorial of %s@%.2fs: %s' % (var,res))
if __name__ == '__main__':
main()
结果:可以看出,第一批次4个子进程几乎同时开启;当一个子进程结束后,马上开启第5个子进程。
output:
factorial of 4: start@1.10s
factorial of 8: start@1.11s
factorial of 12: start@1.12s
factorial of 20: start@1.15s
factorial of 4: finish@2.60s,res = 24
factorial of 16: start@2.60s
factorial of 8: finish@4.62s,res = 40320
factorial of 12: finish@6.63s,res = 479001600
factorial of 16: finish@10.12s,res = 20922789888000
factorial of 20: finish@10.65s,16)@10.83s: [24,20922789888000]
实例(pp模块)
(1)pp.Server(),submit方法
注意,multiprocessing.Pipe()或multiprocess.Pipe()产生的管道对象无法传入子进程Task(可能是pickle错误)。但是,pp模块中,子进程print函数可以直接输出到标准输出,因此也不必通过管道将信息传递到主进程了。
import pp
import time
def f(x,t0):
ans = 1
x0 = x
t = time.time() - t0
print('factorial of %d: start@%.2fs' % (x0,t))
while x > 1:
ans *= x
time.sleep(0.5)
x -= 1
t = time.time() - t0
print('factorial of %d: finish@%.2fs,16)
p = pp.Server()
t0 = time.time()
for i in var:
res.append(p.submit(f,args = (i,t0),modules = ('time',)))
print('output:')
for i in range(len(res)):
res[i] = res[i]()
t = time.time() - t0
print('factorial of %s@%.2fs: %s' % (var,res))
if __name__ == '__main__':
main()
结果:可以看出,第一批次4个子进程几乎同时开启;当一个子进程结束后,马上开启第5个子进程。
output:
factorial of 4: start@0.13s
factorial of 4: finish@1.63s,res = 24
factorial of 8: start@0.17s
factorial of 8: finish@3.67s,res = 40320
factorial of 12: start@0.17s
factorial of 12: finish@5.67s,res = 479001600
factorial of 20: start@0.20s
factorial of 20: finish@9.70s,res = 2432902008176640000
factorial of 16: start@1.63s
factorial of 16: finish@9.14s,res = 20922789888000
factorial of (4,16)@9.75s: [24,20922789888000]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。