微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink检查点的大小增长到20GB以上,检查点的时间超过1分钟

如何解决Flink检查点的大小增长到20GB以上,检查点的时间超过1分钟

首要:

  • 我是Flink的新手(了解原理并能够创建我需要的任何基本流工作)
  • 我使用Kinesis Analytics运行Flink作业,认情况下,它使用间隔为1分钟的增量检查点。
  • Flink作业正在使用FlinkKinesisConsumer和自定义反序列化器(将字节反序列化为一个简单的Java对象,在整个作业中使用)从Kinesis流中读取事件

我想存档的只是简单地计算过去24小时内有多少个ENTITY_ID / FOO和ENTITY_ID / BAR事件。重要的是,此计数应尽可能准确,这就是为什么我使用此Flink功能而不是自己在5分钟的翻滚窗口上进行累加总和的原因。 我还希望能够从一开始就具有“总计”事件的计数(而不仅仅是过去24小时),因此我也要在结果中输出过去5分钟的事件计数,以便后期处理应用可以只需花费这5分钟的数据并计算总和即可。 (此计数不一定是准确的,如果发生中断并且我丢失了一些计数也可以)

现在,这项工作一直很好,直到上周我们的流量激增了10倍以上。从那以后,Flink变成了香蕉。 检查点大小开始逐渐从约500MB增长到20GB,检查点时间大约需要1分钟,并且随着时间的推移逐渐增加。 该应用程序开始出现故障,并且永远无法完全恢复,并且事件迭代器的寿命没有回升,因此没有新的事件被消耗。

由于我是Flink的新手,所以我不确定我进行滑动计数的方式是否完全未优化或完全错误

这是代码关键部分的一小段:

源(MyJsonDeserializationSchema扩展了AbstractDeserializationSchema并仅读取字节并创建Event对象):

SourceFunction<Event> source =
      new FlinkKinesisConsumer<>("input-kinesis-stream",new MyJsonDeserializationSchema(),kinesisConsumerConfig);

输入事件,简单的java pojo,将在Flink运算符中使用:

public class Event implements Serializable {
  public String entityId;
  public String entityType;
  public String entityName;
  public long eventTimestamp = System.currentTimeMillis();
}

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> eventsstream = kinesis
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(Event event) {
          return event.eventTimestamp;
        }
      })

DataStream<Event> fooStream = eventsstream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "foo".equalsIgnoreCase(event.entityType);
        }
      })

 DataStream<Event> barStream = eventsstream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "bar".equalsIgnoreCase(event.entityType);
        }
      })


StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table fooTable = tEnv.fromDataStream("fooStream,entityId,entityName,entityType,eventTimestame.rowtime");
    tEnv.registerTable("Foo",fooTable);
    Table barTable = tEnv.fromDataStream("barStream,eventTimestame.rowtime");
    tEnv.registerTable("Bar",barTable);

Table slidingFooCountTable = fooTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId,minuteWindow")
      .select("concat(concat(entityId,'_'),entityName) as slidingFooId,entityid as slidingFooEntityid,entityName as slidingFooEntityName,entityType.count as slidingFooCount,minuteWindow.rowtime as slidingFooMinute");

Table slidingBarCountTable = barTable
      .window(Slide.over("24.hout").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId,entityName) as slidingBarId,entityid as slidingBarEntityid,entityName as slidingBarEntityName,entityType.count as slidingBarCount,minuteWindow.rowtime as slidingBarMinute");

    Table tumblingFooCountTable = fooTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid,minuteWindow")
      .select("concat(concat(entityName,entityName) as tumblingFooId,entityId as tumblingFooEntityId,entityNamae as tumblingFooEntityName,entityType.count as tumblingFooCount,minuteWindow.rowtime as tumblingFooMinute");
   
    Table tumblingBarCountTable = barTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid,entityName) as tumblingBarId,entityId as tumblingBarEntityId,entityNamae as tumblingBarEntityName,entityType.count as tumblingBarCount,minuteWindow.rowtime as tumblingBarMinute");

    Table aggregatedTable = slidingFooCountTable
      .leftOuterJoin(slidingBarCountTable,"slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
      .leftOuterJoin(tumblingFooCountTable,"slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
      .leftOuterJoin(tumblingFooCountTable,"slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
      .select("slidingFooMinute as timestamp,slidingFooCreativeId as entityId,slidingFooEntityName as entityName,slidingFooCount,slidingBarCount,tumblingFooCount,tumblingBarCount");

    DataStream<Result> result = tEnv.toAppendStream(aggregatedTable,Result.class);
    result.addSink(sink); // write to an output stream to be picked up by a lambda function

如果有更多使用Flink的经验的人可以对我的计算方式发表评论,我将不胜感激?我的代码是否设计过度?是否有更好,更有效的方法来计数24小时内的事件?

我在Stackoverflow @DavidAnderson的某个地方读过,建议使用地图状态创建我们自己的滑动窗口,并按时间戳将事件切片。 但是我不确定这意味着什么,也没有找到任何代码示例来显示它。

解决方法

您正在其中创建许多窗口。如果您要创建一个大小为24h且滑动时间为5分钟的滑动窗口,这意味着其中将有很多打开的窗口,因此您可能希望您在给定日期收到的所有数据都将在考虑一下,至少要有一个窗口。因此,可以确定的是,检查点的大小和时间会随着数据本身的增长而增长。

要想得到答案,可以重写代码。您需要在此处提供更多详细信息,以了解您到底想达到什么目的。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。