目录
代码示例3:子进程间通信Queue&JoinableQueue
进程同步之:Queue和JoinableQueue
multiprocessing.Queue类似于queue.Queue,一般用来多个进程间交互信息。Queue是进程和线程安全的。它实现了queue.Queue的大部分方法,但task_done()和join()没有实现。
multiprocessing.JoinableQueue是multiprocessing.Queue的子类,增加了task_done()方法和join()方法。
task_done():一般在调用get()时获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
join():阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。
代码示例1:Queue
#encoding=utf-8
from multiprocessing import Process,Queue
def offer(queue):
# 入队列
if queue.empty():
queue.put("Hello World")
else:
print(queue.get())
if __name__ == '__main__':
# 创建一个队列实例
q = Queue()
p = Process(target = offer,args = (q,))
p.start()
print(q.get()) # 主进程执行出队列操作
q.put("哈哈哈哈哈")# 主进程执行入队列操作
m = Process(target = offer,)) # 子进程执行入队列操作
m.start()
m.join()
p.join()
代码示例2:两个子进程间通信
#encoding=utf-8
from multiprocessing import Process,Queue
import os,time,random
# 写数据进程执行的代码:
def write(q):
for value in ['A','B','C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码
def read(q):
time.sleep(1)
while not q.empty():
# if not q.empty():
print('Get %s from queue.' % q.get(True))
time.sleep(1) # 目的是等待写队列完成
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程
q = Queue()
pw = Process(target = write,))
pr = Process(target = read,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
pr.join()
print("Done!")
代码示例3:子进程间通信Queue&JoinableQueue
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):
# 派生进程
def __init__(self,task_queue,result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue#任务队列
self.result_queue = result_queue#结果队列
# 重写原进程的run方法
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:#为None,进程退出
# Poison pill means shutdown
print(('%s: Exiting' % proc_name))
self.task_queue.task_done()
break
print(('%s: %s' % (proc_name,next_task)))
answer = next_task() # __call__()
self.task_queue.task_done()#执行完get之后调用task_done()来通知Queue当前task完成
self.result_queue.put(answer)
return
class Task(object):
def __init__(self,a,b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a,self.b,self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a,self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count()
print(('Creating %d consumers' % num_consumers))
# 创建cup核数量个的子进程
consumers = [ Consumer(tasks,results) for i in range(num_consumers) ]
# 依次启动子进程
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i,i))
for i in range(num_consumers):
tasks.put(None)#有几个进程加几个None,以此结束死循环
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print ('Result: %s' %result)
num_jobs -= 1
一个任务
执行完任务之后,都要调用一次self.task_queue.task_done()
表示这个任务完成)
2 结果队列,类型:Queue,存储任务的计算结果(10个数)
3 获取你机器的cpu个数
4 实例化10个Task类的实例,作为任务,放到任务队列
5 启动cpu个数的进程,依次start启动
6 多进程开始干这10个任务,每个任务的计算结果计入结果队列
7 任务队列名.join()这句话等待所有任务执行完毕
8 遍历结果队列,打印所有的结果
进程同步之:Lock
锁是为了确保数据一致性。比如读写锁,每个进程给一个变量增加 1 ,但是如果在一个进程读取但还没有写入的时候,另外的进程也同时读取了,并写入该值,则最后写入的值是错误的,这时候就需要加锁来保持数据一致性。
>#encoding=utf-8
from multiprocessing import Process,Lock
import time
def l(num,lock):
lock.acquire() # 获得锁
time.sleep(0.2)
print("Hello Num: %s" % (num))
lock.release() # 释放锁
if __name__ == '__main__':
lock = Lock() # 创建一个共享锁实例
for num in range(10):
Process(target = l,args = (num,lock)).start()
进程同步之:Semaphore(多把锁)
Semaphore用于控制对共享资源的访问数量。Semaphore锁和Lock稍有不同,Semaphore相当于N把锁,获取其中一把就可以执行。可用锁的总数N在创建实例时传入,比如s = Semaphore(n)。与Lock一样,如果可用锁为0,进程将会阻塞,直到可用锁大于0。
#encoding=utf-8
import multiprocessing
import time
def worker(s,i):
s.acquire()
print(multiprocessing.current_process().name + " acquire")
time.sleep(i)
print(multiprocessing.current_process().name + " release")
s.release()
if __name__ == "__main__":
# 设置限制最多3个进程同时访问共享资源
s = multiprocessing.Semaphore(3)
for i in range(5):
p = multiprocessing.Process(target = worker,args = (s,i * 2))
p.start()
进程同步之:event
Event提供一种简单的方法,可以在进程间传递状态信息,实现进程间同步通信。事
件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。
#encoding=utf-8
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait() # 等待收到能执行信号,如果一直未收到将一直阻塞
print('wait_for_event: e.is_set()->',e.is_set())
def wait_for_event_timeout(e,t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)# 等待t秒超时,此时Event的状态仍未未设置,继续执行
print('wait_for_event_timeout: e.is_set()->',e.is_set())
e.set()# 初始内部标志为真
if __name__ == '__main__':
e = multiprocessing.Event()
print("begin,e.is_set()",e.is_set())
w1 = multiprocessing.Process(name='block',target=wait_for_event,args=(e,))
w1.start()
w2 = multiprocessing.Process(name='block',))
w2.start()
#可将2改为5,看看执行结果
w3 = multiprocessing.Process(name='nonblock',target=wait_for_event_timeout,2))
w3.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
# e.set() #可注释此句话看效果
print('main: event is set')
两个等待的进程都继续执行
进程同步之:pipe(管道)
Pipe是两个进程间通信的工具。Pipe可以是单向(half-duplex),也可以是双向(duplex)。通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从Pipe一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
Pipe的每个端口同时最多一个进程读写,否则会出现各种问题,可能造成corruption异常。Pipe对象建立的时候,返回一个含有两个元素的元组对象,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。
#encoding=utf-8
import multiprocessing as mp
def proc_1(pipe):
pipe.send('hello')
print('proc_1 received: %s' %pipe.recv())#pipe.recv()会死等
pipe.send("what is your name?")
print('proc_1 received: %s' %pipe.recv())
def proc_2(pipe):
print('proc_2 received: %s' %pipe.recv())#pipe.recv()会死等
pipe.send('hello,too')
print('proc_2 received: %s' %pipe.recv())
pipe.send("I don't tell you!")
if __name__ == '__main__':
# 创建一个管道对象pipe
pipe = mp.Pipe()
print(len(pipe))
print(type(pipe))
# 将第一个pipe对象传给进程1
p1 = mp.Process(target = proc_1,args = (pipe[0],))
# 将第二个pipe对象传给进程2
p2 = mp.Process(target = proc_2,args = (pipe[1],))
p2.start()
p1.start()
p2.join()
p1.join()
进程同步之:Condition
一个condition变量总是与某些类型的锁相联系,当几个condition变量必须共享同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。
condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和release() 会调用与锁相关联的相应方法。
wait()方法会释放锁,当另外一个进程使用notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,Condition类实现了一个conditon变量。这个condition变量允许一个或多个进程等待,直到他们被另一个进程通知。如果lock参数,被给定一个非空的值,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。
wait(timeout=None) :等待通知,或者等到设定的超时时间。当调用wait()方法时,如果调用它的进程没有得到锁,那么会抛出一个RuntimeError异常。 wait()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞。如果有等待的进程,notify()方法会唤醒一个在等待conditon变量的进程。notify_all() 则会唤醒所有在等待conditon变量的进程。
注意:
notify()和notify_all()不会释放锁,也就是说,进程被唤醒后不会立刻返回他们的wait() 调用。除非进程调用notify()和notify_all()之后放弃了锁的所有权。在典型的设计风格里,利用condition变量加锁去允许访问一些共享状态,进程在获取到它想得到的状态前,会反复调用wait()。修改状态的进程在他们状态改变时调用 notify() or notify_all(),用这种方式,进程会尽可能的获取到想要的一个等待者状态。
#encoding=utf-8
import multiprocessing as mp
import threading
import time
def consumer(cond):
with cond:
print("consumer before wait")
cond.wait() # 等待消费
print("consumer after wait")
def producer(cond):
with cond:
print("producer before notifyAll")
cond.notify_all() # 通知消费者可以消费了
## cond.notify() # 通知1个人,没有参数
print("producer after notifyAll")
if __name__ == '__main__':
condition = mp.Condition()
p1 = mp.Process(name = "p1",target = consumer,args=(condition,))
p2 = mp.Process(name = "p2",))
p3 = mp.Process(name = "p3",target = producer,))
p1.start()
time.sleep(2)
p2.start()
time.sleep(2)
p3.start()
进程同步之:共享变量(数字/字符串/列表/字典/实例对象)
程序运行中生成的变量是放在进程的数据区中,每个进程都是独立的地址空间,所以用一般的方法是不能共享变量的,multiprocessing模块提供了Array/Manager/Value类,借助以上类能够实现进程间共享数字变量/字符串变量/列表/字典/实例对象。
- Value类
直接类似Value(‘d’,0.0)即可
构造方法:multiprocessing.Value(typecode_or_type,*args[,lock])。
函数作用:返回从共享内存中分配的一个ctypes 对象,其中typecode_or_type定义了返回的类型。它要么是一个ctypes类型,要么是一个代表ctypes类型的code。比如c_bool和’b’是同样的,因为’b’是c_bool的code。
参数说明:
ctypes是Python的一个外部函数库,它提供了和C语言兼任的数据类型,可以调用DLLs或者共享库的函数,能被用作在python中包裹这些库。对于共享整数或者单个字符,初始化比较简单,参照下图映射关系即可:*args是传递给ctypes的构造参数
-
Array类:
构造方法:Array(typecode_or_type,size_or_initializer,*,lock=True)
函数作用:返回从共享内存中分配的一个数据内容为ctypes 类型的数组,ctypes 类型参见上表
参数说明:size_or_initializer可以是一个初始化好的列表也可以是列表的大小,如果是列表大小的话,默认值和c中对应的数据类型是一致的。 -
Manager类:
Manager对象的使用方法同multiprocessing中的类似,共享的字符串只能用manager.Value()来实现,因为multiprocessing.Value()的参数不支持字符串
代码示例1:未使用共享变量
#encoding=utf-8 from multiprocessing import Process def f(n,a): n = 3.1415927 for i in range(len(a)): a[i] = -a[i] #print(a[i]) if __name__ == '__main__': num = 0 # arr = list(range(10)) p = Process(target = f,arr)) p.start() p.join() print(num) print(arr[:])
代码示例2:共享数字变量
#encoding=utf-8 from multiprocessing import Process,Value,Array def f(n,a): n.value = n.value+1 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d',0.0) # 创建一个进程间共享的数字类型,默认值为0 arr = Array('i',range(10)) # 创建一个进程间共享的数组类型,初始值为range[10] #arr = Array('i',10) # 创建一个进程间共享的数组类型,初始大小为10 p = Process(target = f,arr)) p.start() p.join() p = Process(target = f,arr)) p.start() p.join() print(num.value) # 获取共享变量num的值 print(arr[:])
代码示例3:共享变量+锁
#encoding=utf-8 import time from multiprocessing import Process,Lock class Counter(object): def __init__(self,initval = 0): self.val = Value('i',initval) self.lock = Lock() def increment(self): with self.lock: self.val.value += 1 # 共享变量自加1 #print(“increment one time!”,self.value() ) #加此句死锁 def value(self): with self.lock: return self.val.value def func(counter): for i in range(50): time.sleep(0.01) counter.increment() if __name__ == '__main__': counter = Counter(0) procs = [Process(target = func,args = (counter,)) for i in range(10)] # 等价于 # for i in range(10): # Process(target = func,)) for p in procs: p.start() for p in procs: p.join() print(counter.value())
代码示例4:共享字符串
#encoding=utf-8 from multiprocessing import Process,Manager,Value from ctypes import c_char_p def greet(shareStr): shareStr.value = shareStr.value + ",World!" if __name__ == '__main__': manager = Manager() shareStr = manager.Value(c_char_p,"Hello") process = Process(target = greet,args = (shareStr,)) process.start() process.join() print(shareStr.value)
代码示例5:共享的字典和列表
#encoding=utf-8 from multiprocessing import Process,Manager def f( shareDict,shareList ): shareDict[1] = '1' shareDict['2'] = 2 shareDict[0.25] = None shareList.reverse() # 翻转列表 if __name__ == '__main__': manager = Manager() shareDict = manager.dict() # 创建共享的字典类型 shareList = manager.list( range( 10 ) ) # 创建共享的列表类型 p = Process( target = f,args = ( shareDict,shareList ) ) p.start() p.join() print(shareDict) print(shareList)
代码示例6:共享实例对象#encoding=utf-8 import time,os import random from multiprocessing import Pool,Lock,Manager from multiprocessing.managers import BaseManager #必须要创建一个BaseManager类的子类 class MyManager(BaseManager): pass def Manager(): m = MyManager() m.start() return m class Counter(object): def __init__(self,initval=0): self.val = Value('i',initval) self.lock = Lock() def increment(self): with self.lock: self.val.value += 1 def value(self): with self.lock: return self.val.value #将Counter类注册到Manager管理类中 MyManager.register('Counter',Counter) def long_time_task(name,counter): time.sleep(0.2) print('Run task %s (%s)...\n' % (name,os.getpid())) start = time.time() #time.sleep(random.random() * 3) for i in range(50): time.sleep(0.01) counter.increment() end = time.time() print('Task %s runs %0.2f seconds.' % (name,(end - start))) if __name__ == '__main__': manager = Manager()#返回一个管理对象 # 创建共享Counter类实例对象的变量,Counter类的初始值0 counter = manager.Counter(0) print('Parent process %s.' % os.getpid()) p = Pool() for i in range(5): p.apply_async(long_time_task,args = (str(i),counter)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') print(counter.value())
说明:
Manager()函数返回一个管理对象,它控制了一个服务端进程,用来保持Python对象,并允许其它进程使用代理来管理这些对象。Manager()返回的管理者,支持类型包括,list,dict,Namespace,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value and Array。
managers比使用共享内存对象更灵活,因为它支持任意对象类型。同样的,单个的manager可以通过网络在不同机器上进程间共享。但是,会比共享内存慢。版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。
-