如何解决在 worker 上存储对象并执行方法
我有一个应用程序,其中有一组可以进行大量设置的对象(每个对象最多需要 30 秒到 1 分钟)。设置好后,我想传递一个参数向量(小,
我的想法是,这应该是工作人员的“池”,其中每个工作人员都使用自己的特定配置(这将不同)初始化一个对象,并保留在工作人员的内存中。随后,每个worker上这个对象的一个方法得到一个向量并返回一些数组。然后由主程序组合这些(顺序不重要)。
一些有效的 MWE 如下(串行版本第一,dask 版本第二):
import datetime as dt
import itertools
import numpy as np
from dask.distributed import Client,LocalCluster
# Set up demo dask on local machine
cluster = LocalCluster()
client = Client(cluster)
class Model(object):
def __init__(self,power):
self.power = power
def powpow(self,x):
return np.power(x,self.power)
def neg(self,x):
return -x
def compute_me(self,x):
return sum(self.neg(self.powpow(x)))
# Set up the objects locally
bag = [Model(power)
for power in [1,2,3,4,5]
]
x = [13,23,37]
result = [obj.compute_me(x) for obj in bag]
# Using dask
# Wrapper function to pass the local object
# and parameter
def wrap(obj,x):
return obj.compute_me(x)
res = []
for obj,xx in itertools.product(bag,[x,]):
res.append(client.submit(wrap,obj,xx))
result_dask = [r.result() for r in res]
np.allclose(result,result_dask)
在我的实际案例中,Model
类会执行大量初始化、预计算等操作,初始化所需的时间可能是运行 compute_me
方法的 10-50 倍.基本上,就我而言,让每个工作人员在本地都有一个预定义的 Model
实例,并让 dask 将输入传送到 compute_me
是有益的。
This post (第二个答案)在命名空间中初始化和存储,但该示例没有显示如何将不同的初始化参数传递给每个工作人员。或者有没有其他方法可以做到这一点?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。