如何解决将 dask 系列从 Delayed 对象添加到 dask 数据帧
我想使用 dask 数据框中的列创建一个 dask 系列并将其添加到 dask 数据框中。创建新系列的函数可以是任意复杂的,并且可能使用 dask.dataframe
没有的函数。我的想法是对该函数使用 dask.delayed
装饰器,并从该单个 Delayed
对象创建一个 dask 系列。然后我遇到了必须将这个新系列的分区/部门与原始 dask 数据框对齐的问题,以便我可以将其作为一列附加。我可以假设该函数创建的系列始终与原始 dask 数据帧具有相同的长度和索引。所以我尝试了一些“肮脏”的技巧来实现它。见下文:
import dask
import dask.dataframe as dd
import numpy as np
@dask.delayed
def customized_function(a,b):
return a+b
ddf = dd.from_pandas(pd.DataFrame(np.arange(30).reshape(-1,3),columns=list('ABC')),npartitions=2)
ddf = ddf.loc[ddf.A <= 10]
result = customized_function(dask.delayed(ddf['A']),dask.delayed(ddf['B']))
result_ds = dd.from_delayed(result,meta=(None,object),verify_meta=False)
result_ds = result_ds.repartition(npartitions=ddf.npartitions)
result_ds.divisions = ddf.divisions
ddf['D'] = result_ds
ddf.compute()
不幸的是,这并不总是有效。上面代码的输出是here
它仅在 ddf
只有一个分区时有效,或者如果我删除了 ddf = ddf.loc[ddf.A <= 10]
行或者我在 ddf = ddf.repartition(1)
之后添加了 ddf = ddf.loc[ddf.A <= 10]
。
这是某种类型的错误吗?或者,如果这是预期的,有什么方法可以实现我想要的吗?我的 dask 版本是 2021.03.0
。
编辑:
我暂时自己找到了解决方法,见下文:
import dask
import dask.dataframe as dd
import numpy as np
def customized_function(partition):
return partition.A + partition.B
ddf = dd.from_pandas(pd.DataFrame(np.arange(30).reshape(-1,npartitions=2)
ddf = ddf.loc[ddf.A <= 10]
args_ddf = ddf[['A','B']].repartition(1)
result_ds = args_ddf.map_partitions(customized_function)
ddf['D'] = result_ds
ddf.compute()
我必须确保将 args_ddf
重新分区为 1,以便我可以安全地调用 customized_function
,因为它可能会计算整个系列的一些统计信息。这意味着我必须将函数需要的列放入内存中,但这是我愿意为灵活性付出的代价。如果有人知道更好的方法,请告诉我。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。