如何解决Python:以不同的执行时间同时运行多个函数
我正在处理一个需要运行两个不同 cpu 密集型功能的项目。因此,使用多进程方法似乎是要走的路。我面临的挑战是一个函数的运行时间比另一个慢。为便于论证,假设 execute
的运行时间为 0.1 秒,而 update
需要整整一秒才能运行。目标是当 update
运行时,execute
将计算 10 次输出值。 update
完成后,它需要将一组参数传递给 execute
,然后 update
可以使用新的一组参数继续生成输出。一段时间后,from multiprocessing import Pool
from datetime import datetime
import time
import numpy as np
class MyClass():
def __init__(self,inital_parameter_1,inital_parameter_2):
self.parameter_1 = inital_parameter_1
self.parameter_2 = inital_parameter_2
def execute(self,input_1,input_2,time_in):
print('starting execute function for time:' + str(time_in))
time.sleep(0.1) # wait for 100 milliseconds
# generate some output
output = (self.parameter_1 * input_1) + (self.parameter_2 + input_2)
print('exiting execute function')
return output
def update(self,update_input_1,update_input_2,time_in):
print('starting update function for time:' + str(time_in))
time.sleep(1) # wait for 1 second
# generate parameters
self.parameter_1 += update_input_1
self.parameter_2 += update_input_2
print('exiting update function')
def smap(f):
return f()
if __name__ == "__main__":
update_input_1 = 3
update_input_2 = 4
input_1 = 0
input_2 = 1
# initialize class
my_class = MyClass(1,2)
# total runtime (arbitrary)
runtime = int(10e6)
# update_time (arbitrary)
update_time = np.array([10,10e2,15e4,20e5])
for current_time in range(runtime):
# if time equals update time run both functions simultanously until update is complete
if any(update_time == current_time):
with Pool() as pool:
res = pool.map_async(my_class.smap,[my_class.execute(input_1,current_time),my_class.update(update_input_1,current_time)])
# otherwise run only execute
else:
output = my_class.execute(input_1,current_time)
# increment input
input_1 += 1
input_2 += 2
需要再次运行并再次生成一组新参数。
此外,两个函数都需要一组不同的输入变量。
下面的图片链接应该可以更好地形象化我的难题。
function runtime visualisation
根据我收集到的信息 (https://zetcode.com/python/multiprocessing/),使用非对称映射方法可能是可行的方法,但它似乎并没有真正奏效。非常感谢任何帮助。
伪代码
// replace the client with a mock
@MockBean(YourClientInterface)
YourClientInterface yourClientInterface() {
return Mock(YourClientInterface)
}
// inject the mock in order to configure responses when it gets called
@Inject
YourClientInterface client
解决方法
我承认无法完全按照您的代码与您的描述进行比较。但我看到了一些问题:
- 方法
update
不返回除None
之外的任何值,后者是由于缺少return
语句而隐式返回的。 - 您的
with Pool() ...:
块将在块退出时调用terminate
,即在您调用非阻塞pool.map_async
之后立即调用。但是您没有规定等待这个提交的任务完成(terminate
很可能会在它完成之前终止正在运行的任务)。 - 您传递给
map_async
函数的是辅助函数名称和一个可迭代。但是您正在从当前主进程调用对execute
和update
的方法调用,并将它们的返回值用作 iterable 的元素,并且这些返回值绝对不是适合于传递给smap
。所以没有进行多处理,这完全是错误的。 - 您还一遍又一遍地创建和销毁进程池。只创建一次进程池要好得多。
因此,我建议至少进行以下更改。但请注意,此代码生成任务的速度可能比完成任务的速度要快得多,并且在给定当前 runtime
值的情况下,您可能有数百万个任务排队等待运行,这可能会对内存等系统资源造成很大压力。所以我插入了一些代码来确保提交任务的速度受到限制,以便未完成提交的任务数量永远不会超过可用 CPU 内核数量的三倍。
# we won't need heavy-duty numpy for what we are doing:
#import numpy as np
from multiprocessing import cpu_count
from threading import Lock
... # etc.
if __name__ == "__main__":
update_input_1 = 3
update_input_2 = 4
input_1 = 0
input_2 = 1
# initialize class
my_class = MyClass(1,2)
# total runtime (arbitrary)
runtime = int(10e6)
# update_time (arbitrary)
# we don't need overhead of numpy (remove import of numpy):
#update_time = np.array([10,10e2,15e4,20e5])
update_time = [10,20e5]
tasks_submitted = 0
lock = Lock()
execute_output = []
def execute_result(result):
global tasks_submitted
with lock:
tasks_submitted -= 1
# result is the return value from method execute
# do something with it,e.g. execute_output.append(result)
pass
update_output = []
def update_result(result):
global tasks_submitted
with lock:
tasks_submitted -= 1
# result is the return value from method update
# do something with it,e.g. update_output.append(result)
pass
n_processors = cpu_count()
with Pool() as pool:
for current_time in range(runtime):
# if time equals update time run both functions simultanously until update is complete
#if any(update_time == current_time):
if current_time in update_time:
# run both update and execute:
pool.apply_async(my_class.update,args=(update_input_1,update_input_2,current_time),callback=update_result)
with lock:
tasks_submitted += 1
pool.apply_async(my_class.execute,args=(input_1,input_2,callback=execute_result)
with lock:
tasks_submitted += 1
# increment input
input_1 += 1
input_2 += 2
while tasks_submitted > n_processors * 3:
time.sleep(.05)
# Ensure all tasks have completed:
pool.close()
pool.join()
assert(tasks_submitted == 0)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。