微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink - 最新 StateBackend 状态后端详解

一.引言

使用 DataStreaming 编写流式程序时通常结合 KeyedStream 实现状态的读取与更新,为了防止数据丢失并持续恢复,状态在检查点的持久化方式和位置取决于 StateBackend,下面基于 1.8.x 和 1.13.x 新老版本的状态后端进行分析以及工程环境下状态后端的使用与调优。

二. StateBackend 简介 By 1.8.x

1.MemoryStateBackend

MemoryStateBackend 将内部状态存储在 Java TaskManager 的堆内存中,通过 CheckPoint 机制将状态进行 snapshot 快照保存至 JobManager 的堆内存中,其支持配置异步快照,只需初始化时增加 asynchronousSnapshots = true 参数即可:

认状态的大小限制为 5242880 即 5MB:

由于其状态大小的限制以及聚合状态必须存储在 JobManager 堆内存中的原因(上一篇内存分析已提到 JobManager 内存不大),MemoryStateBackend 的使用场景主要有:

- 本地调试

- 状态数据量较小的,流量不大的低吞吐的在线任务

由于新版本 HashMapStateBackend 状态后端的引入,该状态后端已弃用,可以看到上面初始化的 new MemoryStateBackend 已增加横线代表其处于 Deprecated 废弃状态。

2.FsstateBackend

Fs 顾名思义即 FileSystem,其通过配置文件系统路径来进行状态后端配置,FsstateBackend 将数据保存于 TaskManager 的堆内存中,通过 checkpoint 机制将 snapshot 快照存储于配置好的 Fs 文件系统中,由于 checkpoint 的存储不再受限于 JobManager 的内存而是转移到 Fs 文件系统,所以大大增加了其快照的存储容量,启用 FsstateBackend 需配置 Fs 地址,其同样支持 asynchronousSnapshots 异步快照参数,认为 true 即开启状态:

 FsstateBackend 的使用场景主要有:

- 大规模状态以及 checkpoint 较大的在线任务

由于新版本 HashMapStateBackend 状态后端的引入,该状态后端已弃用,可以看到上面初始化的 new FsstateBackend 已增加横线代表其处于 Deprecated 废弃状态。

3.RocksDBStateBackend

RocksDBStateBackend 适用于大状态或者容量持续增大的状态,其状态存储于执行机器,Checkpoint 快照存储于指定的 HDFS 路径即 rocksDBPath,工业场景下大规模的状态存储和读取可以考虑 RocksDBStateBackend,但是由于机器存储需要不断地序列化与反序列化,以及机器磁盘 IO 的限制,如果状态更新频率过高,任务将受限于 ValueState.get 即从机器获取状态的步骤,从而造成背压影响前面的算子。

除此之外,可以通过 enableIncrementalCheckpointing 参数配置是否进行增量 checkpoint 存储,该方法能够加速存储快照的速度,前面的 MemoryStateBackend 和 FsstateBackend 不支持该操作。

RocksDBStateBackend 的主要使用场景有:

- 具有大状态、长窗口、大键值状态的作业

- 状态持续增大,且读取频率可控的高吞吐任务

新版本下通过 EmbeddedRocksDBStateBackend 替代了原有的 RocksDBStateBackend:

新版本下推荐使用 EmbeddedRocksdBStateBend 代替 RocksDBStateBend,前者只是一个 API 的修改,其有助于 Flink 更好的实现本地状态存储与容错的分离。

三. StateBackend 简介 By 1.13.x

1.HashMapStateBackend

运行无状态作业 [即纯流式任务] 或者使用 HashMapStateBackend 时,可以将 ManagedMemory 设置为 0,这样可以确保为 JVM 上的用户代码分配更多的堆内存,而 HashMapStateBackend 状态存储的位置即为堆内存,如此配置可保证有更大的空间存储,避免堆内存过小状态过多或过大导致 OOM。

env.setStateBackend(new HashMapStateBackend());

HashMapStateBackend 状态存储于堆内,这与 MemoryStateBackend 和 FsstateBackend 的状态存储相同,其认的 checkpoint 快照也存储于堆内,此时 HashMapStateBackend 类似于 MemoryStateBackend,当 value 数量过大时会导致内存溢出;也可以通过 setCheckpointStorage 参数配置 HashMapStateBackend checkpoint 存储位置,当设定为 FileSystemCheckpointStorage 时,HashMapStateBackend 变化为 FsstateBackend ,快照 snapshot 也存储于对应的 Fs 文件系统中,所以新版本通过 HashMapStateBackend 实现了 MemoryStateBackend 与 FsstateBackend,从而导致两位老大哥被废弃。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new HashMapStateBackend)
// 切换为 MemoryStateBackend
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage)
// 切换为 FsstateBackend
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(path));

HashMapStateBackend 的主要使用场景有:

- 具有大状态、长窗口、大键值状态的作业

- 高可用的线上作业


2.EmbeddedRocksDBStateBackend

EmbeddedRocksDBStateBackend 将动态数据保存在 RocksDB 数据库中,该数据库认存储在 TaskManager 本地数据目录中,EmbeddedRocksDBStateBackend 始终执行异步快照。由于 RocksDB 的 JNI 序列化 API 基于 Byte[],因此每次获取状态都需要进行序列化与反序列化的操作,这使得 EmbeddedRocksDBStateBackend 不适应频繁大规模更新状态的作业,因为磁盘的 IO 会限制状态读取的性能。除此之外,EmbeddedRocksDBStateBackend 使用本机内存。认情况下,RocksDB 本机分配内存受限于 Managed Memory,因此需要程序提供足够多的 Managed Memory。如果禁用认的 RocksDB内存控制,RocksDB分配的内存超过请求的容器大小(总进程内存)的限制,则容器化部署中的任务管理器可能会被终止。

env.setStateBackend(new EmbeddedRocksDBStateBackend());

上面也提到了,EmbeddedRocksDBStateBackend 基本是 RocksDBStateBackend 的一次 API 转换,所以二者使用场景差异不大:

- 具有非常大的状态、长窗口、大键值状态的作业

- 状态持续增大,且读取频率可控的高吞吐任务

下面两种配置方法得到的结果相同:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new EmbeddedRocksDBStateBackend)
// 直接配置
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
// 通过 Fs 配置
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))

除此之外,使用 rocksDB 需要单独引入 maven 依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

四.StateBackend 集群配置

由于老版本已弃用,这里介绍新版本的集群配置方法,相关配置信息在 flink-conf.yaml 文件下,一般在客户端文件夹下,即 /data/flink/flink-version/conf/flink-conf.yaml,主要包含如下参数:

A.状态后端

# The backend that will be used to store operator state checkpoints
state.backend: hashmap / rocksdb

hashmap 代表 HashMapStateBackend,rocksdb 代表 EmbeddedRocksDBStateBackend,也可以是继承实现了 AbstractStateBackend 的状态后端的完全限定名,例如:

org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactoryEmbeddedRocksDBStateBackend

B.快照存储

# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoint

五.总结

StateBackend version 状态存储 快照存储
MemoryStateBackend TaskManager Heap JobManager 
FsstateBackend TaskManager Heap FileSystem
RocksDBStateBackend 执行机器 RocksDBPath[Hdfs]
HashMapStateBackend TaskManager Heap FileSystem / 内存
EmbeddedRocksDBState 执行机器 RocksDBPath[Hdfs]

上述表格总结了新老版本中的 5 种 StateBackend,其中:

MemoryStateBackend 被 HashMapStateBackend + JobManagerCheckpointStorage 替代

FsstateBackend 被  HashMapStateBackend + FileSystemCheckpointStorage 替代

RocksDBStateBackend 被 EmbeddedRocksDBStateBackend  + FileSystemCheckpointStorage 替代

上述转换实现了新老版本的更迭,日常开发中,常规的带状态作业可以采用 HashMapStateBackend,增量超大状态的作业可以采用 EmbeddedRocksDBStateBackend,后续将基于工程实例,分析 HashMapStateBackend 与 EmbeddedRocksDBStateBackend 的使用与调优。

原文地址:https://www.jb51.cc/wenti/3284221.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐