
Python parallel computing: an introduction to the pathos module

Contents

The pathos module

1. pathos's own multiprocessing methods (pathos.multiprocessing.ProcessPool, pathos.multiprocessing.ProcessingPool, pathos.pools.ProcessPool)

2. Multiprocessing via the multiprocess module (pathos.multiprocessing.Pool)

3. Multiprocessing via the pp module, method 1 (pathos.pools.ParallelPool, pathos.pp.ParallelPool, pathos.pp.ParallelPythonPool, pathos.parallel.ParallelPythonPool, pathos.parallel.ParallelPool)

4. Multiprocessing via the pp module, method 2 (the pathos.pp.pp module)

5. Mapping onto Python's built-in map function (pathos.serial.SerialPool, pathos.pools.SerialPool)

Examples (pathos module)

(1) pathos.multiprocessing.ProcessPool(), pipe method

(2) pathos.multiprocessing.ProcessPool(), apipe method

(3) pathos.multiprocessing.ProcessPool(), map method

(4) pathos.multiprocessing.ProcessPool(), imap method

(5) pathos.multiprocessing.ProcessPool(), uimap method

(6) pathos.multiprocessing.ProcessPool(), amap method

(7) pathos.pp.ParallelPool(), pipe method

(8) pathos.pp.ParallelPool(), apipe method

(9) pathos.pp.ParallelPool(), map method

(10) pathos.pp.ParallelPool(), amap method

(11) pathos.pp.ParallelPool(), imap method

(12) pathos.pp.ParallelPool(), uimap method


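The pathos module

pathos is a fairly comprehensive module that supports both multiprocessing and multithreading, mainly through process pools and thread pools.

pathos has its own set of process-pool methods and also integrates the process-pool methods of the multiprocess and pp modules.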

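1. pathos's own multiprocessing methods (pathos.multiprocessing.ProcessPool, pathos.multiprocessing.ProcessingPool, pathos.pools.ProcessPool)

(1) Creating a process pool

pathos.multiprocessing.ProcessPool(*args,**kwds) # Creates a pathos process pool (a pathos.multiprocessing.ProcessPool instance).

pathos.multiprocessing.ProcessingPool(*args,**kwds) # Same as above.

pathos.pools.ProcessPool(*args,**kwds) # Same as above.

nodes: number of workers. If nodes is not given, the number of processors (i.e. ncpus) is detected automatically.
ncpus: number of worker processors.
servers: list of worker servers.
scheduler: the corresponding scheduler.
workdir: the $workdir used for scratch calculations/files.
scatter: if True, scatter-gather is used (the default is worker-pool).
source: if False, TemporaryFiles are used as little as possible.
timeout: time to wait for a return value from the scheduler.

Several methods are common to all of these pools:

XXX.close() # Closes the pool. After closing, no new tasks can be added to the pool; join() can then be called to wait for the existing child processes to finish. XXX is the pool.

XXX.join() # Waits for the child processes in the pool to finish. Must be called after close(). XXX is the pool.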

def f(a,b = value):
    pass

pool = pathos.multiprocessing.ProcessPool()
pool.map(f,a_seq,b_seq) # runs f(a_seq[i],b_seq[i]) for each i
pool.close()
pool.join()

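(2) Submitting tasks to child processes

(a) A single task can be submitted with the pipe methods:

XXX.pipe(f,*args,**kwds) # Submits one task in blocking (non-parallel) mode and blocks until the result is returned. XXX is a pool instance.

XXX.apipe(f,*args,**kwds) # Submits one task asynchronously (in parallel) to the queue and returns an ApplyResult instance. Its get method returns the task's return value, but get blocks, so it should be called only after all tasks have been submitted. XXX is a pool instance.

f(*args,**kwds) is the task run by the child process.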
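
The difference between blocking and asynchronous submission can be sketched with the standard library alone. The block below is a stand-in under stated assumptions, not pathos code: it uses multiprocessing.pool.ThreadPool (threads rather than processes), whose apply and apply_async play roles analogous to pipe and apipe, and slow_square is a hypothetical task invented for the illustration.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x, delay=0.2):
    """Hypothetical task: sleep for a while, then return x squared."""
    time.sleep(delay)
    return x * x


def run_blocking(pool, values):
    """Submit tasks one at a time; each call returns only when its task is done."""
    start = time.perf_counter()
    results = [pool.apply(slow_square, (v,)) for v in values]
    return results, time.perf_counter() - start


def run_async(pool, values):
    """Queue every task first, then collect the results with the blocking get()."""
    start = time.perf_counter()
    handles = [pool.apply_async(slow_square, (v,)) for v in values]
    results = [h.get() for h in handles]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    with ThreadPool(3) as pool:
        print(run_blocking(pool, [1, 2, 3]))  # tasks run back to back
        print(run_async(pool, [1, 2, 3]))     # tasks overlap in time
```

Calling get() inside the submission loop would turn the asynchronous version back into the blocking one, which is why the results are collected only after every task has been queued.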

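(b) If the tasks return values that need to be processed together, the map methods are recommended (the task function may take several arguments):

XXX.map(f,*args,**kwds) # Runs a batch of tasks, blocks until all of them finish, and returns a list of results in input order. The task run for element i is f(iterable1[i],iterable2[i],...). XXX is a pool instance.

XXX.amap(f,*args,**kwds) # Asynchronous (parallel) version of XXX.map(); returns a MapResult instance whose get() method returns the list of results. XXX is a pool instance.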

def f(a,b): # the map methods accept several arguments
    pass

pool = pathos.multiprocessing.ProcessPool()
result = pool.amap(f,(a0,a1,...),(b0,b1,...)).get()
pool.close()
pool.join()

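(c) If memory is limited, the imap iterator methods can be used instead:

XXX.imap(f,*args,**kwds) # Non-blocking, ordered iterator version of XXX.map(); returns an iterator. XXX is a pool instance.

XXX.uimap(f,*args,**kwds) # Unordered version of XXX.imap(): results come back in completion order rather than call order. Returns an iterator. XXX is a pool instance.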

def f(a,b):
    pass

pool = pathos.multiprocessing.ProcessPool()
result = pool.uimap(f,a_seq,b_seq)
pool.close()
pool.join()

for item in result: # items arrive in completion order
    pass
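
Ordered versus unordered iteration can be illustrated without pathos. This is a minimal sketch, assuming the standard library's multiprocessing.pool.ThreadPool as a thread-based stand-in: its imap and imap_unordered behave analogously to imap and uimap above, and slow_identity is a hypothetical task whose run time grows with its input.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_identity(x):
    """Hypothetical task: larger inputs take longer to finish."""
    time.sleep(0.05 * x)
    return x


def ordered_and_unordered(values):
    """Return (results in submission order, results in completion order)."""
    with ThreadPool(len(values)) as pool:
        ordered = list(pool.imap(slow_identity, values))
        unordered = list(pool.imap_unordered(slow_identity, values))
    return ordered, unordered


if __name__ == "__main__":
    ordered, unordered = ordered_and_unordered([4, 1, 3, 2])
    print(ordered)    # same order as the input
    print(unordered)  # completion order; depends on timing
```

The unordered iterator is useful when each result can be handled as soon as it is ready; the ordered one is the safer choice when results must line up with the inputs.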

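2. Multiprocessing via the multiprocess module (pathos.multiprocessing.Pool)

(1) Creating a process pool

pathos.multiprocessing.Pool(processes=None,initializer=None,initargs=(),maxtasksperchild=None,context=None) # Creates a multiprocess process pool.

processes: number of worker processes to use. If processes is None, the number returned by os.cpu_count() is used.
initializer: if initializer is not None, each worker process calls initializer(*initargs) when it starts.
maxtasksperchild: number of tasks a worker process can complete before it exits and is replaced by a fresh worker process, so that unused resources are released. The default is None, which means worker processes live as long as the pool.
context: specifies the context used to start the worker processes. A pool is usually created with multiprocess.Pool() or with the Pool() method of a context object; both set context appropriately.

(2) Submitting tasks to child processes

The task-submission methods of this pool are exactly the same as those of multiprocess.Pool() (that is, of multiprocessing.Pool()).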
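
Since the interface is described as identical to multiprocessing.Pool(), the calls can be sketched with the standard library. The block below is an illustration only, assuming the same calls carry over unchanged to pathos.multiprocessing.Pool; the helper name factorials and the parameter values (processes=2, maxtasksperchild=10) are arbitrary choices for the example.

```python
import math
import multiprocessing


def factorials(values, processes=2):
    """Compute factorials in a small process pool and return three views of the result."""
    # maxtasksperchild=10: each worker is replaced after completing 10 tasks.
    with multiprocessing.Pool(processes=processes, maxtasksperchild=10) as pool:
        mapped = pool.map(math.factorial, values)              # blocking, ordered
        single = pool.apply_async(math.factorial, (5,)).get()  # one asynchronous task
        unordered = sorted(pool.imap_unordered(math.factorial, values))
    return mapped, single, unordered


if __name__ == "__main__":
    print(factorials([4, 8, 12]))
```

Note that this map takes a single iterable, unlike the multi-sequence map of pathos's own pools described in section 1.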

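3. Multiprocessing via the pp module, method 1 (pathos.pools.ParallelPool, pathos.pp.ParallelPool, pathos.pp.ParallelPythonPool, pathos.parallel.ParallelPythonPool, pathos.parallel.ParallelPool)

(1) Creating a process pool

pathos.pp.ParallelPool(*args,**kwds) # Creates a process pool that maps onto the pp module and returns a pathos.parallel.ParallelPool instance. Note that the methods of this pool are completely different from those of the pp module.

pathos.pp.ParallelPythonPool(*args,**kwds) # Equivalent to pathos.pp.ParallelPool().

pathos.pools.ParallelPool(*args,**kwds) # Equivalent to pathos.pp.ParallelPool().

pathos.parallel.ParallelPool(*args,**kwds) # Equivalent to pathos.pp.ParallelPool().

pathos.parallel.ParallelPythonPool(*args,**kwds) # Equivalent to pathos.pp.ParallelPool().

nodes: number of workers. If nodes is not given, the number of processors (i.e. ncpus) is detected automatically.
ncpus: number of worker processors.
servers: list of worker servers.
scheduler: the corresponding scheduler.
workdir: the $workdir used for scratch calculations/files.
scatter: if True, scatter-gather is used (the default is worker-pool).
source: if False, TemporaryFiles are used as little as possible.
timeout: time to wait for a return value from the scheduler.

(2) Submitting tasks to child processes

The task-submission methods of this pool are exactly the same as those of pathos.multiprocessing.ProcessPool() (and completely different from those of the pp module).

Note that pipe objects created by multiprocessing.Pipe() or multiprocess.Pipe() cannot be passed into the child processes (possibly because of a pickle error). However, in a ParallelPool the print function in a child process writes directly to standard output, so there is no need to send messages to the main process through a pipe. That said, the formatting of output printed by child processes is often garbled, so it is still better to return the messages and print them in the main process.

The amap method is a special case. With amap, a print statement in the child process causes a wrong return value: a tuple containing only the last child process's return value rather than the complete list of all return values. The cause is not yet clear. With amap, therefore, anything a child process needs to output must be passed back through the return value and printed in the main process.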

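4. Multiprocessing via the pp module, method 2 (the pathos.pp.pp module)

This is essentially the pp module itself.

5. Mapping onto Python's built-in map function (pathos.serial.SerialPool, pathos.pools.SerialPool)

These methods actually run serially (not in parallel), so they are not covered in detail.

A pool created with SerialPool can in practice use only the pipe, map, and imap methods (all of which block); apipe, amap, and uimap are not available.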
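
Because this pool maps onto the built-in map, the multi-argument calling convention can be shown without pathos at all. A minimal sketch (add, a_seq and b_seq are hypothetical names): result i is the function applied to element i of every input sequence, which is the same convention as the multi-sequence map calls shown earlier.

```python
def add(a, b):
    """Hypothetical two-argument task."""
    return a + b


a_seq = [1, 2, 3]
b_seq = [10, 20, 30]

# Built-in map pairs up the i-th elements of each sequence and runs serially.
results = list(map(add, a_seq, b_seq))
print(results)  # [11, 22, 33]
```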

Examples (pathos module)

(1) pathos.multiprocessing.ProcessPool(), pipe method

import pathos
import multiprocess
import time

def f(x,conn,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs,res = %d' %(x0,t,ans))
    return ans

def main():
    res = []
    var = (4,8,12,20,16)
    p = pathos.multiprocessing.ProcessPool()
    p_conn,c_conn = multiprocess.Pipe()
    t0 = time.time()
    for i in var:
        res.append(p.pipe(f,i,c_conn,t0))

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, all child processes run one after another.

 output:
factorial of 4: start@1.11s
factorial of 4: finish@2.61s,res = 24
factorial of 8: start@2.61s
factorial of 8: finish@6.12s,res = 40320
factorial of 12: start@6.12s
factorial of 12: finish@11.62s,res = 479001600
factorial of 20: start@11.63s
factorial of 20: finish@21.13s,res = 2432902008176640000
factorial of 16: start@21.15s
factorial of 16: finish@28.65s,res = 20922789888000
factorial of (4, 8, 12, 20, 16)@28.73s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

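(2) pathos.multiprocessing.ProcessPool(), apipe method

import pathos
import multiprocess
import time

def f(x,conn,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs,res = %d' %(x0,t,ans))
    return ans

def main():
    res = []
    var = (4,8,12,20,16)
    p = pathos.multiprocessing.ProcessPool()
    p_conn,c_conn = multiprocess.Pipe()
    t0 = time.time()
    for i in var:
        res.append(p.apipe(f,i,c_conn,t0))
    for i in range(len(res)):
        res[i] = res[i].get()

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: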

output:
factorial of 4: start@1.10s
factorial of 8: start@1.18s
factorial of 12: start@1.19s
factorial of 20: start@1.25s
factorial of 4: finish@2.60s,res = 24
factorial of 16: start@2.61s
factorial of 8: finish@4.69s,res = 40320
factorial of 12: finish@6.69s,res = 479001600
factorial of 16: finish@10.11s,res = 20922789888000
factorial of 20: finish@10.75s,res = 2432902008176640000
factorial of (4, 8, 12, 20, 16)@10.85s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

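(3) pathos.multiprocessing.ProcessPool(), map method

Note: passing a connection created by multiprocessing.Pipe() to the child processes as an argument fails with a pickle error; creating the connection with multiprocess.Pipe() instead solves the problem.

import pathos
import multiprocess
import time

def f(x,conn,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs,res = %d' %(x0,t,ans))
    return ans

def main():
    var = (4,8,12,20,16)
    p = pathos.multiprocessing.ProcessPool()
    p_conn,c_conn = multiprocess.Pipe()
    t0 = time.time()
    conn_s = [c_conn] * len(var)
    t0_s = [t0] * len(var)
    res = p.map(f,var,conn_s,t0_s)

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, the first batch of 4 child processes starts almost simultaneously; as soon as one of them finishes, the 5th starts.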

output:
factorial of 4: start@1.15s
factorial of 8: start@1.15s
factorial of 12: start@1.19s
factorial of 20: start@1.26s
factorial of 4: finish@2.65s,res = 24
factorial of 16: start@2.65s
factorial of 8: finish@4.66s,res = 40320
factorial of 12: finish@6.70s,res = 479001600
factorial of 16: finish@10.15s,res = 20922789888000
factorial of 20: finish@10.76s,res = 2432902008176640000
factorial of (4, 8, 12, 20, 16)@10.91s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

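(4) pathos.multiprocessing.ProcessPool(), imap method

import pathos
import multiprocess
import time

def f(x,conn,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs,res = %d' %(x0,t,ans))
    return ans

def main():
    var = (4,8,12,20,16)
    p = pathos.multiprocessing.ProcessPool()
    p_conn,c_conn = multiprocess.Pipe()
    t0 = time.time()
    conn_s = [c_conn] * len(var)
    t0_s = [t0] * len(var)
    res = list(p.imap(f,var,conn_s,t0_s))

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, the first batch of 4 child processes starts almost simultaneously; as soon as one of them finishes, the 5th starts.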

output:
factorial of 4: start@1.27s
factorial of 8: start@1.29s
factorial of 12: start@1.30s
factorial of 20: start@1.38s
factorial of 4: finish@2.77s,res = 24
factorial of 16: start@2.77s
factorial of 8: finish@4.79s,res = 40320
factorial of 12: finish@6.81s,res = 479001600
factorial of 16: finish@10.27s,res = 20922789888000
factorial of 20: finish@10.89s,res = 2432902008176640000
factorial of (4, 8, 12, 20, 16)@11.01s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

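(5) pathos.multiprocessing.ProcessPool(), uimap method

import pathos
import multiprocess
import time

# f(x,conn,t0) is the same function as in example (1); define it here before running

def main():
    var = (4,8,12,20,16)
    p = pathos.multiprocessing.ProcessPool()
    p_conn,c_conn = multiprocess.Pipe()
    t0 = time.time()
    conn_s = [c_conn] * len(var)
    t0_s = [t0] * len(var)
    res = list(p.uimap(f,var,conn_s,t0_s))

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, the first batch of 4 child processes starts almost simultaneously; as soon as one of them finishes, the 5th starts. In addition, the return value of the 5th process comes before that of the 4th.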

output:
factorial of 4: start@1.03s
factorial of 8: start@1.08s
factorial of 12: start@1.10s
factorial of 20: start@1.15s
factorial of 4: finish@2.53s,res = 24
factorial of 16: start@2.53s
factorial of 8: finish@4.58s,res = 40320
factorial of 12: finish@6.60s,res = 479001600
factorial of 16: finish@10.03s,res = 20922789888000
factorial of 20: finish@10.66s,res = 2432902008176640000
factorial of (4, 8, 12, 20, 16)@10.78s: [24, 40320, 479001600, 20922789888000, 2432902008176640000]

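(6) pathos.multiprocessing.ProcessPool(), amap method

import pathos
import multiprocess
import time

def f(x,conn,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs,res = %d' %(x0,t,ans))
    return ans

def main():
    var = (4,8,12,20,16)
    p = pathos.multiprocessing.ProcessPool()
    p_conn,c_conn = multiprocess.Pipe()
    t0 = time.time()
    conn_s = [c_conn] * len(var)
    t0_s = [t0] * len(var)
    res = p.amap(f,var,conn_s,t0_s).get()

    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, the first batch of 4 child processes starts almost simultaneously; as soon as one of them finishes, the 5th starts.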

output:
factorial of 4: start@1.04s
factorial of 8: start@1.07s
factorial of 12: start@1.12s
factorial of 20: start@1.13s
factorial of 4: finish@2.54s,res = 24
factorial of 16: start@2.54s
factorial of 8: finish@4.58s,res = 40320
factorial of 12: finish@6.62s,res = 479001600
factorial of 16: finish@10.04s,res = 20922789888000
factorial of 20: finish@10.64s,res = 2432902008176640000
factorial of (4, 8, 12, 20, 16)@10.76s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

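(7) pathos.pp.ParallelPool(), pipe method

Note that pipe objects created by multiprocessing.Pipe() or multiprocess.Pipe() cannot be passed into the child processes (possibly because of a pickle error). However, in a pathos.pp.ParallelPool() pool the print function in a child process writes directly to standard output, so there is no need to send messages to the main process through a pipe.

import pathos
import time

def f(x,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    print('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    print('factorial of %d: finish@%.2fs,res = %d' % (x0,t,ans))
    return ans

def main():
    res = []
    var = (4,8,12,20,16)
    p = pathos.pp.ParallelPool()
    t0 = time.time()
    for i in var:
        res.append(p.pipe(f,i,t0))

    print('output:')
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, all child processes run one after another.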

factorial of 4: start@0.12s
factorial of 4: finish@1.62s,res = 24
factorial of 8: start@1.80s
factorial of 8: finish@5.30s,res = 40320
factorial of 12: start@5.46s
factorial of 12: finish@10.96s,res = 479001600
factorial of 20: start@11.16s
factorial of 20: finish@20.66s,res = 2432902008176640000
factorial of 16: start@20.94s
factorial of 16: finish@28.44s,res = 20922789888000
output:
factorial of (4, 8, 12, 20, 16)@28.67s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

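(8) pathos.pp.ParallelPool(), apipe method

import pathos
import time

def f(x,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    print('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    print('factorial of %d: finish@%.2fs,res = %d' % (x0,t,ans))
    return ans

def main():
    res = []
    var = (4,8,12,20,16)
    p = pathos.pp.ParallelPool()
    t0 = time.time()
    for i in var:
        res.append(p.apipe(f,i,t0))

    print('output:')
    for i in range(len(res)):
        res[i] = res[i].get()
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, the first batch of 4 child processes starts almost simultaneously; as soon as one of them finishes, the 5th starts.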

output:
factorial of 4: start@0.20s
factorial of 4: finish@1.70s,res = 24
 factorial of 8: start@0.21s
factorial of 8: finish@3.71s,res = 40320
 factorial of 12: start@0.13s
factorial of 12: finish@5.63s,res = 479001600
 factorial of 20: start@0.18s
factorial of 20: finish@9.68s,res = 2432902008176640000
factorial of 16: start@1.70s
factorial of 16: finish@9.20s,res = 20922789888000
 factorial of (4, 8, 12, 20, 16)@9.72s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

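(9) pathos.pp.ParallelPool(), map method

import pathos
import time

def f(x,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    print('factorial of %d: start@%.2fs' % (x0,t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    print('factorial of %d: finish@%.2fs,res = %d' % (x0,t,ans))
    return ans

def main():
    var = (4,8,12,20,16)
    p = pathos.pp.ParallelPool()
    t0 = time.time()
    res = p.map(f,var,[t0] * 5)

    print('output:')
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, all child processes run one after another.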

factorial of 4: start@0.14s
factorial of 4: finish@1.64s,res = 24
factorial of 8: start@1.74s
factorial of 8: finish@5.24s,res = 40320
factorial of 12: start@5.35s
factorial of 12: finish@10.85s,res = 479001600
factorial of 20: start@11.01s
factorial of 20: finish@20.51s,res = 2432902008176640000
factorial of 16: start@20.66s
factorial of 16: finish@28.16s,res = 20922789888000
 output:
factorial of (4, 8, 12, 20, 16)@28.51s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

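(10) pathos.pp.ParallelPool(), amap method

Note: with amap, a print statement in the child process causes the return value to be a tuple containing only the last child process's return value rather than the complete list of all return values. The cause is not yet clear. With amap, therefore, anything a child process needs to output must be passed back through the return value and printed in the main process.

import pathos
import time

def f(x,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    msg1 = 'factorial of %d: start@%.2fs' % (x0,t)
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    msg2 = 'factorial of %d: finish@%.2fs,res = %d' % (x0,t,ans)
    return (ans,msg1,msg2) # messages travel back with the result

def main():
    var = (4,8,12,20,16)
    p = pathos.pp.ParallelPool()
    t0 = time.time()
    ret = p.amap(f,var,[t0] * 5).get()
    res = [item[0] for item in ret]

    print('output:')
    for item in ret:
        print(item[1])
        print(item[2])
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, the first batch of 4 child processes starts almost simultaneously; as soon as one of them finishes, the 5th starts.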

output:
factorial of 4: start@0.16s
factorial of 4: finish@1.66s,res = 24
factorial of 8: start@0.18s
factorial of 8: finish@3.68s,res = 40320
factorial of 12: start@0.19s
factorial of 12: finish@5.69s,res = 479001600
factorial of 20: start@0.14s
factorial of 20: finish@9.64s,res = 2432902008176640000
factorial of 16: start@1.66s
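factorial of 16: finish@9.16s,res = 20922789888000
factorial of (4, 8, 12, 20, 16)@…s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]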

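(11) pathos.pp.ParallelPool(), imap method

import pathos
import time

def f(x,t0):
    ans = 1
    x0 = x
    t = time.time() - t0
    msg1 = 'factorial of %d: start@%.2fs' % (x0,t)
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    msg2 = 'factorial of %d: finish@%.2fs,res = %d' % (x0,t,ans)
    return (ans,msg1,msg2)

def main():
    var = (4,8,12,20,16)
    p = pathos.pp.ParallelPool()
    t0 = time.time()
    ret = list(p.imap(f,var,[t0] * 5))
    res = [item[0] for item in ret]

    print('output:')
    for item in ret:
        print(item[1])
        print(item[2])
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, all child processes run one after another.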

output:
factorial of 4: start@0.17s
factorial of 4: finish@1.67s,res = 24
factorial of 8: start@1.67s
factorial of 8: finish@5.17s,res = 40320
factorial of 12: start@5.17s
factorial of 12: finish@10.67s,res = 479001600
factorial of 20: start@10.67s
factorial of 20: finish@20.17s,res = 2432902008176640000
factorial of 16: start@20.17s
factorial of 16: finish@27.67s,res = 20922789888000
factorial of (4, 8, 12, 20, 16)@28.41s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]

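(12) pathos.pp.ParallelPool(), uimap method

import pathos
import time

# f(x,t0) is the same function as in example (10); it returns (ans,msg1,msg2)

def main():
    var = (4,8,12,20,16)
    p = pathos.pp.ParallelPool()
    t0 = time.time()
    ret = list(p.uimap(f,var,[t0] * 5))
    res = [item[0] for item in ret]

    print('output:')
    for item in ret:
        print(item[1])
        print(item[2])
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var,t,res))

if __name__ == '__main__':
    main()

Result: as can be seen, the first batch of 4 child processes starts almost simultaneously; as soon as one of them finishes, the 5th starts. In addition, the return value of the 5th process comes before that of the 4th.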

output:
factorial of 4: start@0.26s
factorial of 4: finish@1.76s,res = 24
factorial of 8: start@0.29s
factorial of 8: finish@3.79s,res = 40320
factorial of 12: start@0.25s
factorial of 12: finish@5.75s,res = 479001600
factorial of 16: start@1.77s
factorial of 16: finish@9.28s,res = 20922789888000
factorial of 20: start@0.31s
factorial of 20: finish@9.81s,res = 2432902008176640000
factorial of (4, 8, 12, 20, 16)@10.24s: [24, 40320, 479001600, 20922789888000, 2432902008176640000]
