微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

合并两个Spark数据框并添加新列以标识最新日期

如何解决合并两个Spark数据框并添加新列以标识最新日期

我有两个数据框

一个数据帧具有value列的值(每个ID唯一)

id   date          value  some_other_columns...
1    2020-10-01    'a'    
2    2020-09-30    'b'    
2    2020-10-01    'b'
3    2020-10-01    'c'

第二个数据帧的value列为空值

id   date          value  some_other_columns...
1    2020-10-02    NULL
2    2020-10-02    NULL
4    2020-10-02    NULL
5    2020-10-02    NULL
6    2020-10-02    NULL

我想合并这两个数据框并创建一个新列is_active以确定按ID分组的最新日期(但保留其他列),如果value从第二个数据框分配id从第一个数据帧开始存在

id   date          value  some_other_columns... is_active
1    2020-10-01    'a'                          0
1    2020-10-02    'a'                          1
2    2020-09-30    'b'                          0
2    2020-10-01    'b'                          0
2    2020-10-02    'b'                          1
3    2020-10-01    'c'                          1
4    2020-10-02    NULL                         1
5    2020-10-02    NULL                         1
6    2020-10-02    NULL                         1

解决方法

假设您的两个数据帧分别为df_1df_2。为了从valuesdf_2分配df_1,您可以执行left join

from pyspark.sql.functions import *
df_1_ = df_1.select("id","value").withColumnRenamed("id","id_1")

df_2 = df_2.drop("value").join(df_1_,(df_2.id == df_1_.id_1),"left")\
            .drop("id_1").distinct()

df_2.show()
+---+----------+-----+                                                          
| id|      date|value|
+---+----------+-----+
|  1|2020-10-02|    a|
|  2|2020-10-02|    b|
|  4|2020-10-02| null|
|  5|2020-10-02| null|
|  6|2020-10-02| null|
+---+----------+-----+

现在要获取is_active列,您可以合并然后使用窗口函数(row_number()rank(),具体取决于您的需要):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('id').orderBy(desc("date"))

df_1.union(df_2).withColumn("is_active",F.when(row_number().over(w)==1,1)\
        .otherwise(0)).orderBy("id","date").show()
+---+----------+-----+---------+                                                
| id|      date|value|is_active|
+---+----------+-----+---------+
|  1|2020-10-01|    a|        0|
|  1|2020-10-02|    a|        1|
|  2|2020-09-30|    b|        0|
|  2|2020-10-01|    b|        0|
|  2|2020-10-02|    b|        1|
|  3|2020-10-01|    c|        1|
|  4|2020-10-02| null|        1|
|  5|2020-10-02| null|        1|
|  6|2020-10-02| null|        1|
+---+----------+-----+---------+

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?