微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

根据下次在数据帧pyspark中出现特定值来更新行

如何解决根据下次在数据帧pyspark中出现特定值来更新行

如果我有这样的数据框

    data = [(("ID1","ENGAGEMENT",2019-03-03)),(("ID1","BABY SHOWER",2019-04-13)),"WEDDING",2019-07-10)),"DIVORCE",2019-09-26))]
    df = spark.createDataFrame(data,["ID","Event","start_date"])
    df.show()
    
    +---+-----------+----------+
    | ID|      Event|start_date|
    +---+-----------+----------+
    |ID1| ENGAGEMENT|2019-03-03|
    |ID1|BABY SHOWER|2019-04-13|
    |ID1|    WEDDING|2019-07-10|
    |ID1|    DIVORCE|2019-09-26|
    +---+-----------+----------+

必须从此数据框中根据后续事件的开始日期推断事件的结束日期

例如:如果您有一个订婚,那么订婚将在婚礼结束时结束,因此您可以将婚礼的开始日期作为订婚的结束日期。

因此,上面的数据框应该获得此输出

+---+-----------+----------+----------+
| ID|      Event|start_date|  end_date|
+---+-----------+----------+----------+
|ID1| ENGAGEMENT|2019-03-03|2019-07-10|
|ID1|BABY SHOWER|2019-04-13|2019-04-13|
|ID1|    WEDDING|2019-07-10|2019-09-26|
|ID1|    DIVORCE|2019-09-26|      NULL|
+---+-----------+----------+----------+

我最初是在由ID分隔的窗口上使用Lead函数来尝试此操作,以使行排在最前面,但由于可能在20行之后出现“ Wedding”事件,因此行不通,这是一种非常麻烦的方式去做。

df = df.select("*",*([f.lead(f.col(c),default=None).over(Window.orderBy("ID")).alias("LEAD_"+c) 
                      for c in ["Event","start_date"]]))

activity_dates = activity_dates.select("*",default=None).over(Window.orderBy("ID")).alias("LEAD_"+c) 
                      for c in ["LEAD_Event","LEAD_start_date"]]))


df = df.withColumn("end_date",f.when((col("Event") == "ENGAGEMENT") & (col("LEAD_Event") == "WEDDING"),col("LEAD_start_date"))
                                .when((col("Event") == "ENGAGEMENT") & (col("LEAD_LEAD_Event") == "WEDDING"),col("LEAD_LEAD_start_date"))

如何在不遍历数据集的情况下实现这一目标?

解决方法

这是我的尝试。

Integer

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。