微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Dask 和 Xarray 两个数据集之间的差异

如何解决Dask 和 Xarray 两个数据集之间的差异

我需要使用 dask 和 Xarray 计算两个数据集(每月重新采样的两个每日变量)之间的差异。这是我的代码

def diff(path_1,path_2):
    import xarray as xr
    max_v=xr.open_mfdataset(path_1,combine='by_coords',concat_dim="time",parallel=True)['variable_1'].resample({'time': '1M'}).max()
    min_v=xr.open_mfdataset(path_2,parallel=True)['variable_2'].resample({'time': '1M'}).min()
    
    return (max_v-min_v).compute()
        
future = client.submit(diff,path_1,path_2)
diff = client.gather(future)

我也试过这个:

%%time
def max_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path,parallel=True)
    max_v=multi_file_dataset['variable_1'].resample(time='1M').max(dim='time')
    return max_v.compute()

def min_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path,parallel=True)
    min_v=multi_file_dataset['variable_2'].resample(time='1M').min(dim='time')
    return min_v.compute()

futures=[]
future = client.submit(max_temp,path1)
futures.append(future)
future = client.submit(min_temp,path2)
futures.append(future)
results = client.gather(futures)

diff = results[0]-results[1]

但我注意到在 getitem-nanmax e getitem-nanmin 的最后一步(例如 1980 年中的 1974 年)的计算变得非常缓慢。

这里是集群配置:

cluster = SLURMCluster(walltime='1:00:00',cores=5,memory='5GB')
cluster.scale(jobs=10)

每个数据集由几个文件组成:总大小=7GB

有没有更好的方法来实现这个计算?

谢谢

解决方法

不能 100% 确定这适用于您的情况,但没有 mwe 就很难做得更好。所以,我怀疑 .compute() 使用的 xarray 可能与 client.submit 发生冲突,因为现在计算正在工作人员身上进行,我不确定它是否可以正确分配工作同行(但这是一个怀疑,我不确定)。因此,解决此问题的一种方法是将计算放入主脚本中(因为 xarray 将与背景中的 dask 集成),所以这可能会起作用:

import xarray as xr

max_v=xr.open_mfdataset(path_1,combine='by_coords',concat_dim="time",parallel=True,chunks={'time': 10})['variable_1'].resample({'time': '1M'}).max()
min_v=xr.open_mfdataset(path_2,chunks={'time': 10})['variable_2'].resample({'time': '1M'}).min()
    
diff_result = (max_v-min_v).compute()

以下是不同数据集上的 mwe

import xarray as xr

# chunks option will create dask array
ds = xr.tutorial.open_dataset('rasm',decode_times=True,chunks={'time': 10})

# these are lazy calculations
max_v = ds['Tair'].resample({'time': '1M'}).max()
min_v = ds['Tair'].resample({'time': '1M'}).min()

# this will use dask scheduler in the background
diff_result = (max_v-min_v).compute()

# since the data refers to the same variable,all the results will be either 0 or `nan` (if the variable was not available in that time/x/y combination)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。