如何解决Flink CEP比赛中从状态清除的所有先前事件
我在flink CEP中有一个特殊情况,其中所有事件在一次匹配中都被丢弃。例如,让我们以Apple,And,All,Bulb,Bee,Bell的输入为例。条件是单词以A开头,然后单词以B开头。但是该匹配项仅返回第一个匹配项,并且跳过了其余匹配项,因此仅返回了Apple Bulb,其他的则被丢弃。
解决方法
您应该尝试使用AfterMatchSkipStrategy的各种选项和combining patterns的不同方式。例如,如果将 values
A B
2 9 0.2719
0 0.2938
28 0.3323
15 0.3640
10 0.3647
3 15 0.3281
8 0.3310
9 0.5218
与AfterMatchSkipStrategy.noSkip()
结合使用,则将返回所有9个匹配项。
followedByAny()
DataStream<String> stringInputStream = environment.fromElements(
"apple","and","all","bulb","bee","bell");
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
Pattern<String,?> pattern = Pattern.<String>begin("start",skipStrategy)
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return (s.charAt(0) == 'a');
}
}).followedByAny("next")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return (s.charAt(0) == 'b');
}
});
PatternStream<String> patternStream = CEP.pattern(stringInputStream,pattern);
DataStream<String> result = patternStream.process(
new PatternProcessFunction<String,String>() {
@Override
public void processMatch(
Map<String,List<String>> map,Context context,Collector<String> out) throws Exception {
out.collect(map.toString());
}
});
result.print();
但是,您需要小心。设置需要无限期存储事件的模式将导致性能下降,并最终导致作业失败。您想尽可能地限制比赛。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。