如何解决如何从键入之前应用的不同过滤器中恢复KeyedStream
我如何分散相同的keyedStream并根据不同的用例应用过滤器,而无需在过滤结束时创建新的keyedStream? 示例:
DataStream<Event> streamFiltered = RabbitMQConnector.eventStreamObject(env)
.flatMap(new Consumer())
.name("Event Mapper")
.assignTimestampsAndWatermarks(new PeriodicExtractor())
.name("Watermarks Added")
.filter(new NullIdEventsFilterFunction())
.name("Event Filter");
/*Now I will or need to send the same keyedStream for applying two different transformations with different filters but under the same keyed concept*/
/*Once I'd applied the filter I will receive back a SingleOutputStreamOperator and then I need to keyBy again*/
/*in a normal scenario I will need to do keyBy again,and I want to avoid that */
KeyedStream<T,T> keyed1 = streamFiltered.filter(x -> x.id != null).keyBy(key -> key.id); /*wants to avoid this*/
KeyedStream<T,T> keyed2= streamFiltered.filter(x -> x.id.lenght > 10).keyBy(key -> key.id);/*wants to avoid this*/
seeProduct(keyed1);
checkProduct(keyed2);
/*these are just an example,this two operations receive a keyedStream under the same concept but with different filters applied to the keyedStream already created and wants to reuse that same keyedStream after different filters to avoid a new creation*/
private static SingleOutputStreamOperator<EventProduct>seeProduct(KeyedStream<Event,String> stream) {
return stream.map(x -> new EventProduct(x)).name("Event Product");
}
private static SingleOutputStreamOperator<EventCheck>checkProduct(KeyedStream<Event,String> stream) {
return stream.map(x -> new EventCheck(x)).name("Event Check");
}
在正常情况下,每个单个过滤器函数都将返回SingleOutputStream,然后我需要再次执行keyBy(但是我已经有一个id为keyedStream的主意,要在过滤器之后获取它,我将需要通过再次创建一个新的KeyedStream)。例如,应用过滤器后,如何保留keyedStream概念?
解决方法
我认为,在您的情况下,side output
功能会有所帮助-您可以针对每种过滤情况从基本keyed stream
处获得单独的输出。
请在flink侧输出文档:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html中查看更多详细信息和示例。
类似的事情(用伪代码)应该对您有用:
final OutputTag<Tuple2<String,Event>> outputTag1 = new OutputTag<>("side-output-filter-1"){};
final OutputTag<Tuple2<String,Event>> outputTag2 = new OutputTag<>("side-output-filter-2"){};
DataStream<Event> keyedStream = source.keyby(x -> x.id);
.process(new KeyedProcessFunction<Tuple,Tuple2<String,Event>,Event>> {
@Override
public void processElement(
Tuple2<String,Event> value,Context ctx,Collector<Tuple2<String,Event>> out) throws Exception {
// emit data to regular output
out.collect(value);
// emit data to side output
ctx.output(outputTag1,value);
ctx.output(outputTag2,value);
}
})
/*for use case one I need to use the same keyed concept but apply a filter*/
DataStream<Tuple2<String,Event>> sideOutputStream1 = keyedStream.getSideOutput(outputTag1).filter(x -> x.id != null);
/*for use case two I need to use the same keyed concept but apply a filter*/
DataStream<Tuple2<String,Event>> sideOutputStream2 = keyedStream.getSideOutput(outputTag2).filter(x -> x.id.lenght > 10);
,
最简单的答案似乎是先应用过滤,然后再使用keyBy。
如果出于某种原因需要在过滤之前对流进行键分区(例如,您可能正在应用使用键分区状态的RichFilterFunction),则可以使用reinterpretAsKeyedStream重新建立键,而无需另一个keyBy的费用。
使用侧面输出是将流分成几个过滤的子流的好方法,但是这些输出流将不再是KeyedStreams。如果重新应用键选择器功能会产生与已经存在的分区完全相同的分区,则只能安全地使用reinterpretAsKeyedStream。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。