微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 flink 1.13 中配置 RocksDB

如何解决在 flink 1.13 中配置 RocksDB

我在 Flink 1.13 版本中读过 EmbeddedRocksDBStateBackend 但有大小限制,所以我想保留我之前的 Flink 1.11 版本的当前配置,但重点是这种配置 RocksDB 的方式已被弃用( new RocksDBStateBackend("path",true);).

我已尝试使用 EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true)) 使用新配置,但出现此错误

java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648,maxSize=5242880 . Consider using a different state backend,like the File System State backend.

从 Java 以编程方式为 flink 1.13 配置 RocksDB 状态后端的最佳方法是什么?

解决方法

在 Flink 1.13 中,我们重新组织了状态后端,因为旧的方式导致了对事物如何运作的许多误解。因此,这两个问题被解耦了:

  1. 您的工作状态的存储位置(状态后端)。 (在 RocksDB 的情况下,它应该配置为使用最快的可用本地磁盘。)
  2. 检查点的存储位置(检查点存储)。在大多数情况下,这应该是分布式文件系统。

使用旧的 API,RocksDB 的情况下涉及两个不同的文件系统的事实被检查点路径传递给 RocksDBStateBackend 构造函数的方式所掩盖。因此,该部分配置已移至别处(见下文)。

此表显示了旧状态后端和新状态后端之间的关系(结合检查点存储):

传统状态后端 新的状态后端+检查点存储
MemoryStateBackend HashMapStateBackend + JobManagerCheckpointStorage
FsStateBackend HashMapStateBackend + FileSystemCheckpointStorage
RocksDBStateBackend EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage

在您的情况下,您希望将 EmbeddedRocksDBStateBackendFileSystemCheckpointStorage 一起使用。您目前遇到的问题是您将内存检查点存储 (JobManagerCheckpointStorage) 与 RocksDB 一起使用,这严重限制了可以检查点的状态数量。

您可以通过在flink-conf.yaml

中指定检查点目录来解决此问题
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional,Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

或在您的代码中

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

有关完整的详细信息,请参阅 Migrating from Legacy Backends 上的文档。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。