如何解决使用pyspark跟踪和查找数据帧中的最新值
我是 Pyspark 的新手,我遇到了以下需要解决的案例。有人可以请帮助并解决它。我检查并尝试在谷歌和堆栈溢出中找到类似的问题。但不幸的是我没有得到它。
问题:
我有包含两列的数据框,一列已过时,另一列是替换列。
数据帧:
在上面的数据框中,绝对值在替换列中得到更新。例如:这里 10 变为 12,在下一行 12 变为 14,第 3 行再次第 14 个值变为 16。如果您看到值在下一行更新
所以前三个值成为一组,因为链值正在更新。以红色突出显示,因此对于那些过时值,替换中的最后一个值是最新值 16。对于其他两行,19 是连通性值并以黄色突出显示,因此这两行的最新值是 20 是最新值。
预期产出
我尝试在 pyspark 中使用 map 和 foreach,但没有得到想要的结果。一些请帮助我如何解决这个问题。
解决方法
首先,您必须发现所有链并为它们创建一个组。分组后,您可以应用f.last()
函数返回所需的值。
from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as f
df = spark.createDataFrame([
(10,12),(12,14),(14,16),(18,19),(19,20),(22,24),(24,25),(25,27),(29,30)
],('obsolute','replace'))
w = Window.orderBy('obsolute')
df = (df
.withColumn('chain',f.coalesce(f.lag('replace').over(w) == f.col('obsolute'),f.lit(True)))
.withColumn('group',f.sum((f.col('chain') == f.lit(False)).cast('Int')).over(w)))
# +--------+-------+-----+-----+
# |obsolute|replace|chain|group|
# +--------+-------+-----+-----+
# |10 |12 |true |0 |
# |12 |14 |true |0 |
# |14 |16 |true |0 |
# |18 |19 |false|1 |
# |19 |20 |true |1 |
# |22 |24 |false|2 |
# |24 |25 |true |2 |
# |25 |27 |true |2 |
# |29 |30 |false|3 |
# +--------+-------+-----+-----+
w = Window.partitionBy('group')
df = df.select('obsolute','replace',f.last('replace').over(w).alias('latest'))
df.show(truncate=False)
输出
+--------+-------+------+
|obsolute|replace|latest|
+--------+-------+------+
|10 |12 |16 |
|12 |14 |16 |
|14 |16 |16 |
|18 |19 |20 |
|19 |20 |20 |
|22 |24 |27 |
|24 |25 |27 |
|25 |27 |27 |
|29 |30 |30 |
+--------+-------+------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。