微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 pyspark 结构化流计算移动平均列

如何解决使用 pyspark 结构化流计算移动平均列

我正在使用 pyspark 处理一些传入的流数据,并且我想向我的数据框中添加一个具有 50 秒移动平均值的新列。

我尝试使用具有 rangeBetween 的 Window 规范:

import pyspark.sql.window as W

w = (W.Window()
     .partitionBy(col("sender"))
     .orderBy(F.col("event_time").cast('long'))
     .rangeBetween(-50,0))
df2 = df.withColumn('rolling_average',F.avg("fr").over(w))

但这给了我一个错误,因为结构化流需要一个基于时间的窗口(可能是为了管理状态):

AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets

使用 sql.window 函数,我还可以计算移动平均线,但这将通过在使用翻滚(或跳跃)窗口的窗口(和称为发送方的唯一 ID 键)上进行分组来为我提供结果:>

df.select('sender','event_time','fr').groupBy("sender",window("event_time","50 second")).avg().alias('avg_fr')
发件人 窗口 平均(fr)
59834cfd-6cb2-4ece-8353-0a9b20389656 {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.17443667352199554
8b5d90b9-65d9-4dd2-b742-31c4f0ce37d6 {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.010564474388957024
a74204f3-e25d-4737-a302-9206cd69e90a {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.16375258564949036
db16426d-a9ba-449b-9777-3bdfadf0e0d9 {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.17516431212425232

滚动窗口显然不是我想要的,我需要以某种方式再次将其加入到原始表中。 我不确定如何根据传入的不规则事件时间戳定义滑动窗口。

现在我想写一个有状态的函数,将一组先前接收到的记录存储到一个状态中,并为每个新数据点更新它。但是对于我期望的这样一个常见的活动来说,这似乎非常复杂以更简单的方式完成。

编辑:当前版本的 Spark (3.1.1) 只允许在 Java 或 Scala 中构建任意有状态函数,而不是在 python 中,以保护到 JVM 的转换。

如果这实际上是正确的方法,您有什么想法吗?

解决方法

您收到异常是因为看起来您正在构建用于批处理的窗口,而不是流式数据帧。

Window Operations on Event-Time 部分的结构化流编程指南中,给出了一个可以应用于您的用例的示例:

streamDf = ...  # streaming DataFrame of schema { event_time: Timestamp,sender: String,fr: Integer }

# Group the data by window and sender and compute the average of each group
movingAverageDf = streamDf.groupBy(
    window(streamDf.event_time,"50 seconds","5 seconds"),streamDf.sender
).avg(streamDf.fr)

请记住,如果不使用水印,应用程序的内部状态将无限增长。因此,建议还添加水印。确保在水印中使用与在窗口中相同的事件时间。

关于流式查询的 outputModes 的另一个说明:查看 OutputModes 中的概述以了解您的流式查询支持哪些模式。

,

根据迈克的要求,一个最小的可重现示例。这是一个非流媒体案例:

from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
import datetime

rawData = [(1,"A","2021-04-15T14:31:45.000",1,4.0),(2,"2021-04-15T14:32:46.000",3,5.0),(3,"B","2021-04-15T14:32:16.000",8,100.0),(4,"2021-04-15T14:33:16.000",10,200.0),(5,2,-3.0),(6,"2021-04-15T14:32:47.000",11,-500.0),(7,"2021-04-15T14:33:17.000",2.0)]

df= spark.createDataFrame(rawData).toDF("index",\
                                         "sender",\
                                         "event_time",\
                                         "value1",\
                                         "value2")

df=df.select(df['event_time'].astype('Timestamp').alias('ts'),"sender","value1","value2",)


print(df.schema)
display(df)


| ts                           | sender | value1 | value2 | 
|------------------------------|--------|--------|--------| 
| 2021-04-15T14:31:45.000+0000 | A      | 1      | 4      | 
| 2021-04-15T14:32:46.000+0000 | A      | 3      | 5      | 
| 2021-04-15T14:32:16.000+0000 | B      | 8      | 100    | 
| 2021-04-15T14:33:16.000+0000 | B      | 10     | 200    | 
| 2021-04-15T14:32:16.000+0000 | A      | 2      | -3     | 
| 2021-04-15T14:32:47.000+0000 | B      | 11     | -500   | 
| 2021-04-15T14:33:17.000+0000 | A      | 0      | 2      | 

要使用移动平均值向该表添加新列,我首先将作为新列的时间戳截断为 10 秒分辨率,以防止每秒生成一个窗口,这似乎非常低效。使用 2 分钟的水印去除迟到的数据。

@udf(returnType=TimestampType())
def round_time(dt=None,round_to=10):
  if dt.second%round_to==0:
    s=dt.second
  else:
    s=(math.floor(dt.second/round_to)+1)*round_to
  y=dt+datetime.timedelta(seconds=s-dt.second)
  return y

df=df.withWatermark("ts","2 minutes").select('*',round_time(df["ts"]).alias("trunct_time"))
display(df)

| ts                           | sender | value1 | value2 | trunct_time                  | 
|------------------------------|--------|--------|--------|------------------------------| 
| 2021-04-15T14:31:45.000+0000 | A      | 1      | 4      | 2021-04-15T14:31:50.000+0000 | 
| 2021-04-15T14:32:46.000+0000 | A      | 3      | 5      | 2021-04-15T14:32:50.000+0000 | 
| 2021-04-15T14:32:16.000+0000 | B      | 8      | 100    | 2021-04-15T14:32:20.000+0000 | 
| 2021-04-15T14:33:16.000+0000 | B      | 10     | 200    | 2021-04-15T14:33:20.000+0000 | 
| 2021-04-15T14:32:16.000+0000 | A      | 2      | -3     | 2021-04-15T14:32:20.000+0000 | 
| 2021-04-15T14:32:47.000+0000 | B      | 11     | -500   | 2021-04-15T14:32:50.000+0000 | 
| 2021-04-15T14:33:17.000+0000 | A      | 0      | 2      | 2021-04-15T14:33:20.000+0000 | 

现在我计算以 10 秒为增量移动的 50 秒滑动窗口的移动平均值。

avgDF = df.withWatermark("ts","2 minutes").select('value1','sender','ts').groupBy("sender",window("ts","50 second",'10 second')).avg()
avgDF = avgDF.withColumn("window_end",avgDF.window.end).withColumnRenamed('sender','sender2')
display(avgDF)

| sender2 | window                                                                        | avg(value1) | window_end                   | 
|---------|-------------------------------------------------------------------------------|-------------|------------------------------| 
| A       | {"start":"2021-04-15T14:31:10.000+0000","end":"2021-04-15T14:32:00.000+0000"} | 1           | 2021-04-15T14:32:00.000+0000 | 
| A       | {"start":"2021-04-15T14:31:00.000+0000","end":"2021-04-15T14:31:50.000+0000"} | 1           | 2021-04-15T14:31:50.000+0000 | 
| A       | {"start":"2021-04-15T14:31:20.000+0000","end":"2021-04-15T14:32:10.000+0000"} | 1           | 2021-04-15T14:32:10.000+0000 | 
| A       | {"start":"2021-04-15T14:31:40.000+0000","end":"2021-04-15T14:32:30.000+0000"} | 1.5         | 2021-04-15T14:32:30.000+0000 | 
| A       | {"start":"2021-04-15T14:31:30.000+0000","end":"2021-04-15T14:32:20.000+0000"} | 1.5         | 2021-04-15T14:32:20.000+0000 | 
| A       | {"start":"2021-04-15T14:32:40.000+0000","end":"2021-04-15T14:33:30.000+0000"} | 1.5         | 2021-04-15T14:33:30.000+0000 | 
| B       | {"start":"2021-04-15T14:31:50.000+0000","end":"2021-04-15T14:32:40.000+0000"} | 8           | 2021-04-15T14:32:40.000+0000 | 
| A       | {"start":"2021-04-15T14:32:30.000+0000","end":"2021-04-15T14:33:20.000+0000"} | 1.5         | 2021-04-15T14:33:20.000+0000 | 

由于滑动窗口四处移动,我们最终会在聚合表中添加额外的行(上面仅部分显示):

现在我们将两个表重新连接在一起:

joined_stream=df.join(
  avgDF,expr("""
    trunct_time = window_end AND
    sender = sender2
    """),"leftOuter"                
)

display(joined_stream.select('ts','value1','value2','avg(value1)'))

|ts                          |sender|value1|value2|avg(value1)|
|----------------------------|------|------|------|-----------|
|2021-04-15T14:31:45.000+0000|A     |1     |4     |1          |
|2021-04-15T14:32:46.000+0000|A     |3     |5     |2.5        |
|2021-04-15T14:32:16.000+0000|B     |8     |100   |8          |
|2021-04-15T14:33:16.000+0000|B     |10    |200   |10.5       |
|2021-04-15T14:32:16.000+0000|A     |2     |-3    |1.5        |
|2021-04-15T14:32:47.000+0000|B     |11    |-500  |9.5        |
|2021-04-15T14:33:17.000+0000|A     |0     |2     |1.5        | 

最终结果和我想要的完全一样(*)。

* 然而,结果可能并不完全符合预期,因为原始时间戳分辨率(以秒为单位)与聚合为 10 秒的块之间可能存在不匹配

,

对于流媒体版本,我所做的事情与我为非流媒体解决方案发布的解决方案基本相同。

schema = StructType([ StructField("ts",TimestampType(),True),StructField("sender",StringType(),StructField("value1",LongType(),StructField("value2",FloatType(),True) ])

df = spark.readStream.schema(schema).format("csv").load("dbfs:/FileStore/shared_uploads/joris.vanagtmaal@wartsilaazure.com/raw_data*")

df=df.withWatermark("ts",round_time(df["ts"]).alias("trunct_time"))
avgDF = df.withWatermark("ts",'sender2').withWatermark("window_end","2 minutes")

joined_stream=df.join(
  avgDF,expr("""
    trunct_time = window_end AND
    sender = sender2 AND   
    """),"leftOuter"                
)

query = (
  joined_stream
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("joined")     # joined = name of the in-memory table
    .outputMode("append")  # append = allows stream on stream joins
    .start()
)

这会导致以下错误:

AnalysisException: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay,which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details. If you understand the possible risk of correctness issue and still need to run the query,you can disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.;

文档中提到:

任何以下有状态操作之后的任何有状态操作 操作可能会出现此问题:

Append 模式下的流聚合或 流-流外连接

有一个已知的解决方法:将您的流式查询拆分为多个 每个有状态运算符的查询,并确保端到端恰好每次 询问。确保最后一次查询的端到端恰好是一次 可选。

但这是关于如何解决这个问题的一个相当含糊的描述。基于:https://issues.apache.org/jira/browse/SPARK-28074

这意味着将查询拆分为多个步骤,其中 1 个有状态 操作每个并将中间结果持久化到主题。这 产生大部分可重复的结果。但当然它增加了 消息传递的整体延迟。

根据设置,这可能是也可能不是正确的解决方案,但对于此示例,我决定将检查正确性参数设置为 false,因此它不会再抛出异常,而只会在日志中写入警告。>

%sql set spark.sql.streaming.statefulOperator.checkCorrectness.enabled=False

现在它会给我想要的结果:

%sql select * from joined
| ts                           | sender | value1 | value2 | avg(value1) | 
|------------------------------|--------|--------|--------|-------------| 
| 2021-04-15T14:33:16.000+0000 | B      | 10     | 200    | 10.5        | 
| 2021-04-15T14:32:47.000+0000 | B      | 11     | -500   | 9.5         | 
| 2021-04-15T14:31:45.000+0000 | A      | 1      | 4      | 1           | 
| 2021-04-15T14:32:16.000+0000 | A      | 2      | -3     | 1.5         | 
| 2021-04-15T14:32:46.000+0000 | A      | 3      | 5      | 2.5         | 
| 2021-04-15T14:33:17.000+0000 | A      | 0      | 2      | 1.5         | 
| 2021-04-15T14:32:16.000+0000 | B      | 8      | 100    | 8           | 

/* 还有一点需要注意,这些结果只有在它们后面跟着一个新的数据点使水印超过阈值(这里是 2 分钟)时才变得可见,这在流应用程序中不是问题,但对于这个例子几分钟后,我添加了一个新的第 8 个数据点,当然出于同样的原因,它在输出中不可见。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。