
Flink / Kinesis Data Analytics: event-time session windows

How to fix event-time session windows in Flink on Kinesis Data Analytics

I'm new to Flink (running on the AWS Kinesis Data Analytics service), and I'm having trouble implementing EventTimeSessionWindows. Here is my code:

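```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Take event time from the event's own "timestamp" field and
// tolerate up to 5 seconds of out-of-order arrival.
SingleOutputStreamOperator<Deserializedobj> deserializedobjSingleOutputStreamOperator = kinesisstream
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Deserializedobj>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> Long.parseLong(event.timestamp)));

// One session per anonymousId; a session closes after 30 minutes of event-time inactivity.
SingleOutputStreamOperator<String> sessionStream = deserializedobjSingleOutputStreamOperator
        .keyBy("anonymousId")
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .process(new ProcessWindowFunction<Deserializedobj, String, Tuple, TimeWindow>() {
            @Override
            public void process(Tuple tuple, Context context, Iterable<Deserializedobj> iterable, Collector<String> out) throws Exception {
                // Count the events that ended up in this session window.
                long count = 0;
                for (Deserializedobj obj : iterable) {
                    count++;
                }
                out.collect("Window: " + tuple.getField(0) + " " + context.window() + " count: " + count);
            }
        });
```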
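The question does not show the Deserializedobj class. Because keyBy("anonymousId") selects the key by field name, Flink has to be able to treat Deserializedobj as a POJO: a public class with a public no-argument constructor and fields that are public, or that have public getters and setters. The following is a hypothetical minimal version. It assumes that anonymousId and timestamp are both String fields, and that timestamp holds epoch milliseconds, which is the unit Flink expects from a timestamp assigner.

```java
// Hypothetical reconstruction: the real Deserializedobj is not shown in the question.
// keyBy("anonymousId") is a field expression, so Flink must be able to analyze this
// type as a POJO: public class, public no-arg constructor, public fields
// (or private fields with public getters and setters).
public class Deserializedobj {
    public String anonymousId; // assumption: the per-user key, stored as a String
    public String timestamp;   // assumption: epoch milliseconds, stored as a String

    // Public no-arg constructor required by Flink's POJO rules.
    public Deserializedobj() {}

    public Deserializedobj(String anonymousId, String timestamp) {
        this.anonymousId = anonymousId;
        this.timestamp = timestamp;
    }

    // Same conversion as the question's timestamp assigner; Flink interprets
    // the returned value as milliseconds since the epoch.
    public long eventTimeMillis() {
        return Long.parseLong(timestamp);
    }

    public static void main(String[] args) {
        Deserializedobj e = new Deserializedobj("user-1", "1600000000000");
        System.out.println(e.anonymousId + " -> " + e.eventTimeMillis());
    }
}
```

If the field actually holds epoch seconds, the timestamp assigner would need to multiply it by 1000. Otherwise, both the 30-minute gap and the 5-second bound would be applied in the wrong unit.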

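Basically, I want to build session windows (with a 30-minute gap) over the user-activity events flowing through the Kinesis stream. Because the events are fired at the source, they are not guaranteed to arrive in order. I want to use the "timestamp" field of my Deserializedobj so that each event is placed in the correct session. Before a window is processed, I want to wait a few seconds (4) so that late-arriving events are not missed.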
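To make these intended semantics concrete, the following plain-Java model (no Flink dependency) shows what the pipeline should do for a single anonymousId. The watermark follows the bounded-out-of-orderness rule: the maximum timestamp seen, minus the 5 s from Duration.ofSeconds(5), minus 1 ms. Each event opens a session [ts, ts + gap), overlapping sessions merge, and a session fires once the watermark reaches its last millisecond.

This is a simplified sketch, not Flink's implementation. It assumes zero allowed lateness, and it advances the watermark after every event rather than periodically. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the question's pipeline for ONE key (no Flink dependency).
// Simplifications (assumptions): the watermark advances after every event rather
// than periodically, and allowed lateness is zero.
public class SessionWindowSketch {
    // A session window [start, end) in epoch milliseconds and its event count.
    static final class Session {
        final long start, end;
        final int count;
        Session(long start, long end, int count) { this.start = start; this.end = end; this.count = count; }
        @Override public String toString() { return "[" + start + ", " + end + ") count=" + count; }
    }

    final long gap;
    final long outOfOrderness;
    final List<Session> open = new ArrayList<>();
    final List<Session> fired = new ArrayList<>();
    long maxTimestamp = Long.MIN_VALUE;
    boolean ended = false;
    int droppedLate = 0;

    SessionWindowSketch(long gap, long outOfOrderness) {
        this.gap = gap;
        this.outOfOrderness = outOfOrderness;
    }

    // Bounded-out-of-orderness watermark: max timestamp seen - delay - 1 ms.
    long watermark() {
        if (ended) return Long.MAX_VALUE;
        if (maxTimestamp == Long.MIN_VALUE) return Long.MIN_VALUE;
        return maxTimestamp - outOfOrderness - 1;
    }

    void onEvent(long ts) {
        long currentWatermark = watermark(); // watermark as it was before this event
        maxTimestamp = Math.max(maxTimestamp, ts);

        // The event starts a proto-session [ts, ts + gap); merge it with every open
        // session it overlaps (inclusive check, so touching sessions merge too).
        long start = ts, end = ts + gap;
        int count = 1;
        List<Session> overlapping = new ArrayList<>();
        for (Session s : open) {
            if (s.start <= ts + gap && ts <= s.end) {
                overlapping.add(s);
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count;
            }
        }

        // Late: the merged window's last millisecond is already behind the watermark.
        if (end - 1 <= currentWatermark) {
            droppedLate++;
            return;
        }
        open.removeAll(overlapping);
        open.add(new Session(start, end, count));
        fireReadySessions();
    }

    // Event-time trigger: a session fires once watermark >= end - 1.
    void fireReadySessions() {
        long wm = watermark();
        open.removeIf(s -> {
            if (s.end - 1 <= wm) { fired.add(s); return true; }
            return false;
        });
    }

    // Simulates the final watermark emitted at the end of a bounded input.
    void endOfInput() {
        ended = true;
        fireReadySessions();
    }

    public static void main(String[] args) {
        SessionWindowSketch w = new SessionWindowSketch(30 * 60 * 1000L, 5_000L);
        for (long ts : new long[] {0L, 60_000L, 10_000L, 4_000_000L, 100_000L}) {
            w.onEvent(ts);
        }
        w.endOfInput();
        w.fired.forEach(System.out::println);
        System.out.println("dropped late: " + w.droppedLate);
    }
}
```

In the example run, the out-of-order event at 10 000 ms still joins the first session, because that session has not fired yet. The session [0, 1860000) fires only when the event at 4 000 000 ms pushes the watermark past its end. The event at 100 000 ms arrives after that point and is dropped as late. A session result is therefore emitted no earlier than 30 minutes plus the 5 s bound after its last event, measured in event time.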

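Currently I get the following error, which gives no clue about what is happening internally. I captured it from the Exceptions tab of the Apache Flink dashboard: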

```
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d7aff90[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3aa696a6[Wrapped task = org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer@fc8fb39]] rejected from java.util.concurrent.ThreadPoolExecutor@520831ff[Shutting down, pool size = 8, active threads = 8, queued tasks = 0, completed tasks = 14]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:473)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:345)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
```
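The exception itself only reports one fact. KinesisDataFetcher.runFetcher tried to submit a ShardConsumer to a ThreadPoolExecutor that was already in the "Shutting down" state. A ThreadPoolExecutor rejects every task submitted after shutdown(), and with the default AbortPolicy that rejection surfaces as RejectedExecutionException. The stand-alone sketch below reproduces only this JDK behavior; the class and method names are made up, and no Flink or Kinesis code is involved.

Because the pool had already been shut down, this trace is plausibly a side effect of the source being torn down (for example, because the job was cancelled or something else failed). If so, the originating error would appear elsewhere, such as in the TaskManager logs. That is an inference from the trace, not something the trace proves.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Illustration only: submitting to a pool after shutdown() is rejected,
// matching the "Shutting down" state shown in the trace.
public class RejectedAfterShutdown {
    /** Returns the rejection message, or null if the task was (unexpectedly) accepted. */
    static String submitWhileShuttingDown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        // Keep one worker busy so the pool stays in SHUTDOWN instead of terminating at once.
        pool.submit(() -> { release.await(); return null; });
        pool.shutdown();
        try {
            // Default AbortPolicy: any task submitted after shutdown() is rejected.
            pool.submit(() -> System.out.println("never runs"));
            return null;
        } catch (RejectedExecutionException e) {
            return e.getMessage();
        } finally {
            release.countDown(); // let the busy worker finish so the pool can terminate
        }
    }

    public static void main(String[] args) {
        System.out.println(submitWhileShuttingDown());
    }
}
```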
