本文主要介绍内容
一、flink分层架构
API层:主要包含 Flink 的流处理API 和批处理API
执行引擎:Flink 的执行处理,Flink 的执行引擎基于流处理实现。
物理资源层:Flink 任务执行的物理资源,主要有 本地(JVM) , 集群(yarn),云端(GCE/EC2)等,flink1.11以上版本也支持k8s部署。
二、flink系统架构
Jobmanger:Flink系统协调者,负责接收job任务并调度job的多个task执行。同时负责job信息的收集和管理Taskmanger。
Taskmanger:负责执行计算的Worker,同时进行所在节点的资源管理(包括内存,cup,网络),启动时向Jobmanger汇报资源信息
三、flink任务部署流程
flink目前支持本地,yarn,k8s等多种部署方案。
3.1、flink任务yarn部署流程
jobmanger 和taskmanger 分别申请Container 资源运行各自的进程。
jobmanger 和yarn AM 属于同一个Container中,从而 yarn AM 可进行申请Container及调度Taskmanger
HDFS 用于数据的存储,如checkpoints, savepoints 等数据
四、flink任务执行流程及原理
一个flink程序执行时,都会映射为一个Streaming Dataflow 进行处理,类似一个DAG图。从Source开始到Sink结束。
4.1、 flink程序与DataFlow映射
支持的Source:kafka,hdfs,本地文件
支持的Sink:kafka,MysqL,hdfs,本地文件
4.2、parallel Dataflow (并发原理)
flink程序天生就支持并行及分部署处理:
a、一个Stream支持分为多个Stream分区,一个Operate支持分成多个Operate Subtask,每个Subtask都执行在不同的线程中。
b、一个Operate 并行度等于Operate Subtask个数,Stream并行度总等于Operate并行度。
parallel Dataflow 示例:
Source并行度为2,Sink并行度为1。
上图展示了Operate与Stream之间存在的两种模式:
a、One-to-one模式:Source[1]–>Map[1]的数据流模式,该模式保持了Source的分区特性及数据处理的有序性。
b、Redistribution模式:map[1]–>apply()[2]的数据流模式,该模式会改变数据流的分区。其与选择的Operate操作有关。
4.3、Task & Operator Chain
flink 在分布式环境中会将多个Operate Subtask 串在一起作为一个 Operate Chain 的执行链。每个执行链在Taskmanger上独立的线程中执行。多个Operate 通过Stream进行连接。每个Operate对应一个task。
下图分别展示单个并发与多个并发的执行原理图。
4.4、时间窗口
flink支持基于时间和数据的时间窗口。Flink支持基于多种时间的窗口。
a、基于事件的创建时间
b、基于事件进入 Dataflow 的时间
c、基于某Operate对事件处理时的本地时间
各种时间所处的位置及含义:
五、checkpoint原理
flink是在Chandy–Lamport算法[的基础上实现的一种分布式快照。通过不断的生成分布式Streaming数据流Snapshot,实现利用snapshot进行数据流的恢复处理。
5.1、checkpoint主要步骤
a、Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint
b、source 节点向下游广播 barrier (附带在数据流中,随DAG流动)
c、task收到barrier后,异步的执行快照并进行持久化处理。
d、sink完成快照后,表示本次checkpoint完成。并将所有快照数据进行整合持久化处理。
5.2、Barrier
a、Stream Barrier是Flink分布式Snapshotting中的核心元素,它会对数据流进行记录并插入到数据流中,对数据流进行分组,并沿着数据流的方向向前推进。
b、每个Barrier会携带一个Snapshot ID,属于该Snapshot的记录会被推向该Barrier的前方。Barrier非常轻量,不会中断数据流处理。
带有Barrier的数据流图:
5.3、Strean Aligning
当Operate具有多个数据输入流时,需在Snapshot Barrier中进行数据对齐处理。
具体处理过程:
a、Operator从一个incoming Stream接收到Snapshot Barrier n,然后暂停处理,直到其它的incoming Stream的Barrier n(否则属于2个Snapshot的记录就混在一起了)到达该Operator。
b、接收到Barrier n的Stream被临时搁置,来自这些Stream的记录不会被处理,而是被放在一个Buffer中
c、一旦最后一个Stream接收到Barrier n,Operator会emit所有暂存在Buffer中的记录,然后向Checkpoint Coordinator发送Snapshot n
d、继续处理来自多个Stream的记录
基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中。通常以最迟对齐Barrier的一个Stream做为处理Buffer中缓存记录的时刻点。可通过开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。
5.4、State Backend(数据持久化方案)
flink的State Backend 是实现快照持久化的重要功能,flink将State Backend抽象成一种插件,支持三种State Backend。
a、MemoryStateBackend:基于内存实现,将数据存储在堆中。数据过大会导致OOM问题,不建议生产环境使用,默认存储的大小为4M。
设置内存大小:env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE))
b、FsstateBackend:将数据持久化到文件系统包括(本地,hdfs,Amazon,阿里云),通过地址进行指定。
// 使用HDFS作为State Backend
env.setStateBackend(new FsstateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))
// 使用Amazon作为State Backend
env.setStateBackend(new FsstateBackend("s3://<your-bucket>/<endpoint>"))
// 关闭Asynchronous Snapshot,默认开启
env.setStateBackend(new FsstateBackend(checkpointPath, false))
c、RocksDBStateBackend:RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsstateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。从RocksDB中读写数据都需要进行序列化和反序列化,读写成本更高。允许增量快照,每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。
// 开启Incremental Checkpoint (增量快照)
val enableIncrementalCheckpointing = true
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing))
六、checkpoint相关配置
默认情况下,Checkpoint机制是关闭的,需要调用env.enableCheckpointing(n)来开启。参数需要配置统一封装在CheckpointConfig中,常用配置如下:
val cpConfig: CheckpointConfig = env.getCheckpointConfig
// 使用At-Least-Once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
// 超时时间100s
env.getCheckpointConfig.setCheckpointTimeout(100*1000)
// 两次Checkpoint的间隔为60秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)
// 最多同时进行3个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)
作业取消后仍然保存Checkpoint,否则自动删除Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 关闭checkpoint失败重启任务
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。