微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink通过本地文件持久化算子状态并重启恢复数据

Flink在运行过程中, 难免会因为一些异常导致服务终止, 因为Flink的优势在于处理实时数据, 所以重启的话, 可能会导致部分数据指标不正确, 会丢失部分数据, 比如统计最近一小时数据, 运行半小时终止, 再次重启, 也只能重新开启统计. 但Flink可以通过state来解决这个问题, 将状态保存在内存, 文件系统或者db中, 持久化后, 即可实现故障后重启继续计算.

以下示例是通过kafka作为数据源, 统计各message出现的次数, 利用keyBy, process和窗口富函数实现state初始化及更新的.

示例

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsstateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkStateTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 每5s进行一次checkpoint
        env.enableCheckpointing(5000);
        // 恰好只消费一次 根据要求切换模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoint时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 最大并发产生checkpoint个数
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // job cancel之后checkpoint相关文件是否会清理
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // backend保存位置
        env.setStateBackend(new FsstateBackend("file:///Users/guands/dev/checkpoints"));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink");
        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), kafkaProps);
        // kafka的offset开始位置
        kafka.setStartFromGroupOffsets();
        // 每次生成checkpoint时提交offset 保证数据不会丢失
        kafka.setCommitOffsetsOnCheckpoints(true);
        DataStreamSource<String> dataStreamByEventTime = env.addSource(kafka);
        dataStreamByEventTime
                .keyBy((KeySelector<String, String>) s -> s)
                .process(new KeyedProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx, Collector<String> out) throws Exception {
                        long count = 0;
                        if (countState.value() != null) {
                            count = countState.value();
                        }
                        count++;
                        countState.update(count);
                        System.out.println("count: " + count);
                        out.collect(value);
                    }

                    private ValueState<Long> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // 初始化state 获取当前key对应的个数
                        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("test_count", Long.class));
                    }
                }).print();
        env.execute();
    }

}

运行后会在对应目录下产生一些文件

在这里插入图片描述

canncel job后通过以下命令重启flink job, 一般使用最新的checkpoint文件来恢复state, 避免无用计算.

重启

flink run -s D:\Users\guands\dev\checkpoints\7be830af951177a89815c8ab450b3c41\chk-6\_Metadata -c com.....FlinkStateTest D:\\flink-1.0-SNAPSHOT.jar

再次生产kafka消息, 可以看到基数在上次基础上累加. 重启成功!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐