1. 简单介绍
目前所能理解的程度(持续更新),知道Flink中有四种流:DataStream、AllWindowedStream、KeyedStream、WindowedStream。
1.1 DataStream经过以下方法可以转化为AllWindowedStream
// 1. Tumbling time window over a non-keyed stream: fixed-size, non-overlapping windows.
def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] = {
  val allWindowed = javaStream.timeWindowAll(size)
  new AllWindowedStream(allWindowed)
}
// 2. Sliding time window over a non-keyed stream: `size`-long windows advancing by `slide`.
def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] = {
  val allWindowed = javaStream.timeWindowAll(size, slide)
  new AllWindowedStream(allWindowed)
}
// 3. Sliding count window over a non-keyed stream: evaluates the last `size`
//    elements every `slide` elements.
//    NOTE(review): the Flink API method is spelled countWindowAll — the original
//    snippet's lowercase "countwindowAll" (declaration and call) is a typo.
def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] = {
  new AllWindowedStream(stream.countWindowAll(size, slide))
}
// 4. Tumbling count window over a non-keyed stream: fires every `size` elements.
//    NOTE(review): the Flink API method is spelled countWindowAll — the original
//    snippet's lowercase "countwindowAll" (declaration and call) is a typo.
def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {
  new AllWindowedStream(stream.countWindowAll(size))
}
// 5. Generic window assigner variant — the primitive that the time/count
//    shortcuts above ultimately express.
def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
  val javaWindowed = new JavaAllWindowedStream[T, W](stream, assigner)
  new AllWindowedStream[T, W](javaWindowed)
}
AllWindowedStream有很多与DataStream相似的方法。
对AllWindowedStream的操作流程:
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
1.2 DataStream经过以下方法可以转化为KeyedStream
// Key by tuple field positions; the resulting key type is Flink's Java Tuple.
def keyBy(fields: Int*): KeyedStream[T, JavaTuple] =
  asScalaStream(stream.keyBy(fields: _*))

// Key by field-name expressions (POJO / case-class fields); key type is again Java Tuple.
def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
  asScalaStream(stream.keyBy(firstField +: otherFields.toArray: _*))
// Key by an arbitrary extractor function; the key type K comes from the implicit
// TypeInformation context bound.
// NOTE(review): the Flink class is TypeInformation — the original snippet's
// "Typeinformation" casing is a typo and would not resolve.
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
  val cleanFun = clean(fun)
  val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
  // Wrap the Scala function in a Java KeySelector that also reports the produced
  // type, so downstream operators know the key's TypeInformation.
  val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
    def getKey(in: T) = cleanFun(in)
    override def getProducedType: TypeInformation[K] = keyType
  }
  asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}
// Key by an explicit Java KeySelector.
// NOTE(review): "Typeinformation" in the original is a typo for TypeInformation.
def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K] = {
  val cleanFun = clean(fun)
  val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
  asScalaStream(new JavaKeyedStream(stream, cleanFun, keyType))
}
对WindowedStream(KeyedStream经keyBy后开窗)的操作流程:
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
1.3 KeyedStream经过以下方法可以转化为WindowedStream
注意WindowedStream是由KeyedStream转换而来的
// Tumbling time window on a keyed stream.
def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
  val windowed = javaStream.timeWindow(size)
  new WindowedStream(windowed)
}
// Sliding count window on a keyed stream.
// NOTE(review): the Flink API method is countWindow — the original snippet's
// lowercase "countwindow" (declaration and call) is a typo.
def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
  new WindowedStream(javaStream.countWindow(size, slide))
}
// Tumbling count window on a keyed stream.
// NOTE(review): the Flink API method is countWindow — the original snippet's
// lowercase "countwindow" (declaration and call) is a typo.
def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
  new WindowedStream(javaStream.countWindow(size))
}
// Sliding time window on a keyed stream: `size`-long windows advancing by `slide`.
def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = {
  val windowed = javaStream.timeWindow(size, slide)
  new WindowedStream(windowed)
}
// Generic window assigner on a keyed stream — the primitive behind timeWindow/countWindow.
def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = {
  val javaWindowed = new WindowedJavaStream[T, K, W](javaStream, assigner)
  new WindowedStream(javaWindowed)
}
2. 运用示例
2.1 WindowedStream运用
2.1.1 示例一
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
// Input record: one user-behavior event parsed from the CSV source.
// `timestamp` is in epoch seconds (multiplied by 1000 downstream to get milliseconds).
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// Window aggregation result: view count of one item in the window ending at `windowEnd` (epoch ms).
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
  def main(args: Array[String]): Unit = {
    // Streaming environment; parallelism 1 keeps the printed output ordered.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event time so windows are driven by the timestamps carried in the data.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Read the raw CSV lines.
    val inputStream = env.readTextFile("F:\\SparkWorkSpace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
    // Parse into the case class and assign ascending event-time timestamps.
    // The source timestamps are in seconds; * 1000L converts to milliseconds
    // (the original comment said "微秒"/microseconds, which was wrong).
    val dataStream = inputStream.map(data => {
      val dataArray = data.split(",")
      UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
    }).assignAscendingTimestamps(_.timestamp * 1000L)
    // Keep only page-view events, then count per item over 1-hour windows
    // sliding every 5 minutes (pre-aggregate + window function).
    val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv")
      .keyBy("itemId")
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg(), new ItemCountwindowResult())
    // (An unused duplicate WindowedStream value, aggStream111, was removed as dead code.)
    // Re-group the per-window counts by window end and emit a top-5 ranking per window.
    val resultStream: DataStream[String] = aggStream
      .keyBy("windowEnd")
      .process(new TopNHotItems(5))
    resultStream.print("resultStream")
    env.execute("hot items job")
  }
}
// Incremental pre-aggregation: the accumulator is simply the running count,
// incremented once per element.
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  // Start counting from zero.
  override def createAccumulator(): Long = 0L

  // Each incoming element bumps the count; the element's contents are ignored.
  override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1

  // The window result is the count itself.
  override def getResult(accumulator: Long): Long = accumulator

  // Merging two partial accumulators adds their counts.
  override def merge(a: Long, b: Long): Long = a + b
}
// Companion example: average of the event timestamps.
// Accumulator = (element count, timestamp sum).
class AggAvg() extends AggregateFunction[UserBehavior, (Int, Long), Double] {
  override def createAccumulator(): (Int, Long) = (0, 0L)

  override def add(value: UserBehavior, accumulator: (Int, Long)): (Int, Long) =
    (accumulator._1 + 1, accumulator._2 + value.timestamp)

  override def getResult(accumulator: (Int, Long)): Double = {
    val (count, sum) = accumulator
    // Fixed: the original wrote `.todouble` (typo for `.toDouble`), which does
    // not compile. Also guard the empty accumulator, which would yield NaN.
    if (count == 0) 0.0 else sum / count.toDouble
  }

  override def merge(a: (Int, Long), b: (Int, Long)): (Int, Long) =
    (a._1 + b._1, a._2 + b._2)
}
// Window function that attaches window metadata: wraps the pre-aggregated count
// together with the item id (the key) and the window end time into ItemViewCount.
// Type parameters: input = pre-agg output (Long), output, key (Java Tuple), window.
class ItemCountwindowResult() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    // String-based keyBy produces a Java Tuple key; unwrap field 0 as the item id.
    val itemId = key.asInstanceOf[Tuple1[Long]].f0
    // `input` carries the single value produced by the pre-aggregation.
    out.collect(ItemViewCount(itemId, window.getEnd, input.head))
  }
}
// Keyed process function: buffers every ItemViewCount for a window end in list
// state and, once the window is complete (event-time timer at windowEnd + 100 ms),
// sorts the counts and emits one formatted top-N string.
class TopNHotItems(n: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
  // List state holding all per-item counts seen for the current window end.
  lazy val itemCountListState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemcount-list", classOf[ItemViewCount]))

  override def processElement(value: ItemViewCount,
                              ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context,
                              out: Collector[String]): Unit = {
    // Buffer every incoming count.
    itemCountListState.add(value)
    // Fire once the watermark passes windowEnd + 100. Timers are scoped to the
    // current key, and re-registering the same timestamp registers only one
    // timer, so exactly one timer fires per window.
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 100)
  }

  // Timer callback: drain the state, sort by count descending, emit the top n.
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // JavaConverters replaces the deprecated implicit JavaConversions import
    // used by the original; .asScala makes the conversion explicit.
    import scala.collection.JavaConverters._
    val allItemCountList: ListBuffer[ItemViewCount] = ListBuffer()
    for (itemCount <- itemCountListState.get().asScala) {
      allItemCountList += itemCount
    }
    // Highest count first, keep the top n.
    val sortedItemCountList = allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)
    // The window is finished; release the state.
    itemCountListState.clear()
    // Format the ranking. timestamp - 100 undoes the timer offset, recovering
    // the window end time.
    val result: StringBuilder = new StringBuilder
    result.append("时间").append(new Timestamp(timestamp - 100)).append("\n")
    for (i <- sortedItemCountList.indices) {
      val currentItemCount = sortedItemCountList(i)
      result.append("Top").append(i + 1).append(":")
        .append(" 商品Id=").append(currentItemCount.itemId)
        .append(" 访问量=").append(currentItemCount.count)
        .append("\n")
    }
    result.append("===============================\n\n")
    // Demo-only throttle so the console output is readable; avoid sleeping in
    // production operators.
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
2.1.2 示例二
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
// Input record: one line of the apache access log.
// `eventTime` is epoch milliseconds parsed from the log's date field.
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
// Window aggregation result: hit count of one URL in the window ending at `windowEnd` (epoch ms).
case class PageViewCount(url: String, windowEnd: Long, count: Long)
object NetworkTopNPage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // Raw apache access-log lines.
    val inputStream: DataStream[String] = env.readTextFile("F:\\SparkWorkSpace\\UserBehaviorAnalysis\\NetworKFlowAnalysis\\src\\main\\resources\\apache.log")
    // Parse each line, convert the log's date field to an epoch-ms timestamp,
    // and assign watermarks that tolerate up to 60 s of out-of-order events.
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(" ")
        val simpleDataFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDataFormat.parse(dataArray(3)).getTime
        ApacheLogEvent(dataArray(0), dataArray(1), timestamp, dataArray(5), dataArray(6))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(60)) {
        override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
      })
    // Count hits per URL over a 10-minute window sliding every 5 seconds.
    // Keying by a function (_.url) keeps the key a plain String, unlike the
    // string-field variant keyBy("url") whose key type is a Java Tuple.
    val aggStream = dataStream
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new PageCountAgg(), new PageCountwindowResult())
    // Re-key by window end and rank the URLs within each window.
    val resultStream = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPage(3))
    resultStream.print("resultStream")
    env.execute(" ")
  }
}
// Incremental pre-aggregation for page hits: the accumulator is the running count.
class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
  // Start from zero.
  override def createAccumulator(): Long = 0L
  // One increment per element; the element's contents are ignored.
  override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1
  // The window result is the count.
  override def getResult(accumulator: Long): Long = accumulator
  // Merging partial accumulators adds their counts.
  override def merge(a: Long, b: Long): Long = a + b
}
// Window function that wraps the pre-aggregated count, the URL (the key, a plain
// String thanks to the function-based keyBy) and the window end into PageViewCount.
class PageCountwindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
    // `input` carries the single value produced by the pre-aggregation.
    val count = input.head
    out.collect(PageViewCount(key, window.getEnd, count))
  }
}
// Keyed process function: collects every PageViewCount for a window end and,
// when the timer at windowEnd + 1 ms fires, emits one top-n ranking string.
class TopNHotPage(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
  // List state holding all per-URL counts seen for the current window end.
  lazy val pageCountListState = getRuntimeContext.getListState(new ListStateDescriptor[PageViewCount]("pagecount-list", classOf[PageViewCount]))

  override def processElement(value: PageViewCount,
                              ctx: KeyedProcessFunction[Long,
                                PageViewCount, String]#Context, out: Collector[String]): Unit = {
    pageCountListState.add(value)
    // One timer per window end; fires just after the window closes.
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  // Timer callback: drain the state, sort by count descending, emit the top n ONCE.
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Fixed: in the original the while loop's closing brace was misplaced, so
    // sorting, formatting, Thread.sleep and collect all ran once per buffered
    // element, and the state was cleared while its iterator was still being
    // consumed. Drain the whole state into a local buffer first.
    val allPageCountList: ListBuffer[PageViewCount] = ListBuffer()
    val iter = pageCountListState.get().iterator()
    while (iter.hasNext) {
      allPageCountList += iter.next()
    }
    // The window is complete; release the state.
    pageCountListState.clear()
    // Highest count first, keep the top n.
    val sortedPageCountList: ListBuffer[PageViewCount] = allPageCountList.sortWith(_.count > _.count).take(n)
    // Format the ranking. timestamp - 1 undoes the timer offset, recovering the
    // window end time.
    val result: StringBuilder = new StringBuilder
    result.append("时间").append(new Timestamp(timestamp - 1)).append("\n")
    for (i <- sortedPageCountList.indices) {
      val currentItemCount = sortedPageCountList(i)
      result.append("Top").append(i + 1).append(":")
        .append(" 页面url=").append(currentItemCount.url)
        .append(" 访问量=").append(currentItemCount.count)
        .append("\n")
    }
    result.append("===============================\n\n")
    // Demo-only pacing of console output; avoid sleeping in production operators.
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。