如何解决根据其他列值分配ID
是否可以根据状态列的值分配操作ID?目的是为每个起始序列分配增量ID。例如:在下表中,从2020-09-15 22:49开始的初始操作获得的操作ID = 1,直到操作结束的所有行也将获得ID 1。 每个开始/结束状态以及开始和结束之间的所有“ on”状态都将具有相同的ID。
Timestamp |state | operation id
----------------------------------------
2020-09-15 22:53 start 1
2020-09-16 22:53 on 1
2020-09-17 22:53 on 1
2020-09-18 22:53 on 1
2020-09-19 22:53 end 1
2020-09-20 22:53 off null
2020-09-21 22:53 off null
2020-09-22 22:53 off null
2020-09-23 22:53 start 2
2020-09-24 22:53 on 2
2020-09-25 22:53 end 2
2020-09-26 22:53 start 3
2020-09-27 22:53 end 3
时间戳和状态列可用。目的是建立操作id列。
解决方法
您可以使用按“时间戳记”排序的Window
函数。由于您希望当{state}为'off'时operation_id
始终为null
,因此我将过滤state'off'行并将其作为单独的数据帧。我们将'start'分配为1
,将'on'分配为0
,将'end'分配为2
首先,在窗口上方此新分配的“状态”列上获得一个incremental sum
。对应于“结束”状态的incremental sum
始终是3的倍数。这也将是您的“顺序结束”
要获得所需的内容,必须在lag
列上使用incremental sum
函数,然后用滞后值替换3的倍数。最后一步是除以3,将其转换为整数并加1。
现在将df_not_off
和df_off
合并为最终输出
您的数据框:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *
schema = StructType([StructField("Timestamp",IntegerType()),StructField("state",StringType())])
data = [[1,'start'],[2,'on'],[3,[4,[5,'end'],[6,'off'],[7,\
[8,[9,[10,[11,[12,[13,'end']]
df = spark.createDataFrame(data,schema=schema)
df.show()
操作:
df_off = df.filter(col("state")=="off")
df_not_off = df.filter(col("state")!="off")
df_not_off = df_not_off.withColumn("state_num",F.when(col("state")=="start",1).when(col("state")=="on",0).otherwise(2))
w=Window().orderBy("Timestamp")
df_not_off = df_not_off.withColumn("incremental_sum",F.sum("state_num").over(w))\
.withColumn("lag",F.lag("incremental_sum").over(w))\
.withColumn("incremental_sum",F.when(F.col("incremental_sum")%3==0,F.col("lag")).otherwise(F.col("incremental_sum")))\
.withColumn("incremental_sum",((F.col("incremental_sum")/3).cast('integer'))+1)\
.withColumnRenamed("incremental_sum","operation_id")\
.drop("state_num","lag")
df_off = df_off.withColumn("operation_id",F.lit(None))
final_df = df_not_off.union(df_off)
final_df.orderBy("Timestamp").show()
输出:
+---------+-----+------------+
|Timestamp|state|operation_id|
+---------+-----+------------+
| 1|start| 1|
| 2| on| 1|
| 3| on| 1|
| 4| on| 1|
| 5| end| 1|
| 6| off| null|
| 7| off| null|
| 8| off| null|
| 9|start| 2|
| 10| on| 2|
| 11| end| 2|
| 12|start| 3|
| 13| end| 3|
+---------+-----+------------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。