如何解决在 Dask 中排队工人 具有 dask.delayed 的依赖期货期货集合.as_completed()
我需要使用 Dask 调度程序和工作器解决以下场景:
-
Dask 程序有 N 个循环调用的函数(N 个由用户定义)
-
每个函数都以
delayed(func)(args)
开始并行运行。 -
上一点的每个函数启动时,都会触发W个worker。这就是我调用工人的方式:
futures = client.map(worker_func,worker_args) worker_responses = client.gather(futures)
这意味着我需要 N * W 个工人来并行运行所有内容。问题是这不是最优的,因为它分配了太多的资源,我在云上运行它并且它很昂贵。另外,N是用户定义的,所以我事先不知道我需要有多少处理能力。
有没有一种方法可以让工作人员排队,如果我定义 Dask 有 X 个工作人员,当一个工作人员结束时,下一个工作人员开始?
解决方法
首先定义您需要的工人数量,将它们视为短暂的,但在整个处理过程中都是静态的
您可以动态创建它们(在您开始时或稍后),但可能希望在处理开始时将它们全部准备好
在您看来,client is an executor(所以当您提到workers 和并行运行时,您可能指的是同一件事
此类类似于 concurrent.futures
中的执行程序,但也允许 Future
调用中的 submit/map
对象。当客户端被实例化时,它默认接管所有 dask.compute
和 dask.persist
调用。
一旦您的工作人员可用,Dask 将通过调度程序分配给他们的工作
你应该通过将结果传递给 dask.delayed()
和前面的函数结果(这是一个 Future,而不是结果)来使任何相互依赖的任务这样做
这个 Futures-as-arguments 将允许 Dask 构建您工作的任务图
示例使用 https://examples.dask.org/delayed.html
未来参考https://docs.dask.org/en/latest/futures.html#distributed.Future
具有 dask.delayed 的依赖期货
这是Delayed docs中的一个完整示例(实际上是将几个连续的示例组合成相同的结果)
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x,y):
return x + y
data = [1,2,3,4,5]
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a,b) # depends on a and b
output.append(c)
total = dask.delayed(sum)(output) # depends on everything
total.compute() # 45
您可以拨打 total.visualize()
到 see the task graph
期货集合
如果您已经使用 .map(..)
来映射函数和参数对,您可以继续创建 Futures,然后一次性 .gather(..)
它们,即使它们在一个集合中(这很方便)给你)
.gather()
的 results
将与它们的排列相同(列表列表)
[[fn1(args11),fn1(args12)],[fn2(args21)],[fn3(args31),fn3(args32),fn3(args33)]]
https://distributed.dask.org/en/latest/api.html#distributed.Client.gather
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
collection_of_futures = []
for worker_func,worker_args in iterable_of_pairs_of_fn_args:
futures = client.map(worker_func,worker_args)
collection_of_futures.append(futures)
results = client.gather(collection_of_futures)
备注
-
worker_args
必须是一些可迭代的映射到worker_func
,这可能是错误的来源 -
.gather()
ing 将阻塞,直到所有期货完成或筹集
.as_completed()
如果您需要尽快获得结果,您可以使用 .as_completed(..)
,但请注意,结果的顺序是不确定的,所以我认为这对您的情况没有意义......如果你发现它确实如此,你需要一些额外的保证
- 在结果中包含有关如何处理结果的信息
- 保留对每个的引用并检查它们
- 仅在无关紧要的情况下组合组(即所有期货具有相同的目的)
还要注意,产生的期货是完整的,但仍然是期货,所以你仍然需要调用.result()
或.gather()
他们
https://distributed.dask.org/en/latest/api.html#distributed.as_completed
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。