微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

【Python】分布式驱动进程池线程池的简单实现

计算 1-200 的平方,数据长这样:[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19],...,[180, 181, 182, 183, 184, 185, 186, 187, 188, 189],[190, 191, 192, 193, 194, 195, 196, 197, 198, 199]],长度为20 ,每个客户端处理10长度的数据

使用分布式去掉第一层列表,进程池去掉第二层列表,每个进程池开启10个进程,如果想要线程池实现只需要改个名字,把客户端代码中的

with concurrent.futures.ProcesspoolExecutor(10) as pool 改为 with concurrent.futures.ThreadPoolExecutor(10) as pool

服务器:

#服务器端
import multiprocessing.managers #分布式进程管理器
import queue    #队列

task_queue = queue.Queue()  #任务队列(发出)
result_queue = queue.Queue()    #结果队列(返回)

def return_task():  #返回任务队列
    return task_queue
def return_result():    #返回结果队列
    return result_queue

class Queuemanger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据
    pass

if __name__ == "__main__":
    multiprocessing.freeze_support()    #开启分布式支持
    Queuemanger.register("get_task",callable = return_task)#注册函数给客户端调用
    Queuemanger.register("get_result",callable = return_result)
    manger = Queuemanger(address=("192.168.99.1",8848),authkey=123456)#创建一个管理器,设置地址与密码
    #ip地址为本机IP
    manger.start()  #开启
    task = manger.get_task() #任务队列
    result = manger.get_result() #结果队列
    datalist = []
    for i in range(20):
        mylist = [x for x in range(10 * i, 10 * (i + 1))]
        print("压入任务队列的数据",mylist)
        task.put(mylist) #压入数据
        #[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],...,[190, 191, 192, 193, 194, 195, 196, 197, 198, 199]
    print("------ 等待中 ------")
    for i in range(20):
        res = result.get(timeout = 100)
        print("从客户端得到结果队列的数据-",res)
        
    manger.shutdown()   #关闭服务器

客户端:

#客户端
import multiprocessing.managers #分布式进程管理器
import concurrent.futures

class Queuemanger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据
    pass

def go(n):
    return n * n

if __name__ == "__main__":
    Queuemanger.register("get_task")    #注册函数调用服务器
    Queuemanger.register("get_result")
    manger = Queuemanger(address=("192.168.99.1", 8848), authkey=123456)
    manger.connect()  # 连接服务器
    task = manger.get_task()   # 任务
    result = manger.get_result() # 结果

    for i in range(10):#每个客户端处理10个list
        datalist = task.get() #从队列中取出数据
        print("客户端从任务队列中得到的数据:",datalist)
        mylist1 = []
        with concurrent.futures.ProcesspoolExecutor(10) as pool:  # 开启10个进程
            dlist = pool.map(go, datalist)
            for j in dlist:
                mylist1.append(j)
            print(mylist1)
            result.put("计算完成后压入结果队列的数据:"+str(mylist1)) #将计算后的数据压入队列

服务器运行结果:

 

 客户端运行数据:

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐