一、操作系统中相关进程的知识
??Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 ??子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。 ??Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程。
??示例如下
import os pid=os.fork() if pid==0: print('I am child process %s my parents is %s'%(os.getpid(),os.getppid())) else: print('I (%s) just created a child process (%s).'%(os.getpid(),pid))
??输出如下
I (64225) just created a child process (64226). I am child process 64226 my parents is 64225
二、跨平台模块multiprocessing
multiprocessing
模块提供了一个Process类来代表一个进程对象。
??示例1
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name,os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getppid()) p = Process(target=run_proc,args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.') #join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
??示例2
from multiprocessing import Process import time import os class P(Process): def run(self): print('Run child process %s (%s)...'%(self.name,os.getpid())) # 默认函数对象有name方法 ,结果为:P-1 time.sleep(3) print('%s is done' % self.name) if __name__ == '__main__': print('Parent process %s.' % os.getppid()) p=P() p.start() p.join()
三、进程数据隔离
多个进程间的数据是隔离的,也就是说多个进程修改全局变量互不影响
??验证示例
from multiprocessing import Process import time x=100 def task(): global x print('子进程开启,当前x的值为%d'%x) time.sleep(3) x=10 print('子进程结束,当前x的值为%d'%x) if __name__ == '__main__': print('当前为父进程,准备开启子进程,x的值为%d' % x) p1=Process(target=task) p1.start() p1.join() print('当前为父进程,准备结束父进程,x的值为%d' % x)
??输出
当前为父进程,准备开启子进程,x的值为100 子进程开启,当前x的值为100 子进程结束,当前x的值为10 当前为父进程,准备结束父进程,x的值为100
==注意:有些情况是需要加锁的情况,如文件读写问题==
四、多进程并行执行
??示例如下
import time from multiprocessing import Process def task(name,n): print('%s is running'%name) time.sleep(n) print('%s is done'%name) if __name__ == '__main__': p1=Process(target=task,args=("进程1",1)) #用时1s p2=Process(target=task,args=("进程2",2)) #用时1s p3=Process(target=task,args=("进程3",3)) #用时1s start_time=time.time() p1.start() p2.start() p3.start() # 当第一秒在运行p1时,其实p2、p3也已经在运行,当1s后到p2时只需要再运行1s就到p3了,到p3也是一样。 p1.join() p2.join() p3.join() stop_time=time.time() print(stop_time-start_time) #3.2848567962646484
五、进程池
1、线性执行( pool.apply() )
from multiprocessing import Pool # 导入进程池模块pool import time,os def foo(i): time.sleep(2) print("in process",os.getpid()) # 打印进程号 if __name__ == "__main__": pool = Pool(processes=5) # 设置允许进程池同时放入5个进程 for i in range(10): pool.apply(func=foo,args=(i,)) # 同步执行挂起进程 print('end') pool.close() # 关闭进程池,不再接受新进程 pool.join() # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
2、并发执行( pool.apply_async() )
from multiprocessing import Pool # 导入进程池模块pool import time,os.getpid()) # 打印进程号 if __name__ == "__main__": pool = Pool(processes=5) # 设置允许进程池同时放入5个进程,并且将这5个进程交给cpu去运行 for i in range(10): pool.apply_async(func=foo,)) # 采用异步方式执行foo函数 print('end') pool.close() pool.join() # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
3、设置回调
from multiprocessing import Process,Pool import time,os.getpid()) # 打印子进程的进程号 def bar(arg):#注意arg参数是必须要有的 print('-->exec done:',arg,os.getpid()) # 打印进程号 if __name__ == "__main__": pool = Pool(processes=2) print("主进程",os.getpid()) # 主进程的进程号 for i in range(3): pool.apply_async(func=foo,),callback=bar) # 执行回调函数callback=Bar print('end') pool.close() pool.join() # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
??执行结果
主进程 752 end in process 2348 -->exec done: None 752 in process 8364 -->exec done: None 752 in process 2348 -->exec done: None 752 #回调函数说明fun=Foo干不完就不执行bar函数,等Foo执行完就去执行Bar #这个回调函数是主进程去调用的,而不是每个子进程去调用的。
六、子进程
??1、 很多时候子进程是一个外部进程,如执行一条命令,这和命令行执行效果是一样的 ??示例如下
import subprocess print('$nslookup https://www.baidu.com') r = subprocess.call(['nslookup','https://www.baidu.com']) print('Exit code',r)
??2、 有时候子进程还需要进行输入,可以通过communicate
方法来输入 ??示例如下
import subprocess print('$ nslookup https://www.baidu.com') p = subprocess.Popen(['nslookup'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) output,err = p.communicate(b'set q=mx\nbaidu.com\nexit\n') print(output.decode('gbk')) print('Exit code:',p.returncode)
??输出如下
$ nslookup https://www.baidu.com 默认服务器: bogon Address: 192.168.111.1 > > 服务器: bogon Address: 192.168.111.1 baidu.com MX preference = 10,mail exchanger = mx.maillb.baidu.com baidu.com MX preference = 20,mail exchanger = jpmx.baidu.com baidu.com MX preference = 15,mail exchanger = mx.n.shifen.com baidu.com MX preference = 20,mail exchanger = mx50.baidu.com baidu.com MX preference = 20,mail exchanger = mx1.baidu.com > Exit code: 0
七、守护进程
守护进程在主进程代码执行完毕时立刻挂掉,然后主进程等待非守护进程执行完毕后回收子进程的资源(避免产生僵尸进程),整体才算结束。
示例
from multiprocessing import Process import os import time def task(x): print('%s is running ' %x) time.sleep(3) print('%s is done' %x) if __name__ == '__main__': p1=Process(target=task,args=('守护进程',)) p2=Process(target=task,args=('子进程',)) p2.start() p1.daemon=True # 设置p1为守护进程 p1.start() print('主进程代码执行完毕') >>:主进程代码执行完毕 >>:子进程 is running >>:子进程 is done
==可以从结果看出,主进程代码执行完,守护进程立即挂掉,主进程在等待子进程执行完毕后退出==
八、进程间通信
??如果想要进程间通信可以使用Queue
或Pipe
来实现 ??使用Queue示例
from multiprocessing import Queue,Process def put_id(q): q.put([1,2,3,4]) if __name__ == '__main__': q=Queue() p=Process(target=put_id,args=(q,)) p.start() print(q.get()) p.join() # 输出 [1,4]
==注意:在这需要从multiprocessing导入Queue模块==
??使用Pipe示例
from multiprocessing import Process,Pipe def put_id(conn): conn.send([1,3]) conn.send([4,5,6]) conn.close() if __name__ == '__main__': ## 生成管道。 生成时会产生两个返回对象,这两个对象相当于两端的电话,通过管道线路连接。 ## 两个对象分别交给两个变量。 parent_conn,child_conn=Pipe() p=Process(target=put_id,args=(child_conn,))#child_conn需要传给对端,用于send数据给parent_conn p.start() print(parent_conn.recv()) # parent_conn在这断用于接收数据>>>>[1,3] print(parent_conn.recv()) # parent_conn在这断用于接收数据>>>>[4,6] p.join()
==注意两端要发送次数和接受次数要对等,不然会卡住直到对等==
九、进程间数据共享(字典和列表型)
??前面说过,进程间数据是隔离的,如果想要进程间数据共享可以通过Manager
来实现 ??示例如下
from multiprocessing import Manager,Process from random import randint import os def run(d,l): d[randint(1,50)]=randint(51,100)#生成一个可在多个进程之间传递和共享的字典 l.append(os.getpid()) print(l) if __name__ == '__main__': with Manager() as manage: #做一个别名,此时manager就相当于Manager() d=manage.dict()#生成一个可在多个进程之间传递和共享的字典 l=manage.list(range(5))#生成一个可在多个进程之间传递和共享的列表 p_list=[] for i in range(10):#生成10个进程 p=Process(target=run,args=(d,l)) p_list.append(p)# 将每个进程放入空列表中 p.start() for i in p_list: i.join() print(d)#所有进程都执行完毕后打印字典 print(l)#所有进程都执行完毕后打印列表
十、分布式进程
??在做分布式计算时显然进程比线程各合适,一来进程更稳定,二来线程最多只能在同一台机器的多个cpu上运行; ??multiprocessing
的managers
子模块支持把多进程分布到多个机器上,一个服务进程用作调度者,依靠网络将任务分布到其它多个进程中。 ??假设有一个需求,拥有两台机器,一台机器用来做发送任务的服务进程,一台用来做处理任务的服务进程; ??示例如下
# task_master.py from multiprocessing.managers import BaseManager from queue import Queue import random import time task_queue = Queue() result_queue = Queue() class QueueManager(BaseManager): pass def get_task_queue(): global task_queue return task_queue def get_result_queue(): global result_queue return result_queue if __name__ == '__main__': # 将两个队列注册到网络上,calltable参数关联Queue对象 QueueManager.register('get_task_queue',callable=get_task_queue) QueueManager.register('get_result_queue',callable=get_result_queue) # 创建一个队列管理器,绑定端口5000,设定密码为abc manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc') manager.start() # 通过网络获取Queue对象 task = manager.get_task_queue() result = manager.get_result_queue() # 放任务进去 for i in range(10): n = random.randint(0,1000) print('Put Task %d'%n) task.put(n) # 从结果队列获取结果 print('Try get results') for i in range(10): r = result.get() print('Result: %s' % r) manager.shutdown() print('master exit')
==注意:一定要用注册过的Queue对象,另外在linux/unix/mac等系统上注册可直接使用QueueManager.register(‘get_result_queue‘,callable=lambda : result_queue)
==
# task_worker.py from multiprocessing.managers import BaseManager from queue import Queue from queue import Empty import time class QueueManager(BaseManager): pass if __name__ == '__main__': # 从服务器上获取,所以注册时只需要提供名字,也就是接口名字 QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是task_master.py的机器 server_addr = '127.0.0.1' manager = QueueManager(address=(server_addr,authkey=b'abc') manager.connect() # 获取Queue对象 task = manager.get_task_queue() result = manager.get_result_queue() # 从队列提取任务,将处理结果插入result队列 for i in range(10): try: n = task.get(timeout=1) print('run task %d*%d'%(n,n)) r = '%d * %d = %d'%(n,n,n*n) time.sleep(1) result.put(r) except Empty: print('task queue is empty') print('worker exit')
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。