微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在scala中使用flink fold函数

这是一个非常有效的尝试使用Flink折叠与 scala匿名函数

val myFoldFunction = (x: Double,t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0,myFoldFunction : Function2[Double,(Double,String),Double])

它汇编得很好,但在执行时,我得到了“类型擦除问题”(见下文).在Java中这样做很好,但当然更冗长.我喜欢简洁明了的lambda.我怎么能在scala中做到这一点?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.Typeinformation,scala.reflect.classtag)' Could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

解决方法

您遇到的问题是Flink [1]中的错误.问题源于Flink的TypeExtractor以及Scala DataStream API在Java实现之上的实现方式. TypeExtractor无法为Scala类型生成Typeinformation,因此返回MissingTypeinformation.创建StreamFold运算符后,手动设置此缺失类型信息.但是,StreamFold运算符的实现方式是它不接受MissingTypeinformation,因此在设置正确的类型信息之前失败.

我已经打开了一个拉取请求[2]来解决这个问题.它应该在接下来的两天内合并.通过使用最新的0.10快照版本,您的问题应该得到解决.

> [1] https://issues.apache.org/jira/browse/FLINK-2631
> [2] https://github.com/apache/flink/pull/1101

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐