微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink 操作示例 —— 状态

flatMap(...RichFlatMapFunction)

val keyedSensorData = sensorData.keyBy(_.id)

val alerts  = keyedSensorData
  .flatMap(new TemperatureAlert(1.7))

alerts.print()

...

class TemperatureAlert(val threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
    private var lastTempState: ValueState[Double] = _

    override def open(parameters: Configuration): Unit = {
        val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
        lastTempState = getRuntimeContext.getState[Double](lastTempDesc)
    }

    override def flatMap(reading: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
        val lastTemp = lastTempState.value()
        val tempDiff = (lastTemp - reading.temperature).abs
        if (tempDiff > threshold) {
            out.collect(reading.id, reading.temperature, tempDiff)
        }
        this.lastTempState.update(reading.temperature)
    }
}

 

flatMapWithState

val alerts = keyedSensorData.flatMapWithState[(String, Double, Double), Double] {
  case (in: SensorReading, None) =>
    (List.empty, Some(in.temperature))
  case (r: SensorReading, lastTemp: Some[Double]) =>
    val tempDiff = (r.temperature - lastTemp.get).abs
    if (tempDiff > 1.7) {
      (List((r.id, r.temperature, tempDiff)), Some(r.temperature))
    } else {
      (List.empty, Some(r.temperature))
    }
}

 

 

233

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐