如何解决cluster.adapt() 在将内存数据移动到其他人之前杀死工作人员
我将 Dask 与 Slurm 集群一起使用:
cluster = SLURMCluster(cores=64,processes=64,memory="128G",walltime="24:00:00")
#export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=100
cluster.adapt(minimum_jobs=1,maximum_jobs=2,interval="20 s",target_duration="100 s",wait_count=20)
我的工作负载从约 1000 个节点两两减少到 1。每次减少需要约 2 分钟。所以,开头可能有平行,但结尾的平行较少。我只能访问集群中的两个节点。所以我希望它在开始时使用两个集群节点,最后使用一个集群节点。
# pseudo code
def reduce(task):
futures = []
for i in range(0,len(task),2):
futures.append(client.submit(reduceTwo(),task[i]. task[i+1]))
while len(futures) != 1:
futures_new = []
for i in range(0,len(futures),2):
futures_new.append(client.submit(reduceTwo(),futures[i].result(),futures[i+1].result()))
futures = futures_new
return futures[0].result()
然而,我的问题是,当 cluster.adapt() 预计从 2 个集群节点减少到 1 个集群节点时,它会先减少到 0 并启动一个新节点。
问题一:降到0正常吗? 如果可以正确保存留在被杀节点内存中的输出数据(可能在调度程序节点,集群的登录节点中),实际上不会有问题。但是,我读了日志,似乎在工人可以正常停止/退休之前,它被杀得太早了。有些工人退休,有些则没有。
问题 2:这种“退休前杀戮”可能发生吗?有没有办法让员工有更长的退休时间?。你可以在上面的第一个代码中看到,我尝试增加尽可能多的时序参数,但它不起作用。我不完全理解this parameter list。
我知道我可以优化我的代码。像del futures一样,我们的计算完成了,所以一个worker的内存阶段任务将为0,并且它的死亡不会导致太多的计算需要重做。或者,可以使用相同的还原库。但是,无论如何,这两个Dask问题可以解决吗?
解决方法
回答我自己的问题,以防其他人看到同样的问题。
关键是:不要让所有节点将他们的临时文件保存在同一个共享磁盘的同一个目录中。
不指定local_directory
,很容易所有节点都想将worker本地文件保存到~/dask-worker-space
目录中,该目录在所有节点之间共享。然后,所有节点之间都会竞争在这个目录中读/写。然后,当一个节点想要杀死它的工人时,它可能会意外杀死其他节点的工人,最终 (Q1) 节点数量减少到 0。而且(Q2)无法移动被杀工人的数据。
我希望 Dask 可以支持所有节点写入同一个 dask-worker-space。这真的是自然行为,我的意思是,当我只想快速使用 Dask 做一些并行时,我的直觉不会告诉我:“设置 local_directory,否则程序会崩溃”。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。