如何解决进行令人尴尬的并行数据连接时,dask 中出现 KilledWorker 错误
我有一个 embarrassingly parallel workload,我正在读取一组镶木地板文件,将它们连接成更大的镶木地板文件,然后将其写回磁盘。我在一个分布式计算机(带有分布式文件系统)上运行它,大约有 300 个工人,每个工人有 20GB 的 RAM。每个单独的工作应该只消耗 2-3 GB 的 RAM,但不知何故工作人员由于内存错误而崩溃(获取:distributed.scheduler.KilledWorker 异常)。我可以在工作人员的输出日志中看到以下内容:
内存使用率很高,但工作人员没有数据可以存储到磁盘。可能 其他一些进程正在泄漏内存。进程内存:18.20 GB
with open('ts_files_list.txt','r') as f:
all_files = f.readlines()
# There are about 500K files
all_files = [f.strip() for f in all_files]
# grouping them into groups of 50.
# The concatenated df should be about 1GB in memory
npart = 10000
file_pieces = np.array_split(all_files,npart)
def read_and_combine(filenames,group_name):
dfs = [pd.read_parquet(f) for f in filenames]
grouped_df = pd.concat(dfs)
grouped_df.to_parquet(f,engine='pyarrow')
group_names = [f'group{i} for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)
# the following line shouldn't have resulted in a memory error,but it does
dask.compute(map(delayed_func,file_pieces,group_names))
我在这里遗漏了什么明显的东西吗? 谢谢!
Dask 版本:2021.01.0,pyarrow 版本:2.0.0,分布式版本:2021.01.0
解决方法
存在一些语法错误,但总体而言工作流程似乎合理。
with open('ts_files_list.txt','r') as f:
all_files = f.readlines()
all_files = [f.strip() for f in all_files]
npart = 10000
file_pieces = np.array_split(all_files,npart)
def read_and_combine(filenames,group_name):
grouped_df = pd.concat(pd.read_parquet(f) for f in filenames)
grouped_df.to_parquet(group_name,engine='pyarrow')
del grouped_df # this is optional (in principle dask should clean up these objects)
group_names = [f'group{i}' for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)
dask.compute(map(delayed_func,file_pieces,group_names))
要记住的另一件事是 parquet
文件默认是压缩的,因此在解压缩时它们可能会占用比压缩文件大小更多的内存。不确定这是否适用于您的工作流程,但在遇到内存问题时需要记住一些事情。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。