如何解决从 dask 数组产生向量输出
我有一个大的 dask 数组 (labeled_arr
),它实际上是一个带标签的光栅图像(dtype 是 int64)。我想使用 rasterio 将标记区域转换为多边形并将它们组合成一个多边形列表(或只有几何列的地质系列)。这是对单个数组的简单任务,但我无法弄清楚如何告诉 dask 我希望它对每个块执行此操作并返回不是数组的内容。
应用于每个块的函数:
def get_polys(labeled_blocks):
polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
labeled_blocks.astype('int32'),transform=trans))[:-1]
# Note: rasterio.features.shapes returns an iterator,hence the conversion to a list here
return polys
试图让 dask 执行此操作的代码行:
test_polygons = da.blockwise(get_polys,'',labeled_arr,'ij')
test_polygons.compute()
其中 labeled_arr
是输入的分块 dask 数组。
按原样运行会返回错误,指出我必须为 da.blockwise
指定 dtype。指定 dtype 会返回 AttributeError,因为输出列表类型没有 dtype 属性。我发现了 meta
关键字,但仍然无法获得将输出转换为系列或列表的正确语法。
我不喜欢上述方法,但我的总体目标是:采用带标签的分块 dask 数据数组(并非全部适合内存),根据每个块的计算提取一个列表,并生成一个连接的列表(或 Pandas 数据对象)以及我原始分块数组中所有块的输出。
解决方法
这可能有效:
import dask
import dask.array as da
# we expect to see 4 blocks here
test_array = da.random.random((4,4),chunks=(2,2))
@dask.delayed
def my_func(block):
# do something fancy
return list(block)
results = dask.compute([my_func(x) for x in test_array.to_delayed().ravel()])
正如您所指出的,问题在于 list
没有 dtype
。解决此问题的一种方法是将 list
转换为 np.array
,但我不确定这是否适用于所有 geometry
对象(对于 {{1} },但由于长度不同,多边形可能会出现问题)。由于您对将这些几何图形强制放入数组不感兴趣,因此最好将单个块视为 Points
对象,一次一个地将它们馈送到您的函数中(但跨工作器/进程进行缩放)。
这是我最初得到的解决方案,尽管考虑到 concatenate=True kwarg,它仍然需要大量 RAM。
poss_list = []
def get_polys(labeled_blocks):
polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
labeled_blocks.astype('int32'),transform=trans))[:-1]
poss_list.append(polys)
da.blockwise(get_bergs,'',labeled_arr,'ij',meta=pd.DataFrame({'c':[]}),concatenate=True).compute()
如果我的解释是正确的,这不会将块输入到我的跨工作程序/进程的函数中(这似乎我现在可以逃脱)。
更新 - 使用 dask.delayed 改进答案,以@SultanOrazbayev 接受的答案为基础
import dask
# onedem = original_xarray_dataarray
poss_list = []
@dask.delayed
def get_bergs(labeled_blocks,pointer,chunk0,chunk1):
# Note: I'm using this in a CRS (polar stereo) with negative y coordinates - it hasn't been tested for other CRSs
def getpx(chunkid,chunksz):
amin = chunkid[0] * chunksz[0][0]
amax = amin + chunksz[0][0]
bmin = chunkid[1] * chunksz[1][0]
bmax = bmin + chunksz[1][0]
return (amin,amax,bmin,bmax)
# order of all inputs (and outputs) should be y,x when axis order is used
chunksz = (onedem.chunks['y'],onedem.chunks['x'])
ymini,ymaxi,xmini,xmaxi = getpx((chunk0,chunk1),chunksz)
# use rasterio Windows and rioxarray to construct transform
# https://rasterio.readthedocs.io/en/latest/topics/windowed-rw.html#window-transforms
chwindow = rasterio.windows.Window(xmini,ymini,xmaxi-xmini,ymaxi-ymini) #.from_slices[ymini,ymaxi],[xmini,xmaxi])
trans = onedem.rio.isel_window(chwindow).rio.transform(recalc=True)
return list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(labeled_blocks.astype('int32'),transform=trans))[:-1]
for __,obj in enumerate(labeled_arr.to_delayed()):
for bl in obj:
piece = dask.delayed(get_bergs)(bl,*bl.key)
poss_list.append(piece)
poss_list = dask.compute(*poss_list)
# unnest the list of polygons returned by using dask to polygonize
concat_list = [item for sublist in poss_list for item in sublist if len(item)!=0]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。