如何解决执行 Dask dekayed 和计算时出现 FileNotFoundError
我不熟悉并行处理并询问应用程序。所以我有 1000 个文件要并行运行,所以我使用 Dask 计算来执行此操作。我的工人和核心已正确分配。我在 JuputerLab 环境中运行所有东西。
这是我的尝试;
连接到网关:
from dask_gateway import Gateway
gateway = Gateway(
address=address,public_address=public_address,auth="jupyterhub",)
options = gateway.cluster_options()
options
选择不需要的工人数:
cluster = gateway.new_cluster(
cluster_options=options,)
cluster.adapt(minimum=10,maximum=50)
client = cluster.get_client()
cluster
client
在这个列表中,我有一堆我的 gzip 文件,里面有 nc 文件:
turb=glob.glob('Data/*')
这是我对这些文件执行的函数:
def get_turb(file):
name = str(file[5:18])
d=[file[5:9],file[9:11],file[11:13],file[14:16],file[16:18]]
f_zip = gzip.open(file,'rb')
yr=d[0]
mo=d[1]
da=d[2]
hr=d[3]
mn=d[4]
fs = s3fs.S3FileSystem(anon=True)
period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da),freq='D')
# period.dayofyear
dy=period.dayofyear
cc=[7,8,9,10,11,12,13,14,15,16] #look at the IR channels only for now
dat = xr.open_dataset(f_zip)
这里我正在做 dask 延迟和计算:
files = []
for grb_file in turb[:20]:
s3_ds = dask.delayed(get_turb)(grb_file)
files.append(s3_ds)
s3_ds.visualize()
files = dask.compute(*edr_files)
Dask 延迟正确生成对象并将其附加到文件,但在运行计算时抛出错误:
FileNotFoundError: [Errno 2] No such file or directory: 'Data/20190107_0300.gz'
事实上那里有同名的文件,我已经正确检查了它并且它运行完美而没有正确的dask延迟功能。有人可以指导我我错过了什么或做错了什么。非常感谢您的帮助!
解决方法
听起来工作人员在不同的机器上运行,在这种情况下,文件可能不存在(我无法想象为什么你会使用带有本地集群的 dask gateway,所以我假设这个集群是一个多节点集群)。
您要么需要将文件复制给您的工作人员,或者更好的是,从 S3 加载它们。
在您的代码片段中,您正在创建一个 s3fs 对象,但您根本没有使用它。如果这些文件在 s3 中,您应该调用 fs.open
而不仅仅是 open
要查看您的员工正在查看的内容,您可以执行以下操作
import os
# tells you which directory they're all pointing at
client.run(os.getcwd)
# tells you what's in there
client.run(lambda: os.listdir('./')
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。