微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink Window 分类

1)Global Window 和 Key Windows

  在运用窗口计算时,Flink根据上游数据集是否为 KeyStream 类型,对应的 Windows 也会有所不同。

※ Keyed Window:上游数据集如果是 KeyedStream 类型,则调用 DataStream API的 window() 方法,数据会根据 Key 在不同的 Task 实例中并行分别计算,最后得出针对每个 Key 统计的结果。

※ Gloabl Window:如果是 Non-Keyed 类型,则调用 WindowAll() 方法,所有的数据都会在窗口算子中由一个 Task 中计算,并 得到全局统计结果

/读取文件数据
val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line=>{
  var arr =line.split(",") 
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.to Long) }) //Global Window data.windowAll(自定义的WindowAssigner) //Keyed Window data.keyBy(_.sid) .window(自定义的WindowAssigner)

2)Time Window 和 Count Window

  基于业务数据的方面考虑,Flink 支持两种类型的窗口,一种是基于时间的窗口 Time Window。还有一种基于输入数据的数量的窗口 Count Window

※ Time Window(时间窗口)

  根据不同的业务场景,Time Window 也可以分为三种类型,分别是滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

  1.滚动窗口(Tumbling Window)

   滚动窗口是根据固定时间进行切分,且窗口和窗口之间的元素互不重叠。这种类型的窗口最大的特点是比较简单。只需要指定一个窗口长度(window size)

//每隔5秒统计每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
//.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1) //聚合

其中时间间隔可以是 Time.milliseconds(x)、Time.seconds(x) 或 Time.minutes(x)。

  2.滑动窗口(Sliding Window)

  滑动窗口是在滚动窗口基础之上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。当 Windows size 固定之后,窗口并不像滚动窗口按照 Windows Size 向前移动,而是根据设定的 Slide TIme 向前滑动。窗口之间的数据重叠大小根据 Windows size 和 Sliding time 决定,当 Slide size 小于 Windows size 便会发生窗口重叠,Slide size 大于 Windows size 就会出现窗口不连续,数据可能不能在任何一个窗口内计算,Slide size 和 Windows size 相等时,Sliding Windows 其实就是 Tumbing Windows。 

//每隔3秒计算最近5秒内,每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5),Time.seconds(3))
//.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(3)))
.sum(1)

  3.会话窗口(Session Window)

   会话窗口(Session Windows)主要是将某段时间内活跃的较高的数据聚合成一个窗口进行计算,窗口触发的条件是 Session Gap,是指在规定的时间内,如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果。需要注意的是如果数据一直不间断进入窗口,也会导致窗口不触发的情况。与滑动窗口、滚动窗口不同的是,Session Windows 不需要由固定的 windows size 和 slide time,只需要定义 session gap,来规定不活跃数据的时间上限即可。

//3秒内如果没有数据进入,则计算每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
.sum(1)

4)Count Window(数量窗口)

Count Window 也有滚动窗口、滑动窗口等,但是使用的很少

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐