微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Apache Flink EventTime 处理不起作用

如何解决Apache Flink EventTime 处理不起作用

我正在尝试在 KDA 上使用 Flink v1.11 应用程序执行流连接。加入 wrt 到 ProcessingTime 有效,但使用 EventTime 我看不到 Flink 的任何输出记录。

这是我的 EventTime 处理代码不起作用,

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Trade>forMonotonousTimestamps()
                            .withTimestampAssigner(((event,l) -> event.getEventTime()))
            );
    DataStream<Company> input2 = createSourceFromInputStreamName2(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Company>forMonotonousTimestamps()
                            .withTimestampAssigner(((event,l) -> event.getEventTime()))
            );
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingEventTimeWindows.of(Time.seconds(30)))
            .apply(new JoinFunction<Trade,Company,String>() {
                @Override
                public String join(Trade t,Company c) {
                    return t.getEventTime() + "," + t.getTicker() + "," + c.getName() + "," + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

我与 ProcessingTime 有类似的加入

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env);
    DataStream<Company> input2 = createSourceFromInputStreamName2(env);
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(10000)))
            .apply (new JoinFunction<Trade,String> (){
                @Override
                public String join(Trade t," + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

来自我试图加入的两个流的示例记录:

{'eventTime': 1611773705,'ticker': 'TBV','price': 71.5}
{'eventTime': 1611773705,'name': 'The Bavaria'}

解决方法

我没有发现任何明显错误,但以下任何一种情况都可能导致此作业不产生任何输出:

  • 水印问题。例如,如果流之一变得空闲,则水印将停止前进。或者,如果窗口之后没有事件,则水印不会前进到足以关闭该窗口。或者,如果时间戳实际上不是按升序排列的(使用 forMonotonousTimestamps 策略,事件应该按时间戳排序),管道可能会默默地丢弃所有无序事件。
  • StreamingFileSink 仅在检查点期间完成其输出,并且不会在作业停止时完成任何待处理的文件。
  • 窗口连接的行为类似于内部连接,并且需要来自每个输入流的至少一个事件才能在给定的窗口间隔内产生任何结果。从您分享的示例来看,这似乎不是问题。

更新:

鉴于您(似乎)想要做的是将每个交易与交易时可用的最新公司记录连接,查找连接或临时表连接似乎是不错的方法。>

这里有几个例子:

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/04/04_lookup_joins.md

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/03/03_kafka_join.md

一些文档:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#event-time-temporal-join

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/versioned_tables.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。