如何解决仅更新已更改的行pyspark增量表数据砖
与创建的数据框相比,仅需要更新现有表中已更改的行。因此,NowNow,我会减去并获取更改后的行,但不确定如何合并到现有表中。
old_df = spark.sql("select * from existing table")
diff = new_df.subtract(old_df)
现在必须插入diff数据框(如果有新行)或更新现有记录
(deltaTable.alias("full_df").merge(
merge_df.alias("append_df"),"full_df.col1 = append_df.col1 OR full_df.col2 =append_df.col2")
.whenNotMatchedInsertAll()
.execute()
)
这不会更新现有记录(案例:col2值已更改; col1未更改)
解决方法
.whenMatchedUpdateAll()
接受一个条件,可以用来保留不变的行:
(deltaTable.alias("full_df").merge(
merge_df.alias("append_df"),"full_df.col1 = append_df.col1 OR full_df.col2 = append_df.col2")
.whenNotMatchedInsertAll()
.whenMatchedUpdateAll("full_df.col1 != append_df.col1 OR full_df.col2 != append_df.col2")
.execute()
)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。