如何解决Apache Flink Job中有一个对象flink运算符例如Filter或两个对象
我有Apache Flink Job,它具有来自单独的Apache Kafka主题的4个输入DataStreams(JSON消息),并且我只有一个对象XFilterFunction-进行一些过滤。我写了一些数据管道逻辑(以原始示例为例):
FilterFunction<MyEvent> xFilter = new XFilterFunction();
inputDataStream1.filter(xFilter)
.name("Xfilter")
.uid("Xfilter");
inputDataStream2
.union(inputDataStream3)
//here some logics (map,process,...)
.filter(xFilter);
在Job中使用一个新对象XFilterFunction是好是坏做法?
还是最好使用两个新对象XFilterFunction? (2个流-> 2个新的过滤器对象)
解决方法
如果您多次实例化该类,即
inputDataStream1.filter(new XFilterFunction());
...
inputDataStream2.filter(new XFilterFunction());
应该没有问题。我不确定状态或覆盖的上下文函数之类的其他东西是否会显示有害的行为。
如果没有RichFunction
的特殊化,也许甚至只是通过委托进行的纯函数调用,不幸的是,我没有那么深入地讲Flink的内部原理,但是使用上述解决方案,您应该是安全的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。