微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

找到火花流窗口函数的最小值和最大值

如何解决找到火花流窗口函数的最小值和最大值

我有一个数据流进来,其中有一个时间戳和一个字段,如下所示:

+------+-------+
| time | event |
+------+-------+
| 1    | A     |
+------+-------+
| 2    | B     |
+------+-------+
| 2    | C     |
+------+-------+
| 2    | D     |
+------+-------+
| 3    | E     |
+------+-------+

我想按 time(以秒为单位)对数据进行分组并按计数减少,以便我有一个 time -> messages_in_second 表:

+------+-------+
| time | count |
+------+-------+
| 1    | 1     |
+------+-------+
| 2    | 3     |
+------+-------+
| 3    | 1     |
+------+-------+  

然后我想制作一个超过 30 秒的窗口,并每 5 秒获取一次该窗口中数据的斜率。因此,如果窗口的第一行计数为 0,最后一行计数为 60,则斜率应为 2,如下所示:

+------------+--------+
| window     | slope  |
+------------+--------+
| [0 to 30]  | 0.123  |
+------------+--------+
| [5 to 35]  | -0.123 |
+------------+--------+
| [10 to 40] | 0      |
+------------+--------+
| [15 to 45] | 2      |
+------------+--------+
| [20 to 50] | 4      |
+------------+--------+  

这是我目前拥有的:

    val rawInputLines = spark.readStream
      .format("text")
      .option("path","file:///E:/path/to/files")
      .load()

    val parsedDataFrame = rawInputLines.map(row => {
        // custom logic to parse the text file into a dataframe
    }).toDF(columnNames:_*)
      .withColumn("time",to_timestamp(col("time"),"yyyy/MM/dd:HH:mm:ss"))

    val windowed = parsedDataFrame
      .groupBy( col("timestamp") ).count // <- multiple aggregations not allowed 
      .groupBy(
          window(col("timestamp"),"30 seconds","5 seconds"),)
    // This is where I am stuck

所以我被困在这里有两个原因:

  1. 我收到一个错误,因为一个流 (Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;) 上不允许有多个分组依据
  2. 我不确定这是否是解决此问题的好方法,我的总体目标是找到每秒事件数量的突然偏差(更高或更低)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。