文章目录
项目场景
我们现在有这样一个表,需要按月来记录用户的状态,当前月的状态数据是每天都要更新的,历史月的状态数据导入到表之后就不再更新了。
那么这个业务场景就转换成了“如何向一个已存在的分区表写入并覆盖当前月份的状态数据,而又保留历史月份数据”的问题。
问题描述
Spark中向分区表写数据的时候,如果写入模式为“overwrite”,那会将整个表覆盖掉;如果写入模式为“append”,那么我们当前一个月的数据每天都会追加到当前月的分区内,那就会造成数据重复。所以不能直接使用append或者overwrite模式,那么该如何只覆盖当前月分区内的数据呢?
从Spark 2.3.0版本之后,Spark提供了一个参数:
spark.sql.sources.partitionOverwriteMode=STATIC //此参数默认为STATIC
官网对这个参数的解释:
当我们使用INSERT OVERWRITE语句向一个分区表插入数据时,支持两种模式:static和dynamic。在static模式下,Spark在覆盖写之前,会删除所有符合条件的分区,例如,分区表中有一个"2021-01"的分区,当使用INSERT OVERWRITE语句向表中写入"2021-02"这个分区的数据的时候,会把"2021-01"分区也覆盖掉。在dynamic模式下,Spark不会提前删除分区,而是在运行时覆盖那些有数据写入的分区。
解决方案
我们知道spark.sql.sources.partitionOverwriteMode参数的含义之后,就可以使用它的dynamic模式,在不影响其他分区数据的情况下,覆盖我们指定的分区的数据。
首先,我们要创建好分区表,并把历史数据导进去。例如,在Spark中可以这样写:
df.
.withColumn("month", lit("2021-01"))
.coalesce(1)
.write.mode("append")
.partitionBy("month")
.format("parquet").saveAsTable("db.table")
之后,我们在覆盖当前月分区数据的时候,按照下面两步来编写代码:
//第一步
val spark = SparkSession
.builder()
.master("yarn")
.appName("demo")
.config("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") //初始化SparkSession时配置此参数
.enableHiveSupport()
.getorCreate()
...
//第二步
val currentMonth = LocalDate.Now().toString.substring(0, 7)
df
.withColumn("month", lit(currentMonth))
.coalesce(1)
.write.mode("overwrite")
//.partitionBy("month") //注意:如果要写入数据的表已经是分区表了,insert语句是不允许再指定partitionBy()的
.format("parquet").insertInto("db.table") //注意:这里是insertInto()
参考
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。