微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python:以不同的执行时间同时运行多个函数

如何解决Python:以不同的执行时间同时运行多个函数

我正在处理一个需要运行两个不同 cpu 密集型功能的项目。因此,使用多进程方法似乎是要走的路。我面临的挑战是一个函数的运行时间比另一个慢。为便于论证,假设 execute 的运行时间为 0.1 秒,而 update 需要整整一秒才能运行。目标是当 update 运行时,execute 将计算 10 次输出值。 update 完成后,它需要将一组参数传递给 execute,然后 update 可以使用新的一组参数继续生成输出。一段时间后,from multiprocessing import Pool from datetime import datetime import time import numpy as np class MyClass(): def __init__(self,inital_parameter_1,inital_parameter_2): self.parameter_1 = inital_parameter_1 self.parameter_2 = inital_parameter_2 def execute(self,input_1,input_2,time_in): print('starting execute function for time:' + str(time_in)) time.sleep(0.1) # wait for 100 milliseconds # generate some output output = (self.parameter_1 * input_1) + (self.parameter_2 + input_2) print('exiting execute function') return output def update(self,update_input_1,update_input_2,time_in): print('starting update function for time:' + str(time_in)) time.sleep(1) # wait for 1 second # generate parameters self.parameter_1 += update_input_1 self.parameter_2 += update_input_2 print('exiting update function') def smap(f): return f() if __name__ == "__main__": update_input_1 = 3 update_input_2 = 4 input_1 = 0 input_2 = 1 # initialize class my_class = MyClass(1,2) # total runtime (arbitrary) runtime = int(10e6) # update_time (arbitrary) update_time = np.array([10,10e2,15e4,20e5]) for current_time in range(runtime): # if time equals update time run both functions simultanously until update is complete if any(update_time == current_time): with Pool() as pool: res = pool.map_async(my_class.smap,[my_class.execute(input_1,current_time),my_class.update(update_input_1,current_time)]) # otherwise run only execute else: output = my_class.execute(input_1,current_time) # increment input input_1 += 1 input_2 += 2 需要再次运行并再次生成一组新参数。

此外,两个函数都需要一组不同的输入变量。

下面的图片链接应该可以更好地形象化我的难题。

function runtime visualisation

根据我收集到的信息 (https://zetcode.com/python/multiprocessing/),使用非对称映射方法可能是可行的方法,但它似乎并没有真正奏效。非常感谢任何帮助。

代码

// replace the client with a mock
@MockBean(YourClientInterface)
YourClientInterface yourClientInterface() {
    return Mock(YourClientInterface)
}

// inject the mock in order to configure responses when it gets called
@Inject
YourClientInterface client

解决方法

我承认无法完全按照您的代码与您的描述进行比较。但我看到了一些问题:

  1. 方法 update 不返回除 None 之外的任何值,后者是由于缺少 return 语句而隐式返回的。
  2. 您的 with Pool() ...: 块将在块退出时调用 terminate,即在您调用非阻塞 pool.map_async 之后立即调用。但是您没有规定等待这个提交的任务完成(terminate 很可能会在它完成之前终止正在运行的任务)。
  3. 您传递给 map_async 函数的是辅助函数名称和一个可迭代。但是您正在从当前主进程调用对 executeupdate 的方法调用,并将它们的返回值用作 iterable 的元素,并且这些返回值绝对不是适合于传递给 smap。所以没有进行多处理,这完全是错误的。
  4. 您还一遍又一遍地创建和销毁进程池。只创建一次进程池要好得多。

因此,我建议至少进行以下更改。但请注意,此代码生成任务的速度可能比完成任务的速度要快得多,并且在给定当前 runtime 值的情况下,您可能有数百万个任务排队等待运行,这可能会对内存等系统资源造成很大压力。所以我插入了一些代码来确保提交任务的速度受到限制,以便未完成提交的任务数量永远不会超过可用 CPU 内核数量的三倍。

# we won't need heavy-duty numpy for what we are doing:
#import numpy as np
from multiprocessing import cpu_count
from threading import Lock
... # etc.

if __name__ == "__main__":
    update_input_1 = 3
    update_input_2 = 4
    input_1 = 0
    input_2 = 1
    # initialize class
    my_class = MyClass(1,2)

    # total runtime (arbitrary)
    runtime = int(10e6)
    # update_time (arbitrary)
    # we don't need overhead of numpy (remove import of numpy):
    #update_time = np.array([10,10e2,15e4,20e5])
    update_time = [10,20e5]

    tasks_submitted = 0
    lock = Lock()

    execute_output = []
    def execute_result(result):
        global tasks_submitted

        with lock:
            tasks_submitted -= 1
        # result is the return value from method execute
        # do something with it,e.g. execute_output.append(result)
        pass

    update_output = []
    def update_result(result):
        global tasks_submitted

        with lock:
            tasks_submitted -= 1
        # result is the return value from method update
        # do something with it,e.g. update_output.append(result)
        pass

    n_processors = cpu_count()
    with Pool() as pool:
        for current_time in range(runtime):
            # if time equals update time run both functions simultanously until update is complete
            #if any(update_time == current_time):
            if current_time in update_time:
                # run both update and execute:
                pool.apply_async(my_class.update,args=(update_input_1,update_input_2,current_time),callback=update_result)
                with lock:
                    tasks_submitted += 1
            pool.apply_async(my_class.execute,args=(input_1,input_2,callback=execute_result)
            with lock:
                tasks_submitted += 1

            # increment input
            input_1 += 1
            input_2 += 2
            while tasks_submitted > n_processors * 3:
                time.sleep(.05)
        # Ensure all tasks have completed:
        pool.close()
        pool.join()
        assert(tasks_submitted == 0)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。