微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将一个大的 tar.gz 文件分解成多个较小的 tar.gz 文件

如何解决将一个大的 tar.gz 文件分解成多个较小的 tar.gz 文件

在 spark 中处理大于 1gb 的 tar.gz 文件时出现 OutOfMemoryError

为了克服这个错误,我尝试使用“split”命令将 tar.gz 拆分为多个部分,结果发现每个拆分本身都不是 tar.gz,因此无法进行处理。

dir=/dbfs/mnt/data/temp
b=524288000
for file in /dbfs/mnt/data/*.tar.gz; 
do 
a=$(stat -c%s "$file");
if [[ "$a" -gt "$b" ]] ; then 
split -b 500M -d --additional-suffix=.tar.gz $file "${file%%.*}_part"
mv $file $dir
fi
done

尝试处理拆分文件时出错

Caused by: java.io.EOFException
    at org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream.read(GzipCompressorInputStream.java:281)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at org.apache.commons.compress.archivers.tar.TararchiveInputStream.read(TararchiveInputStream.java:590)
    at org.apache.commons.io.input.ProxyInputStream.read(ProxyInputStream.java:98)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)
    at org.apache.commons.io.IoUtils.copyLarge(IoUtils.java:2001)
    at org.apache.commons.io.IoUtils.copyLarge(IoUtils.java:1980)
    at org.apache.commons.io.IoUtils.copy(IoUtils.java:1957)
    at org.apache.commons.io.IoUtils.copy(IoUtils.java:1907)
    at org.apache.commons.io.IoUtils.toString(IoUtils.java:778)
    at org.apache.commons.io.IoUtils.toString(IoUtils.java:803)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:33)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:31)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
    at scala.collection.immutable.Stream.foreach(Stream.scala:595)
    at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:316)
    at scala.collection.AbstractTraversable.toMap(Traversable.scala:104)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$.apply(command-2152765781429277:34)
    at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)
    at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)

我有 tar.gz 文件,最大大小为 4gb,每个文件最多可包含 7000 个 json 文档,其大小从 1mb 到 50mb 不等。

如果我想将较大的 tar.gz 文件分成较小的 tar.gz 文件,这是我解压缩的唯一选择,然后根据文件大小或文件数重新压缩? - “是这样吗?”

解决方法

普通的 gzip 文件不可拆分。 GZip Tar 档案更难处理。 Spark 可以处理 gzip 压缩的 json 文件,但不能处理 gzipped tar 文件和 tar 文件。 Spark 可以处理每个最大约 2GB 的二进制文件。 Spark 可以处理连接在一起的 JSON

我建议使用 Pandas UDF 或 .pipe() 运算符来处理每个 tar gzipped 文件(每个工作人员一个)。每个工作人员都会以流式方式解压缩、解压和处理每个 JSON 文档,永远不会填满内存。希望你有足够的源文件来并行运行它并看到加速。

您可能想探索流式传输方法,以增量方式将压缩的 JSON 文件传送到 ADLS Gen 2 / S3,并使用 Databricks 自动加载器功能在文件到​​达时立即加载和处理。

这个问题的答案也How to load tar.gz files in streaming datasets?似乎很有希望。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。