微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用pyspark跟踪和查找数据帧中的最新值

如何解决使用pyspark跟踪和查找数据帧中的最新值

我是 Pyspark 的新手,我遇到了以下需要解决的案例。有人可以请帮助并解决它。我检查并尝试在谷歌和堆栈溢出中找到类似的问题。但不幸的是我没有得到它。

问题:

我有包含两列的数据框,一列已过时,另一列是替换列。

数据帧:

DataSet

在上面的数据框中,绝对值在替换列中得到更新。例如:这里 10 变为 12,在下一行 12 变为 14,第 3 行再次第 14 个值变为 16。如果您看到值在下一行更新

所以前三个值成为一组,因为链值正在更新。以红色突出显示,因此对于那些过时值,替换中的最后一个值是最新值 16。对于其他两行,19 是连通性值并以黄色突出显示,因此这两行的最新值是 20 是最新值。

预期产出

dataset- expected output

我尝试在 pyspark 中使用 map 和 foreach,但没有得到想要的结果。一些请帮助我如何解决这个问题。

解决方法

首先,您必须发现所有链并为它们创建一个组。分组后,您可以应用f.last()函数返回所需的值。

from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as f


df = spark.createDataFrame([
  (10,12),(12,14),(14,16),(18,19),(19,20),(22,24),(24,25),(25,27),(29,30)
],('obsolute','replace'))

w = Window.orderBy('obsolute')
df = (df
      .withColumn('chain',f.coalesce(f.lag('replace').over(w) == f.col('obsolute'),f.lit(True)))
      .withColumn('group',f.sum((f.col('chain') == f.lit(False)).cast('Int')).over(w)))
# +--------+-------+-----+-----+
# |obsolute|replace|chain|group|
# +--------+-------+-----+-----+
# |10      |12     |true |0    |
# |12      |14     |true |0    |
# |14      |16     |true |0    |
# |18      |19     |false|1    |
# |19      |20     |true |1    |
# |22      |24     |false|2    |
# |24      |25     |true |2    |
# |25      |27     |true |2    |
# |29      |30     |false|3    |
# +--------+-------+-----+-----+


w = Window.partitionBy('group')
df = df.select('obsolute','replace',f.last('replace').over(w).alias('latest'))

df.show(truncate=False)

输出

+--------+-------+------+
|obsolute|replace|latest|
+--------+-------+------+
|10      |12     |16    |
|12      |14     |16    |
|14      |16     |16    |
|18      |19     |20    |
|19      |20     |20    |
|22      |24     |27    |
|24      |25     |27    |
|25      |27     |27    |
|29      |30     |30    |
+--------+-------+------+

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。