如何解决Oracle MERGE 重写为 PySpark如果为空 - 更新,否则 - 插入
在 Oracle sql 中,我可以这样做:
MERGE INTO destination d
USING new_data n
ON (d.c1 = n.c1 AND d.c2 = n.c2)
WHEN MATCHED THEN
UPDATE SET d.d1 = n.d1
WHERE d.d1 IS NULL
WHEN NOT MATCHED THEN
INSERT (c1,c2,d1)
VALUES (n.c1,n.c2,n.d1);
如果 c1
、c2
存在于 destination
中且 d1
为空,则 d1
被更新。
如果 c1
、c2
不存在,则插入行。
有没有办法在 PySpark 中做同样的事情?
这会生成数据帧:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getorCreate()
dCols = ['c1','c2','d1']
dData = [('a','b',5),('c','d',None)]
destination = spark.createDataFrame(dData,dCols)
nData = [('a',1),6),('e','f',7),('g','h',None)]
new_data = spark.createDataFrame(nData,dCols)
在 PySpark 中,几乎包含 sql 中的所有内容。但我没有找到 MERGE
的等效项。
解决方法
您可以使用 coalesce
import pyspark.sql.functions as F
result = new_data.alias('t1').join(
destination.alias('t2'),['c1','c2'],'full'
).select('c1','c2',F.coalesce('t2.d1','t1.d1').alias('d1'))
result.show()
+---+---+----+
| c1| c2| d1|
+---+---+----+
| e| f| 7|
| g| h|null|
| c| d| 6|
| a| b| 5|
+---+---+----+
,
在 SQL 中,MERGE
可以替换为 left join union right join full outer join:
merged = destination.alias("dest").join(new_data.alias("src"),["c1","c2"],"full") \
.selectExpr("c1","c2","coalesce(dest.d1,src.d1) as d1")
merged.show()
#+---+---+----+
#| c1| c2| d1|
#+---+---+----+
#| e| f| 7|
#| g| h|null|
#| c| d| 6|
#| a| b| 5|
#+---+---+----+
但是,每次执行此合并时,您都需要将所有数据重写到目标中,因为 Spark 不支持更新并且可能导致性能不佳。所以如果你真的需要这样做,我建议你看看 Delta Lake ,它带来了 ACID 事务触发,并支持 MERGE 语法。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。