如何解决合并两个Spark数据框并添加新列以标识最新日期
说我有两个数据框
第一个数据帧具有value
列的值(每个ID唯一)
id date value some_other_columns...
1 2020-10-01 'a'
2 2020-09-30 'b'
2 2020-10-01 'b'
3 2020-10-01 'c'
第二个数据帧的value
列为空值
id date value some_other_columns...
1 2020-10-02 NULL
2 2020-10-02 NULL
4 2020-10-02 NULL
5 2020-10-02 NULL
6 2020-10-02 NULL
我想合并这两个数据框并创建一个新列is_active
以确定按ID分组的最新日期(但保留其他列),如果value
从第二个数据框分配id
从第一个数据帧开始存在
id date value some_other_columns... is_active
1 2020-10-01 'a' 0
1 2020-10-02 'a' 1
2 2020-09-30 'b' 0
2 2020-10-01 'b' 0
2 2020-10-02 'b' 1
3 2020-10-01 'c' 1
4 2020-10-02 NULL 1
5 2020-10-02 NULL 1
6 2020-10-02 NULL 1
解决方法
假设您的两个数据帧分别为df_1
和df_2
。为了从values
为df_2
分配df_1
,您可以执行left join
。
from pyspark.sql.functions import *
df_1_ = df_1.select("id","value").withColumnRenamed("id","id_1")
df_2 = df_2.drop("value").join(df_1_,(df_2.id == df_1_.id_1),"left")\
.drop("id_1").distinct()
df_2.show()
+---+----------+-----+
| id| date|value|
+---+----------+-----+
| 1|2020-10-02| a|
| 2|2020-10-02| b|
| 4|2020-10-02| null|
| 5|2020-10-02| null|
| 6|2020-10-02| null|
+---+----------+-----+
现在要获取is_active
列,您可以合并然后使用窗口函数(row_number()
或rank()
,具体取决于您的需要):
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('id').orderBy(desc("date"))
df_1.union(df_2).withColumn("is_active",F.when(row_number().over(w)==1,1)\
.otherwise(0)).orderBy("id","date").show()
+---+----------+-----+---------+
| id| date|value|is_active|
+---+----------+-----+---------+
| 1|2020-10-01| a| 0|
| 1|2020-10-02| a| 1|
| 2|2020-09-30| b| 0|
| 2|2020-10-01| b| 0|
| 2|2020-10-02| b| 1|
| 3|2020-10-01| c| 1|
| 4|2020-10-02| null| 1|
| 5|2020-10-02| null| 1|
| 6|2020-10-02| null| 1|
+---+----------+-----+---------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。