flatMap(...RichFlatMapFunction)
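import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val keyedSensorData = sensorData.keyBy(_.id)

val alerts = keyedSensorData
  .flatMap(new TemperatureAlert(1.7))

alerts.print()

...

class TemperatureAlert(val threshold: Double)
    extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  // Keyed state: the last temperature seen for the current sensor id.
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // Register the state descriptor and obtain the state handle.
    val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
    lastTempState = getRuntimeContext.getState[Double](lastTempDesc)
  }

  override def flatMap(
      reading: SensorReading,
      out: Collector[(String, Double, Double)]): Unit = {
    // value() yields 0.0 while no state has been set for this key,
    // so the first reading of each sensor is compared against 0.0.
    val lastTemp = lastTempState.value()
    val tempDiff = (lastTemp - reading.temperature).abs
    if (tempDiff > threshold) {
      // Emit (sensor id, current temperature, difference) as one tuple.
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    // Remember the current temperature for the next reading.
    this.lastTempState.update(reading.temperature)
  }
}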
flatMapWithState
val alerts = keyedSensorData.flatMapWithState[(String, Double, Double), Double] {
  case (in: SensorReading, None) =>
    (List.empty, Some(in.temperature))
  case (r: SensorReading, lastTemp: Some[Double]) =>
    val tempDiff = (r.temperature - lastTemp.get).abs
    if (tempDiff > 1.7) {
      (List((r.id, r.temperature, tempDiff)), Some(r.temperature))
    } else {
      (List.empty, Some(r.temperature))
    }
}