微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 Apache flink 处理乱序事件?

如何解决如何使用 Apache flink 处理乱序事件?

为了测试流处理和 Flink,我给了自己一个看似简单的问题。我的数据流由粒子的 xy 坐标以及记录位置的时间 t 组成。我的目标是用特定粒子的速度注释这些数据。所以流可能看起来像这样。

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

现在不能保证事件会按顺序到达,即 1612103771213 p2 -0.1 -0.1 可能在 10ms 之前到达说 1612103771212 p2 0.0 0.0

为简单起见,可以假设任何迟到的数据都将在早到数据的 100ms 内到达。

我承认我是流处理和 Flink 的新手,所以这可能是一个愚蠢的问题,如果有一个明显的答案,但我目前不知道如何在这里实现我的目标。

编辑

按照 David 的回答,我尝试使用 Flink Table API 对数据流进行排序,使用 nc -lk 9999 作为文本套接字流。问题是在我关闭文本套接字流之前,控制台不会打印任何内容。这是我写的scala代码-


package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState,ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings,FieldExpression,WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration


object AnnotateJob {

  val OUT_OF_ORDER_nesS = 100

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val tableEnv = StreamTableEnvironment.create(env,bSettings)

    env.setParallelism(1)

    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost",9999)
    val objStream = text
      .filter( _.nonEmpty )
      .map(new ParticleMapFunction)

    val posstream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_nesS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos,l: Long): Long = t.t
          })
      )

    val tablePos = tableEnv.fromDataStream(posstream,$"t".rowtime() as "et",$"t",$"name",$"x",$"y")
    tableEnv.createTemporaryView("pos",tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t,name,x,y FROM pos ORDER BY et ASC")

    val sortedPosstream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosstream.keyBy(pos => pos.name).process(new ValAnnotator)

    sortedPosstream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t : Long,name : String,x : Double,y : Double) extends Serializable
  case class ParticlePosVal(t : Long,y : Double,var vx : Double = 0.0,var vy : Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String,ParticlePos] {
    override def map(t: String): ParticlePos = {
      val parts = t.split("\\W+")
      ParticlePos(parts(0).toLong,parts(1),parts(2).todouble,parts(3).todouble)
    }
  }

}

解决方法

在 Flink 中执行此操作的一种方法可能是使用 KeyedProcessFunction,即可以:

  • 处理流中的每个事件
  • 保持某种状态
  • 使用基于事件时间的计时器触发一些逻辑

所以它会是这样的:

  • 您需要了解有关数据的某种“最大无序”。根据您的描述,让我们假设 100 毫秒,例如,当处理时间戳为 1612103771212 的数据时,您决定考虑您确定已收到 1612103771112 之前的所有数据。
  • 您的第一步是 keyBy() 您的流,按粒子 ID 键控。这意味着 Flink 应用程序中 next 操作符的逻辑现在可以用一个粒子的一系列事件来表达,并且每个粒子都以这种方式并行处理。

像这样:

yourStream.keyBy(...lookup p1 or p2 here...).process(new YourProcessFunction())
  • ProcessFunctionYourProcessFunction 初始化期间(即在 open 方法期间),初始化一个 ListState,您可以在其中安全地存储内容。
  • 在处理流中的元素时,在 processElement 方法中,只需将其添加到 listState 并在例如 100 毫秒内注册一个定时器触发器
  • onTimer() 方法触发时,比如说在时间 t,查看 listState 中所有时间 t - 100 的元素,如果你至少有其中两个,对它们进行排序,将它们从状态中移除,应用您描述的速度计算和注释逻辑,并将结果发送到下游。

您会发现 an example in the official Flink training 在乘坐出租车期间使用这种逻辑,这与您的用例有很多相似之处。还可以查看该存储库的各种 Readme.md 文件以了解更多详细信息。

,

一般来说,结合事件时间计时器的水印是解决乱序事件流所带来问题的解决方案。涵盖 Event Time and Watermarks 的官方 Flink 培训部分解释了其工作原理。

在更高层次上,有时使用 Flink 的 CEP 库或 Flink SQL 之类的东西更容易,因为它们可以很容易地按时间对流进行排序,从而消除所有无序。例如,参见 How to sort a stream by event time using Flink SQL 的 Flink DataStream 程序示例,该程序使用 Flink SQL 按事件时间对流进行排序。

就您而言,一个相当简单的 MATCH_RECOGNIZE 查询可以满足您的需求。可能看起来像这样,

SELECT *
    FROM event
    MATCH_RECOGNIZE (
        PARTITION BY particleId
        ORDER BY ts
        MEASURES 
            b.ts,b.particleId,velocity(a,b)
        AFTER MATCH SKIP TO NEXT ROW
        PATTERN (a b)
        DEFINE
            a AS TRUE,b AS TRUE
    )

其中,velocity(a,b) 是一个用户定义的函数,用于计算速度,给定同一粒子的两个连续事件(a 和 b)。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。