如何解决如何使用 dask 拆分大型 .csv 文件?
我正在尝试使用 dask
将一个巨大的制表符分隔文件拆分为包含 100,000 个内核的 AWS Batch 阵列上的较小块。
在 AWS Batch 中,每个核心都有一个唯一的环境变量 AWS_BATCH_JOB_ARRAY_INDEX
,范围从 0 到 99,999(复制到以下代码段中的 idx
变量中)。因此,我尝试使用以下代码:
import os
import dask.dataframe as dd
idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])
df = dd.read_csv(f"s3://main-bucket/workdir/huge_file.tsv",sep='\t')
df = df.repartition(npartitions=100_000)
df = df.partitions[idx]
df = df.persist() # this call isn't needed before calling to df.to_csv (see comment by Sultan)
df = df.compute() # this call isn't needed before calling to df.to_csv (see comment by Sultan)
df.to_csv(f"/tmp/split_{idx}.tsv",sep="\t",index=False)
print(idx,df.shape,df.head(5))
在调用 presist
之前我需要调用 compute
和/或 df.to_csv
吗?
解决方法
当我必须将一个大文件拆分成多个小文件时,我只需运行以下代码即可。
读取并重新分区
import dask.dataframe as dd
df = dd.read_csv("file.csv")
df = df.repartition(npartitions=100)
保存到 csv
o = df.to_csv("out_csv/part_*.csv",index=False)
保存到镶木地板
o = df.to_parquet("out_parquet/")
如果您想避免元数据,可以在此处使用 write_metadata_file=False
。
一些注意事项:
- 我不认为您真的需要持久化和计算,因为您可以直接保存到磁盘。当您遇到内存错误等问题时,保存到磁盘比计算更安全。
- 我发现在编写时使用 parquet 格式至少比 csv 快 3 倍。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。