
Reading data from S3 with partitions that have different columns

How to read all columns from S3 partitions that have different columns

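I have some partitioned data in S3, and each partition has a different number of columns, as shown below. When I read the data in PySpark and try to print the schema, I only get some of the columns, not all of them. What is the best way to read all the columns and rename a few of them?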

aws s3 ls s3://my-bkt/test_data/
            PRE occ_dt=20210426/
            PRE occ_dt=20210428/
            PRE occ_dt=20210429/
            PRE occ_dt=20210430/
            PRE occ_dt=20210503/
            PRE occ_dt=20210504/
            

spark.read.parquet("s3://my-bkt/test_data/").printSchema()
 |-- map_api__450jshb457: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- map_api_592yd749dn: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- map_api_has_join: string (nullable = true)


# When I read partition 20210504
spark.read.parquet("s3://my-bkt/test_data/occ_dt=20210504/").printSchema()
 |-- map_api__450jshb457: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- map_api_592yd749dn: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- map_api_has_join: string (nullable = true)
 |-- cust_activity: string (nullable = true)
 |-- map_api__592rtddvid: string (nullable = true)



# When I read partition 20210503
spark.read.parquet("s3://my-bkt/test_data/occ_dt=20210503/").printSchema()
 |-- map_api__450jshb457: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- map_api_592yd749dn: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- map_api_4js3nnju8572d93: string (nullable = true)
 |-- map_api_58943h64u47v: string (nullable = true)
 |-- map_api__58943h6220dh: string (nullable = true)
 

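As shown above, partitions 20210503 and 20210504 have more fields than the other partitions. When I read the whole S3 prefix to get the schema, only a subset of the fields is shown. I want reading the S3 location to return all of the fields, as in the expected result below.

Expected output:
spark.read.parquet("s3://my-bkt/test_data/").printSchema()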
|-- map_api__450jshb457: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- map_api_592yd749dn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- map_api_has_join: string (nullable = true)
|-- map_api_4js3nnju8572d93: string (nullable = true)
|-- map_api_58943h64u47v: string (nullable = true)
|-- map_api__58943h6220dh: string (nullable = true)
|-- cust_activity: string (nullable = true)
|-- map_api__592rtddvid: string (nullable = true)

Thanks in advance!

Solution

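Add the mergeSchema option. Schema merging is off by default because it is relatively expensive; with it enabled, Spark merges the schemas of all Parquet files under the path, so every column appears (rows from partitions that lack a column get null for it).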

spark.read.option("mergeSchema", "true").parquet("s3://my-bkt/test_data/").printSchema()
|-- map_api__450jshb457: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- map_api_592yd749dn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- map_api_has_join: string (nullable = true)
|-- map_api_4js3nnju8572d93: string (nullable = true)
|-- map_api_58943h64u47v: string (nullable = true)
|-- map_api__58943h6220dh: string (nullable = true)
|-- cust_activity: string (nullable = true)
|-- map_api__592rtddvid: string (nullable = true)
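Conceptually, schema merging takes the union of the columns found in every file and fills in null wherever a file lacks a column. The plain-Python sketch below illustrates that idea, together with the column rename the question also asks about; it is not Spark's implementation. The column lists are copied from the printSchema() output above, assuming the root read reflects the schema of the older partitions, and the rename mapping (map_api_has_join to has_join) is hypothetical. The column order Spark actually returns may differ from the first-seen order used here.

```python
from typing import Dict, List, Optional, Sequence

# Column lists copied from the printSchema() output in the question.
# ASSUMPTION: the root read (without mergeSchema) shows the older partitions' schema.
OLDER_PARTITIONS = [
    "map_api__450jshb457", "customer_id", "first_name",
    "map_api_592yd749dn", "last_name", "map_api_has_join",
]
OCC_DT_20210503 = [
    "map_api__450jshb457", "customer_id", "first_name",
    "map_api_592yd749dn", "last_name",
    "map_api_4js3nnju8572d93", "map_api_58943h64u47v", "map_api__58943h6220dh",
]
OCC_DT_20210504 = [
    "map_api__450jshb457", "customer_id", "first_name",
    "map_api_592yd749dn", "last_name", "map_api_has_join",
    "cust_activity", "map_api__592rtddvid",
]


def merge_columns(schemas: Sequence[Sequence[str]]) -> List[str]:
    """Union of all column names, keeping the order in which each first appears."""
    merged: List[str] = []
    seen = set()
    for schema in schemas:
        for name in schema:
            if name not in seen:
                seen.add(name)
                merged.append(name)
    return merged


def align_row(row: Dict[str, Optional[str]], columns: Sequence[str]) -> Dict[str, Optional[str]]:
    """Project a row onto the merged columns; columns the row lacks become None (null)."""
    return {name: row.get(name) for name in columns}


def rename_columns(columns: Sequence[str], mapping: Dict[str, str]) -> List[str]:
    """Rename the columns listed in mapping, leaving all others unchanged."""
    return [mapping.get(name, name) for name in columns]


if __name__ == "__main__":
    merged = merge_columns([OLDER_PARTITIONS, OCC_DT_20210503, OCC_DT_20210504])
    print(len(merged))  # 11
    # A row from an older partition has no cust_activity value, so it reads as None.
    print(align_row({"customer_id": "c1"}, merged)["cust_activity"])  # None
    # HYPOTHETICAL rename mapping, for illustration only.
    print(rename_columns(merged, {"map_api_has_join": "has_join"})[5])  # has_join
```

In PySpark itself, the rename step would be DataFrame.withColumnRenamed("map_api_has_join", "has_join") applied to the DataFrame returned by the mergeSchema read, and schema merging can also be enabled for the whole session through the spark.sql.parquet.mergeSchema configuration.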
