REDUCE
reduce算子是滚动聚合的泛化实现。它将一个ReduceFunction应用到了一个KeyedStream上面去。reduce算子将会把每一个输入事件和当前已经reduce出来的值做聚合计算。reduce操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。
对分组数据进行处理更为通用的方法是使用reduce
算子。
上图展示了reduce
算子的原理:reduce
在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。
reduce函数可以通过实现接口ReduceFunction来创建一个类。ReduceFunction接口定义了reduce()
方法,此方法接收两个输入事件,输入一个相同类型的事件。
// T: the element type ReduceFunction[T] > reduce(T, T): T
下面的例子,流根据传感器ID分流,然后计算每个传感器的当前最大温度值。
scala version
val maxTempPerSensor = keyed.reduce((r1, r2) => r1.temperature.max(r2.temperature))
java version
DataStream<SensorReading> maxTempPerSensor = keyed .reduce((r1, r2) -> { if (r1.temperature > r2.temperature) { return r1; } else { return r2; } });
reduce作为滚动聚合的泛化实现,同样也要针对每一个key保存状态。因为状态从来不会清空,所以我们需要将reduce算子应用在一个有限key的流上。
实例二:
case class score(name: String, course: String, score: Int) val dataStream: DataStream[score] = senv.fromElements( score("Li", "English", 90), score("Wang", "English", 88), score("Li", "Math", 85), score("Wang", "Math", 92), score("Liu", "Math", 91), score("Liu", "English", 87)) class MyReduceFunction() extends ReduceFunction[score] { // reduce 接受两个输入,生成一个同类型的新的输出 override def reduce(s1: score, s2: score): score = { score(s1.name, "Sum", s1.score + s2.score) } } val sumReduceFunctionStream = dataStream .keyBy("name") .reduce(new MyReduceFunction)
使用Lambda表达式更简洁一些:
val sumLambdaStream = dataStream .keyBy("name") .reduce((s1, s2) => score(s1.name, "Sum", s1.score + s2.score))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。