微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

分块写入XArray MultiIndex数据

如何解决分块写入XArray MultiIndex数据

我正在尝试有效地重组大型多维数据集。假设随着时间的推移,我有许多遥感图像,其中有多个带,坐标为像素坐标y y,图像采集时间为时间带,采集的不同数据为带宽。

在我的用例中,假设xarray的坐标长度大致为x(3000),y(3000),时间(10)和带(40)浮点数据。所以100gb +的数据。

我一直在尝试从this example开始工作,但是在将其翻译成这种情况时遇到了麻烦。

小数据集示例

注意:实际数据比此示例大得多。

import numpy as np
import dask.array as da
import xarray as xr

nrows = 100
ncols = 200
row_chunks = 50
col_chunks = 50

data = da.random.random(size=(1,nrows,ncols),chunks=(1,row_chunks,col_chunks))

def create_band(data,x,y,band_name):

    return xr.DataArray(data,dims=('band','y','x'),coords={'band': [band_name],'y': y,'x': x})

def create_coords(data,left,top,celly,cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left,right,ncols) + cellx/2.0
    y = np.linspace(top,bottom,nrows) - celly/2.0
    
    return x,y

x,y = create_coords(data,1000,2000,30,30)

src = []

for time in ['t1','t2','t3']:

    src_t = xr.concat([create_band(data,band) for band in ['blue','green','red','nir']],dim='band')\
                    .expand_dims(dim='time')\
                    .assign_coords({'time': [time]})
    
    src.append(src_t)

src = xr.concat(src,dim='time')

print(src)


<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (time: 3,band: 4,y: 100,x: 200)>
dask.array<concatenate,shape=(3,4,100,200),dtype=float64,chunksize=(1,1,50,50),chunktype=numpy.ndarray>
Coordinates:
  * x        (x) float64 1.015e+03 1.045e+03 1.075e+03 ... 6.985e+03 7.015e+03
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * y        (y) float64 1.985e+03 1.955e+03 1.924e+03 ... -984.7 -1.015e+03
  * time     (time) object 't1' 't2' 't3'

重组-堆叠并转置

我需要存储以下内容

print(src.stack(sample=('y','x','time')).T)

<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (sample: 60000,band: 4)>
dask.array<transpose,shape=(60000,4),chunksize=(3600,1),chunktype=numpy.ndarray>
Coordinates:
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * sample   (sample) MultiIndex
  - y        (sample) float64 1.985e+03 1.985e+03 ... -1.015e+03 -1.015e+03
  - x        (sample) float64 1.015e+03 1.015e+03 ... 7.015e+03 7.015e+03
  - time     (sample) object 't1' 't2' 't3' 't1' 't2' ... 't3' 't1' 't2' 't3'

我希望使用dask和xarray将结果分块写入磁盘,open_mfdataset可以访问。 实木复合地板似乎是一个不错的选择,但我不知道如何分块编写(src太大,无法存储在内存中)。

@dask.delayed
def stacker(data):
   return data.stack(sample=('y','time')).T.to_pandas() 

stacker(src).to_parquet('out_*.parquet')

def stack_write(data):
   data.stack(sample=('y','time')).T.to_pandas().to_parquet('out_*.parquet')
   return None

stack_write(src)

在这一点上,我只是希望一些好主意。谢谢!

解决方法

我这里有一个解决方案(https://github.com/pydata/xarray/issues/1077#issuecomment-644803374),用于将多索引数据集写入文件。

您必须手动将数据集“编码”为可以写为netCDF的形式。然后在读回时进行“解码”。

import numpy as np
import pandas as pd
import xarray as xr


def encode_multiindex(ds,idxname):
    encoded = ds.reset_index(idxname)
    coords = dict(zip(ds.indexes[idxname].names,ds.indexes[idxname].levels))
    for coord in coords:
        encoded[coord] = coords[coord].values
    shape = [encoded.sizes[coord] for coord in coords]
    encoded[idxname] = np.ravel_multi_index(ds.indexes[idxname].codes,shape)
    encoded[idxname].attrs["compress"] = " ".join(ds.indexes[idxname].names)
    return encoded


def decode_to_multiindex(encoded,idxname):
    names = encoded[idxname].attrs["compress"].split(" ")
    shape = [encoded.sizes[dim] for dim in names]
    indices = np.unravel_index(encoded.landpoint.values,shape)
    arrays = [encoded[dim].values[index] for dim,index in zip(names,indices)]
    mindex = pd.MultiIndex.from_arrays(arrays)

    decoded = xr.Dataset({},{idxname: mindex})
    for varname in encoded.data_vars:
        if idxname in encoded[varname].dims:
            decoded[varname] = (idxname,encoded[varname].values)
    return decoded
,

目前,这不是解决方案,而是修改了您的代码版本,以便在其他人想要尝试解决此问题时可以轻松重现:

问题出在stack操作(concatenated.stack(sample=('y','x','time'))上。 在这一步,内存不断增加,进程为killed

concatenated对象是“ Dask后备” xarray.DataArray。因此,我们可以预期stack操作将由Dask懒惰地完成。那么,为什么在此步骤killed中使用进程?

发生这种情况的2种可能性:

  • stack操作实际上是由Dask懒惰完成的,但是由于数据是如此之大,所以即使Dask所需的最小内存也太多了

  • stack操作不受Dask支持


import numpy as np
import dask.array as da
import xarray as xr
from numpy.random import RandomState

nrows = 20000
ncols = 20000
row_chunks = 500
col_chunks = 500


# Create a reproducible random numpy array
prng = RandomState(1234567890)
numpy_array = prng.rand(1,nrows,ncols)

data = da.from_array(numpy_array,chunks=(1,row_chunks,col_chunks))


def create_band(data,x,y,band_name):

    return xr.DataArray(data,dims=('band','y','x'),coords={'band': [band_name],'y': y,'x': x})

def create_coords(data,left,top,celly,cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left,right,ncols) + cellx/2.0
    y = np.linspace(top,bottom,nrows) - celly/2.0
    
    return x,y


x,y = create_coords(data,1000,2000,30,30)

bands = ['blue','green','red','nir']
times = ['t1','t2','t3']
bands_list = [create_band(data,band) for band in bands]

src = []

for time in times:

    src_t = xr.concat(bands_list,dim='band')\
                    .expand_dims(dim='time')\
                    .assign_coords({'time': [time]})

    src.append(src_t)


concatenated = xr.concat(src,dim='time')
print(concatenated)
# computed = concatenated.compute() # "computed" is ~35.8GB

stacked = concatenated.stack(sample=('y','time'))

transposed = stacked.T

为了更改nrows的大小,可以尝试更改ncolsconcatenated的值。为了提高性能,我们也可以/应该更改chunks

注意:我什至尝试过

concatenated.to_netcdf("concatenated.nc")
concatenated = xr.open_dataarray("concatenated.nc",chunks=10)

这是为了确保它是由Dask支持的DataArray,并且也能够调整块。 我为chunks尝试了不同的值:但是总是内存不足。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。