微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink TaskManager 内存管理机制与调优

1.TaskManager 内存分区总览

在这里插入图片描述

在这里插入图片描述

  • Flink 总内存(Total Flink Memory)
  • JVM 堆内存(JVM Heap Memory), 框架堆上内存, Task堆上内存
  • JVM 堆外内存(JVM Off-Heap Memory), 直接内存, 管理内存
  • 托管内存(Managed Memory)
  • 直接内存(Direct Memory)
  • JVM 元空间(JVM Metaspace)
  • JVM 运行时开销(JVM Overhead)

2.TaskManager 各内存区域详解

2.1 JVM 进程总内存(Total Process Memory)

该区域表示在容器环境下,TaskManager 所在 JVM 的最大可用的内存配额,包含了本文后续介绍的所有内存区域,超用时可能被强制结束进程。我们可以通过 taskmanager.memory.process.size 参数控制它的大小。

例如我们设置 JVM 进程总内存为 4G,TaskManager 运行在 Kubernetes 平台,则 Pod 配置的 spec -> resources -> limits -> memory 项会被设置为 4Gi,源码见 org.apache.flink.kubernetes.kubeclient.decorators.InitTaskManagerDecorator#decorateMainContainer,运行时的 YAML 配置如下图:

在这里插入图片描述


而对于 YARN,如果 yarn.nodemanager.pmem-check-enabled 设为 true, 则也会在运行时定期检查容器内的进程是否超用内存。
如果进程总内存用量超出配额,容器平台通常会直接发送最严格的 SIGKILL 信号(相当于 kill -9)来中止 TaskManager,此时不会有任何延期退出的机会,可能会造成作业崩溃重启、外部系统资源无法释放等严重后果。
因此,在 有硬性资源配额检查 的容器环境下,请务必妥善设置该参数,对作业充分压测后,尽可能预留一部分安全余量,避免 TaskManager 频繁被 KILL 而导致的作业频繁重启。

2.2 Flink 总内存(Total Flink Memory)

该内存区域指的是 Flink 可以控制的内存区域,即上述提到的 JVM 进程总内存 减去 Flink 无法控制的 Metaspace(元空间)和 Overhead(运行时开销)区域。Flink 随后又把这部分内存区域划分为堆内、堆外(Direct)、堆外(Managed)等不同子区域

对于没有硬性资源限制的环境,我们建议使用 taskmanager.memory.flink.size 参数来配置 Flink 总内存的大小,然后 Flink 自己也会会自动根据参数,计算得到各个子区域的配额。如果作业运行正常,则无需单独调整。

例如 4G 的 进程总内存 配置下,JVM 运行时开销(Overhead)占 进程总内存 的 10% 但最多 1G(下图是 409.6M),元空间(Metaspace)占 256M;堆外直接(Direct)内存网络缓存占 Flink 总内存 的 10% 但最多 1G(下图是 343M),框架堆和框架堆外各占 128M,堆外管控(Managed)内存占 Flink 总内存 的 40%(下图是 1372M 即 1.34G),其他空间留给任务堆,即用户程序代码可以使用的内存空间(1459M 即 1.42G)

在这里插入图片描述

2.3 JVM 堆内存(JVM Heap Memory)

堆内存大家想必都不陌生,它是由 JVM 提供给用户程序运行的内存区域,JVM 会按需运行 GC(垃圾回收器),协助清理失效对象。
当任务启动时,ProcessMemoryUtils#generateJvmParameteRSStr 方法会通过 -Xmx -xms 参数设置堆内存的最大容量。

Flink 将堆内存从逻辑上划分为 “框架堆”、“任务堆” 两个子区域,分别通过 taskmanager.memory.framework.heap.sizetaskmanager.memory.task.heap.size 来指定其大小:框架堆认是 128m,任务堆如果未显式设置`其大小,则会通过扣减其他区域配额来计算得到。例如对于 4G 的进程总内存,扣除了其他区域后,任务堆可用的只有不到 1.5G。

但需要注意的是,Flink 自身并不能精确控制框架自身及任务会用多少堆内存,因此上述配置项只提供理论上的计算依据。如果实际用量超出配额,且 JVM 难以回收对象释放空间,则会抛出 OutOfMemoryError,此时 Flink TaskManager 会退出,导致作业崩溃重启。因此对于堆内存的监控是必须要配置的,当堆内存用量超过一定比率,或者 Full GC 时长和次数明显增长时,需要尽快介入并考虑扩容。

高级内容:对于使用 HashMapStateBackend(旧版本称之为 FileSystem StateBackend)的流作业用户,如果在进程总内存固定的前提下,希望尽可能提升任务堆的空间,则可以减少 托管内存(Managed Memory)的比例。我们接下来也会讲到它。

2.4 JVM 堆外内存(JVM Off-Heap Memory)

广义上的 堆外内存 指的是 JVM 堆之外的内存空间,而我们这里特指 JVM 进程总内存除了元空间(Metaspace)和运行时开销(Overhead)以外的内存区域。

taskmanager.memory.framework.off-heap.size,认 128MB
taskmanager.memory.task.off-heap.size,认 0,表示不使用堆外内存

2.5 托管内存(Managed Memory)

用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果。

  1. 批处理算法,例如排序、HashJoin 等。他们会从 Flink 的 MemoryManager 请求内存片段(MemorySegment),而 MemoryManager 则会调用 UNSAFE.allocateMemory 分配堆外内存。
  2. RocksDB StateBackend,Flink 只会预留一部分空间并扣除预算,但是不介入实际内存分配。因此该类型的内存资源被称为 OpaqueMemoryResource. 实际的内存分配还是由 JNI 调用的 RocksDB 自己通过 malloc 函数申请。
  3. PyFlink。与 JNI 类似,在与 Python 进程交互的过程中,也会用到一部分托管内存。
堆外:
taskmanager.memory.managed.fraction,认 0.4
taskmanager.memory.managed.size,认 none

2.6 直接内存(Direct Memory)

直接内存是 JVM 堆外的一类内存,它提供了相对安全可控但又不受 GC 影响的空间,JVM 参数是 -XX:MaxDirectMemorySize. 它主要用于

  1. 框架自身(taskmanager.memory.framework.off-heap.size 参数,认 128M,例如 Sort-Merge Shuffle 算法所需的内存)
  2. 用户任务(taskmanager.memory.task.off-heap.size 参数,认设为 0)
  3. Netty 对 Network Buffer 的网络传输(taskmanager.memory.network.fraction 等参数,认 0.1 即 10% 的 Flink 总内存)。

2.7 JVM 元空间(JVM Metaspace)

JVM Metaspace 主要保存了加载的类和方法的元数据,Flink 配置的参数是 taskmanager.memory.jvm-Metaspace.size认大小为 256M,JVM 参数是 -XX:MaxMetaspaceSize.

如果用户编写的 Flink 程序中,有大量的动态类加载的需求,例如我们之前遇到过一个用户作业,动态编译并加载了 44 万个类,此时就容易出现元空间用量远超预期,发生 OOM 报错。此时就需要适当调大元空间的大小,或者优化用户程序,及时卸载无用的 Classloader。

2.8 JVM 运行时开销(JVM Overhead)

JVM over-head 执行开销:JVM 执行时自身所需要的内容包括线程堆栈、IO、编译缓存等所使用的内存。

除了上述描述的内存区域外,JVM 自己还有一小块 “自留地”,用来存放线程栈、编译的代码缓存、JNI 调用的库所分配的内存等等,Flink 配置参数是 taskmanager.memory.jvm-overhead.fraction认是 JVM 总内存的 10%。

taskmanager.memory.jvm-overhead.fraction,认 0.1
taskmanager.memory.jvm-overhead.min,认 192mb
taskmanager.memory.jvm-overhead.max,认 1gb

总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max
大小

3.案例分析

基于Yarn模式,一般参数指定的是总进程内存,taskmanager.memory.process.size,
比如指定为 4G,每一块内存得到大小如下:

  1. JVM 元空间 256m
  2. JVM 执行开销: 4g*0.1=409.6m,在[192m,1g]之间,最终结果 409.6m
  3. Flink 内存=4g-256m-409.6m=3430.4m
  4. 网络内存=3430.4m*0.1=343.04m,在[64m,1g]之间,最终结果 343.04m
  5. 托管内存=3430.4m*0.4=1372.16m
  6. 框架内存,堆内和堆外都是 128m
  7. Task 堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m

在这里插入图片描述

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \ 单个 TM2~8G 足
-Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 2core:1slot
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用QPS/TPS 来描述数据情况。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐