如何解决如何使用 Apache flink 处理乱序事件?
为了测试流处理和 Flink,我给了自己一个看似简单的问题。我的数据流由粒子的 x
和 y
坐标以及记录位置的时间 t
组成。我的目标是用特定粒子的速度注释这些数据。所以流可能看起来像这样。
<timestamp:Long> <particle_id:String> <x:Double> <y:Double>
1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2
现在不能保证事件会按顺序到达,即 1612103771213 p2 -0.1 -0.1
可能在 10ms
之前到达说 1612103771212 p2 0.0 0.0
。
为简单起见,可以假设任何迟到的数据都将在早到数据的 100ms
内到达。
我承认我是流处理和 Flink 的新手,所以这可能是一个愚蠢的问题,如果有一个明显的答案,但我目前不知道如何在这里实现我的目标。
编辑
按照 David 的回答,我尝试使用 Flink Table API 对数据流进行排序,使用 nc -lk 9999
作为文本套接字流。问题是在我关闭文本套接字流之前,控制台不会打印任何内容。这是我写的scala代码-
package processor
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState,ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings,FieldExpression,WithOperations}
import org.apache.flink.util.Collector
import java.time.Duration
object AnnotateJob {
val OUT_OF_ORDER_nesS = 100
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env,bSettings)
env.setParallelism(1)
// Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
val text = env.socketTextStream("localhost",9999)
val objStream = text
.filter( _.nonEmpty )
.map(new ParticleMapFunction)
val posstream = objStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_nesS))
.withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
override def extractTimestamp(t: ParticlePos,l: Long): Long = t.t
})
)
val tablePos = tableEnv.fromDataStream(posstream,$"t".rowtime() as "et",$"t",$"name",$"x",$"y")
tableEnv.createTemporaryView("pos",tablePos)
val sorted = tableEnv.sqlQuery("SELECT t,name,x,y FROM pos ORDER BY et ASC")
val sortedPosstream = tableEnv.toAppendStream[ParticlePos](sorted)
// sortedPosstream.keyBy(pos => pos.name).process(new ValAnnotator)
sortedPosstream.print()
// execute program
env.execute()
}
case class ParticlePos(t : Long,name : String,x : Double,y : Double) extends Serializable
case class ParticlePosVal(t : Long,y : Double,var vx : Double = 0.0,var vy : Double = 0.0) extends Serializable
class ParticleMapFunction extends MapFunction[String,ParticlePos] {
override def map(t: String): ParticlePos = {
val parts = t.split("\\W+")
ParticlePos(parts(0).toLong,parts(1),parts(2).todouble,parts(3).todouble)
}
}
}
解决方法
在 Flink 中执行此操作的一种方法可能是使用 KeyedProcessFunction,即可以:
- 处理流中的每个事件
- 保持某种状态
- 使用基于事件时间的计时器触发一些逻辑
所以它会是这样的:
- 您需要了解有关数据的某种“最大无序”。根据您的描述,让我们假设 100 毫秒,例如,当处理时间戳为
1612103771212
的数据时,您决定考虑您确定已收到1612103771112
之前的所有数据。 - 您的第一步是
keyBy()
您的流,按粒子 ID 键控。这意味着 Flink 应用程序中 next 操作符的逻辑现在可以用一个粒子的一系列事件来表达,并且每个粒子都以这种方式并行处理。
像这样:
yourStream.keyBy(...lookup p1 or p2 here...).process(new YourProcessFunction())
- 在
ProcessFunction
的YourProcessFunction
初始化期间(即在open
方法期间),初始化一个ListState
,您可以在其中安全地存储内容。 - 在处理流中的元素时,在
processElement
方法中,只需将其添加到listState
并在例如 100 毫秒内注册一个定时器触发器 - 当
onTimer()
方法触发时,比如说在时间t
,查看listState
中所有时间 t - 100 的元素,如果你至少有其中两个,对它们进行排序,将它们从状态中移除,应用您描述的速度计算和注释逻辑,并将结果发送到下游。
您会发现 an example in the official Flink training 在乘坐出租车期间使用这种逻辑,这与您的用例有很多相似之处。还可以查看该存储库的各种 Readme.md 文件以了解更多详细信息。
,一般来说,结合事件时间计时器的水印是解决乱序事件流所带来问题的解决方案。涵盖 Event Time and Watermarks 的官方 Flink 培训部分解释了其工作原理。
在更高层次上,有时使用 Flink 的 CEP 库或 Flink SQL 之类的东西更容易,因为它们可以很容易地按时间对流进行排序,从而消除所有无序。例如,参见 How to sort a stream by event time using Flink SQL 的 Flink DataStream 程序示例,该程序使用 Flink SQL 按事件时间对流进行排序。
就您而言,一个相当简单的 MATCH_RECOGNIZE 查询可以满足您的需求。可能看起来像这样,
SELECT *
FROM event
MATCH_RECOGNIZE (
PARTITION BY particleId
ORDER BY ts
MEASURES
b.ts,b.particleId,velocity(a,b)
AFTER MATCH SKIP TO NEXT ROW
PATTERN (a b)
DEFINE
a AS TRUE,b AS TRUE
)
其中,velocity(a,b) 是一个用户定义的函数,用于计算速度,给定同一粒子的两个连续事件(a 和 b)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。