微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

scala – Flink:如何将弃用的折叠转换为聚合?

我正在关注Flink: Monitoring the Wikipedia Edit Stream快速启动示例.

这个例子是用Java编写的,我在Scala中实现它,如下所示:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("",0L)) {
        (acc: (String,Long),event: WikipediaEditEvent) => {
          (event.getUser,acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}

但是,Flink中的fold函数已被弃用,建议使用聚合函数.

enter image description here

但是我没有找到关于如何将弃用的折叠转换为聚合的示例或教程.

知道怎么做吗?可能不仅仅是通过应用聚合.

UPDATE

我有一个实现如下:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser,e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type for words with count */
  case class UserWithEdits(user: String,edits: Long)
}

我也想知道如何使用自定义AggregateFunction进行实现.

UPDATE

我按照此文档:AggregateFunction,但有以下问题:

在版本1.3的接口AggregateFunction的源代码中,您将看到add确实返回void:

void add(IN value,ACC accumulator);

但是对于版本1.4 AggregateFunction,正在返回:

ACC add(IN value,ACC accumulator);

我该怎么处理?

我使用的Flink版本是1.3.2,此版本的文档没有AggregateFunction,但是还没有版本1.4.

enter image description here

解决方法

您将找到AggregateFunction in the Flink 1.4 docs的一些文档,包括一个示例.

1.3.2中包含的版本仅限于与可变累加器类型一起使用,其中add操作修改累加器.这已经是fixed for Flink 1.4,但还没有发布.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐