微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如果列值取决于文件路径,是否可以在一次读取多个文件时将文字作为列添加到spark数据框?

如何解决如果列值取决于文件路径,是否可以在一次读取多个文件时将文字作为列添加到spark数据框?

我正在尝试将许多avro文件读入spark数据帧。它们都共享相同的s3文件路径前缀,因此最初我运行的是类似的东西:

path = "s3a://bucketname/data-files"
df = spark.read.format("avro").load(path)

已成功识别所有文件

单个文件如下:

"s3a://bucketname/data-files/timestamp=20201007123000/id=update_account/0324345431234.avro"

在尝试操作数据时,代码不断出错,并发出一条消息,指出其中一个文件不是Avro数据文件。收到的实际错误消息是:org.apache.spark.SparkException: Job aborted due to stage failure: Task 62476 in stage 44102.0 Failed 4 times,most recent failure: Lost task 62476.3 in stage 44102.0 (TID 267428,10.96.134.227,executor 9): java.io.IOException: Not an Avro data file

为避免该问题,我能够获得我感兴趣的avro文件的显式文件路径。将它们放入列表(file_list)后,我能够成功运行spark.read.format("avro").load(file_list)

现在的问题是-我有兴趣向数据框添加文件路径中一部分的字段(例如,上面示例中的时间戳和ID)。

仅使用存储桶和前缀文件路径查找文件时(方法1),这些字段会自动附加到结果数据帧中。有了显式的文件路径,我没有获得那个优势。

我想知道在使用spark读取文件时是否可以包含这些列。

依次处理文件的外观如下:

for file in file_list:
    df = spark.read.format("avro").load(file)
    id,timestamp = parse_filename(file)
    df = df.withColumn("id",lit(id))\
         .withColumn("timestamp",lit(timestamp))

但是有超过50万个文件,这将需要一个永恒的时间。

我是Spark的新手,非常感谢您的帮助!

解决方法

为什么不尝试首先使用Wholetextfiles方法读取文件,并在开始时将路径名添加到数据本身中。然后,您可以从数据中过滤出文件名,并在创建数据框时将其添加为列。我同意这是一个两步过程。但它应该工作。要获取文件的时间戳,您将需要js不可序列化的文件系统对象,即该对象不能用于sparks并行化操作中,因此,您将必须使用文件和时间戳创建本地集合,并以某种方式将其与您使用Wholetextfiles创建的RDD联接在一起。

,

这里要解决的两件事:

指定文件

Spark具有内置处理功能,可以读取给定路径中特定类型的所有文件。正如@Sri_Karthik所建议的,尝试提供类似"s3a://bucketname/data-files/*.avro"的路径(如果不起作用,请尝试"s3a://bucketname/data-files/**/*.avro" ...我不记得确切的模式匹配语法spark使用的路径),应该抓住仅所有avro文件,并摆脱那些在这些路径中看到非avro文件的错误。在我看来,这比手动获取文件路径并显式指定它们更为优雅。

顺便说一句,您看到这种情况的原因很可能是因为文件夹通常标有.SUCCESS.COMPLETED之类的元数据文件,以表明它们已准备好使用。

从文件路径中提取元数据

如果您检出this stackoverflow question,它将显示如何将文件名添加为新列(scala和pyspark都适用)。然后,您可以使用regexp_extract函数从该文件名字符串中解析出所需的元素。我从没在火花中使用scala,因此无法在其中为您提供帮助,但是它应该类似于pyspark version

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。