微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 multiprocessing.Manager.Queue 在 while 循环中运行 python multiprocessing Process?

如何解决:如何使用 multiprocessing.Manager.Queue 在 while 循环中运行 python multiprocessing Process?

我有几个要在 while 循环中运行的进程。基本上,这些进程先通过 do_performance 计算一些值,随后我希望按照某些规则(如 mc_scheduler 方法中所示)对这些值进行处理,直到计算出的值满足 while 循环中预定义的条件为止。下面是我的简化代码。其中有两个 python 类:一个是 multiprocessing.Process 的子类(class Worker),另一个是我自己的类(class MonteCarlo)。

import ctypes
import itertools
import multiprocessing as mp
import time
from datetime import datetime

import numpy as np
from sympy.combinatorics.permutations import Permutation
from sympy.utilities.iterables import multiset_permutations
from multiprocessing import Pool, Process, Queue, Manager, Value


def do_performance(i, j):
    """Return the dot product of two arrays after flattening both.

    Args:
        i: Array-like of any shape.
        j: Array-like with the same total number of elements as ``i``.

    Returns:
        The scalar ``np.dot`` of the flattened inputs.

    Raises:
        ValueError: If the flattened inputs differ in length (raised by
            ``np.dot``).
    """
    return np.dot(np.ravel(i), np.ravel(j))


class Worker(Process):
    """Process that scores batches of indices taken from a queue.

    The worker repeatedly takes one list of indices from ``in_queue``, scores
    every index with ``do_performance`` and puts the best-scoring entries on
    ``out_queue``. Putting ``None`` on ``in_queue`` makes the worker leave its
    loop so the process can exit on its own.
    """

    def __init__(self, MDarr, in_queue, out_queue):
        """Store the data array and the two queues on the instance.

        Args:
            MDarr: Data indexed as ``MDarr[0][i, :, :]`` and ``MDarr[1][i, :]``
                by ``mcSimulation``.
            in_queue: Queue supplying lists of indices; ``None`` is the stop
                signal.
            out_queue: Queue that receives one result list per input list.
        """
        super().__init__()
        self.MDarr = MDarr
        self.in_queue = in_queue
        self.out_queue = out_queue

    def mcSimulation(self, replicaData, MDarr):
        """Score every index in ``replicaData`` and return the lowest scores.

        Args:
            replicaData: Iterable of indices into the second axis of ``MDarr``.
            MDarr: Data whose ``[0]`` and ``[1]`` slices are fed to
                ``do_performance`` for each index.

        Returns:
            At most 1000 ``(index, score)`` tuples, sorted by ascending score.
        """
        scored = [
            (i, do_performance(MDarr[0][i, :, :], MDarr[1][i, :]))
            for i in replicaData
        ]
        scored.sort(key=lambda pair: pair[1])
        return scored[:1000]

    def run(self):
        """Process input lists until a ``None`` stop signal is received."""
        while True:
            input_list = self.in_queue.get()

            # Stop signal: leave the loop before sleeping or computing.
            if input_list is None:
                break

            # sleep to allow the other workers a chance (b/c the work action is too simple)
            time.sleep(1)

            # put the transformed work on the queue and do simulation
            self.out_queue.put(self.mcSimulation(input_list, self.MDarr))


class MonteCarlo:
    """Feed index lists to ``Worker`` processes until scores stop improving.

    ``run`` starts ``n_proc`` workers, feeds them permutations of randomly
    drawn trajectory indices and stops once ``mc_scheduler`` has seen 100
    consecutive results that do not beat the best score so far.
    """

    def __init__(self,):
        super().__init__()
        self.initialize_simulation()

    def _generate_input_list(self):
        """Draw ``n_replica`` random trajectory indices and return up to 1000
        permutations of them."""
        set_of_list = np.random.choice(np.arange(self.n_traj), self.n_replica)
        return list(itertools.islice(multiset_permutations(set_of_list), 1000))

    def initialize_simulation(self):
        """Create the example data, the shared array and the two queues."""
        # define variables
        self.n_proc, self.n_replica = 5, 50

        # generate randomized numpy array for example
        self.initIdx, self.endIdx = 135, 60918
        self.n_traj, self.n_atom = 50000, 22
        self.MDarr = np.random.normal(size=(self.n_traj, self.n_atom, 3))

        # generate some of list as input into mp.Manager.Queue()
        self.input_list = self._generate_input_list()

        # generate shared_memory array
        self.sharedBaseArr = mp.Array(
            ctypes.c_double, 2 * self.n_traj * self.n_atom * 3, lock=False)
        # The view must account for every element of the buffer, so the
        # n_atom axis is part of the shape: (2, n_traj, n_atom, 3).
        self.main_NpArray = np.frombuffer(
            self.sharedBaseArr, dtype=ctypes.c_double,
        ).reshape(2, self.n_traj, self.n_atom, 3)
        # MDarr (n_traj, n_atom, 3) is broadcast into both leading slices.
        np.copyto(self.main_NpArray, self.MDarr)
        # Compare addresses rather than walking ``.base``: the object chain
        # behind ``frombuffer`` is a NumPy implementation detail.
        assert self.main_NpArray.ctypes.data == ctypes.addressof(self.sharedBaseArr), \
            'shared base array has different shape with main numpy array'

        self.replica_manager = mp.Manager()
        self.in_queue = self.replica_manager.Queue()
        self.out_queue = self.replica_manager.Queue()

        # Scheduler state; ``None`` means no result has been recorded yet.
        self.previous_result = None
        self.scheduler_val = 0

    def mc_scheduler(self, result):
        """Update the best score and the no-improvement counter.

        Args:
            result: Non-empty list of ``(index, score)`` tuples. The first
                entry is taken as the best one, so the list is expected to be
                sorted by ascending score.

        Side effects:
            On the first call (``previous_result`` is ``None``) stores
            ``result`` and its first score as ``min_val`` and sets
            ``scheduler_val`` to 0. On later calls stores the entries scoring
            below ``min_val`` in ``out``; if there are any, ``min_val`` is
            updated and ``scheduler_val`` reset to 0, otherwise
            ``scheduler_val`` is incremented.
        """
        if self.previous_result is None:
            self.scheduler_val = 0
            self.previous_result = result
            self.min_val = result[0][1]
            return

        self.optimal = result
        self.out = [entry for entry in result if entry[1] < self.min_val]

        if self.out:
            # reset mc_scheduler
            self.scheduler_val = 0
            self.min_val = self.out[0][1]
        else:
            self.scheduler_val += 1

    def run(self):
        """Start the workers and loop until 100 results in a row fail to
        improve on the best score; the workers are always stopped on exit."""
        print(f"Start code {datetime.now()}")
        print(f"construct the {self.n_proc} workers (mp.Process)")

        print("fork and start child process")
        workers = [
            Worker(self.main_NpArray, self.in_queue, self.out_queue)
            for _ in range(self.n_proc)
        ]
        for worker in workers:
            worker.start()

        try:
            print("add data to the manager.queue for multi-processes")
            for replica_set in self.input_list:
                self.in_queue.put(replica_set)

            print("update initial results")
            self.previous_result = None
            self.mc_scheduler(list(self.out_queue.get()))

            while self.scheduler_val < 100:
                # From the action value obtained from each process, get Action
                # results from self.out_queue
                result = list(self.out_queue.get())

                # compare previous results
                self.mc_scheduler(result)
                print(f">> {self.scheduler_val}")

                # generate the new input list and enqueue each permutation
                # NOTE(review): one result is consumed per iteration while up
                # to 1000 tasks are added, so in_queue keeps growing; confirm
                # whether that is intended.
                for input_list in self._generate_input_list():
                    self.in_queue.put(input_list)
        finally:
            # Workers block on in_queue, which may still hold many pending
            # tasks, so stop them explicitly instead of waiting for them.
            for worker in workers:
                worker.terminate()
            for worker in workers:
                worker.join()


if __name__ == '__main__':
    # Script entry point: build the simulation and run it right away. The
    # guard keeps this from executing when the module is imported.
    MonteCarlo().run()

我的问题是

  1. 我的机器没有使用 n_proc 进程,它只使用了 1 个进程。
  2. 如何改进我的代码? 请在这方面帮助我。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。