如何解决Dask 可以自动创建一棵树来并行计算并减少工作人员之间的副本吗?
我将其分为两个部分,背景和问题。问题一直在底部。
背景:
假设我想(使用 Dask 分布式)做一个令人尴尬的并行计算,比如对 16 个巨大的数据帧求和。我知道使用 CUDA 会非常快,但让我们继续使用 Dask 作为示例。
实现此目的的基本方法(使用延迟)是:
from functools import reduce
import math
from dask import delayed,compute,visualize
import dask.distributed as dd
import numpy as np
@delayed
def gen_matrix():
return np.random.rand(1000,1000)
@delayed
def calc_sum(matrices):
return reduce(lambda a,b: a + b,matrices)
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# Here's the Big Sum
matrices = calc_sum(matrices)
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute,matrices)
result = client.gather(f)
这是 dask 图:
这当然会奏效,但是随着矩阵的大小(参见上面的 gen_matrix)变得太大,Dask 分布式工作者开始出现三个问题:
- 他们向执行求和的主要工作人员发送数据超时
- 主要工作人员在收集所有矩阵时内存不足
- 总和不是并行运行的(只有矩阵组合是)
请注意,这些问题都不是 Dask 的错,它按宣传的那样工作。我刚刚设置的计算很差。
一种解决方案是将其分解为树计算,如下所示,以及该图的 dask 可视化:
from functools import reduce
import math
from dask import delayed,1000)
@delayed
def calc_sum(a,b):
return a + b
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# This tells us the depth of the calculation portion
# of the tree we are constructing in the next step
depth = int(math.log(num_matrices,2))
# This is the code I don't want to have to manually write
for _ in range(depth):
matrices = [
calc_sum(matrices[i],matrices[i+1])
for i in range(0,len(matrices),2)
]
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute,matrices)
result = client.gather(f)
和图表:
问题:
我希望能够通过库或 Dask 本身来完成此树生成。我怎样才能做到这一点?
对于那些想知道的人,为什么不直接使用上面的代码?因为有些边缘情况我不想编写代码,也因为它只是需要编写更多代码:)
我也看过这个:Parallelize tree creation with dask
functools 或 itertools 中有什么东西知道如何做到这一点(并且可以与 dask.delayed 一起使用)?
解决方法
Dask bag 有一个减少/聚合方法,可以生成树状 DAG:fold。
工作流程是“打包”延迟的对象,然后折叠它们。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。