-
Flink 的容器化改造和实践
-
Flink sql 的实践和应用
-
未来规划
T
一、Flink 的容器化改造和实践
1. 有赞的集群演进历史
-
2014 年 7 月,第一个 Storm 任务正式上线;
-
2016 年,引入 Spark Streaming, 运行在 Hadoop Yarn;
-
2018 年,引入了 Flink,作业模式为 Flink on Yarn Per Job;
-
2020 年 6 月,实现了 100% Flink Jar 任务 K8s 化, K8s 作为 Flink Jar 默认计算资源,Flink sql 任务 On Yarn,Flink 统一实时开发;
-
2020 年 11 月,Storm 集群正式下线。原先的 storm 任务全部都迁移到了 Flink;
-
2021 年,我们打算把所有的 Flink 任务 K8s 化。
2. Flink 在内部支持的业务场景
Flink 支持的业务场景有风控,埋点的实时任务,支付,算法实时特征处理,BI 的实时看板,以及实时监控等等。目前的实时任务规模有 500+。
3. 有赞在 Flink on Yarn 的痛点
主要有三部分:
-
第一,cpu 没有隔离。Flink On Yarn 模式,cpu 没有隔离,某个实时任务造成某台机器 cpu 使用过高时, 会对该机器其他实时任务造成影响;
-
第二,大促扩缩容成本高。Yarn 和 HDFS 服务使用物理机,物理机在大促期间扩缩容不灵活,同时需要投入一定的人力和物力;
-
第三,需要投入人力运维。公司底层应用资源统一为 K8S,单独再对 Yarn 集群运维,会再多一类集群的人力运维成本。
4. Flink on k8s 相对于 Yarn 的优势
可以归纳为 4 点:
-
第一,统一运维。公司统一化运维,有专门的部门运维 K8S;
-
第二,cpu 隔离。K8S Pod 之间 cpu 隔离,实时任务不相互影响,更加稳定;
-
第三,存储计算分离。Flink 计算资源和状态存储分离,计算资源能够和其他组件资源进行 混部,提升机器使用率;
-
第四,弹性扩缩容。大促期间能够弹性扩缩容,更好的节省人力和物力成本。
5. 实时集群的部署情况
总体上分为三层。第一层是存储层;第二层是实时计算资源层;第三层是实时计算引擎层。
-
存储层主要分为两部分:
- 第一个就是云盘,它主要存储 Flink 任务本地的状态,以及 Flink 任务的日志;
- 第二部分是实时计算 HDFS 集群,它主要存储 Flink 任务的远端状态。
-
第二层是实时计算的资源层,分为两部分:
-
最上层有一些实时 Flink Jar,spark streaming 任务,以及 Flink sql 任务。
我们考虑混部的原因是,离线 HDFS 集群白天机器使用率不高。把离线 HDFS 集群计算资源给实时任务,离线使用内部其他组件的弹性计算资源,从而提升机器使用率,更好的达到降本效果。
6. Flink on k8s 的容器化流程
如下图所示:
-
第一步,实时平台的 Flink Jar 任务提交,Flink Jar 任务版本管理,Docker Flink 任务镜像构建,上传镜像到 Docker 镜像仓库;
-
第二步,任务启动;
-
第三步,yaml 文件创建;
-
第四步,和 k8s Api Server 之间进行命令交互;
-
第五步,从 Docker 镜像仓库拉取 Flink 任务镜像到 Flink k8s 集群;
-
最后,任务运行。这边有几个 tips:
7. 在 Flink on k8s 的一些实践
管理员根据内存分析指标以及并发度合理性,结合优化规则,预设置 Flink 资源。然后我们会和业务方沟通与调整。右图是两种分析结果,上面是 Flink on K8S pod 内存分析结果。下面是 Flink K8S 任务处理能力的分析结果。最终,我们根据这些指标就可以对任务进行一个资源的重新调整,降低资源浪费。目前我们打算把它做成一个自动化的分析调整工具。
-
接下来是 Flink on K8s 其他的相关实践:
- 第一,基于 Ingress Flink Web UI 和 Rest API 的使用。每个任务有一个 Ingress 域名,始终通过域名访问 Flink Web UI 以及 Resti API 使用;
- 第二,挂载多个 hostpath volume,解决单块云盘 IO 限制。单块云盘的写入带宽以及 IO 能力有瓶颈,使用多块云盘,降低云盘 Checkpoint 状态和本地写入的压力;
- 第三,Flink 相关通用配置 ConfigMap 化、Flink 镜像上传成功的检测。为 Filebeat、Flink 作业通用配置,创建 configmap,然后挂载到实时任务中,确保每个 Flink 任务镜像都成功上传到镜像仓库;
- 第四,HDFS 磁盘 SSD 以及基于 Filebeat 日志采集。SSD 磁盘主要是为了降低磁盘的 IO Wait 时 间,调整 dfs.block.invalidate.limit,降低 HDFS Pending delete block 数。任务日志使用 Filebeat 采集,输出到 kafka,后面通过自定义 LogServer 和离线公用 LogServer 查看。
8. Flink on K8s 当前面临的痛点
-
第一,JobManager HA 问题。JobManager Pod 如果挂掉,借助于 k8s Deployment 能力,JobManager 会根据 yaml 文件重启,状态可能会丢失。而如果 yaml 配置 Savepoint 恢复,则消息可能大量重复。我们希望后续借助于 ZK 或者 etcd 支持 Jobmanager HA;
-
第二,修改代码,再次上传时间久。一旦代码修改逻辑,Flink Jar 任务上传时间加上打镜像时间可能是分钟级别,对实时性要求比较高的业务或许有影响。我们希望后续可以参考社区的实现方式,从 HDFS 上面拉取任务 Jar 运行;
-
第三,K8S Node Down 机, JobManager 恢复慢。一旦 K8S Node down 机后, Jobmanager Pod 恢复运行需要 8分钟左右,主要是 k8s 内部异常发现时间以及作业启动时间,对部分业务有影响,比如cps实时任务。如何解决,平台端定时检测 K8s node 状态,一旦检测到 down 机状态,将 node 上面有 JobManager 所属的任务停止掉,然后从其之前 checkpoint 恢复;
-
第四,Flink on k8s 非云原生。当前通过 Flink Jar 任务并发度自动检测工具解决资源少配无法启动问题,但是如果任务的预执行计划无法获取,就无法获取到代码配置的并发度。我们的思考是:Flink on k8s 云原生功能以及前面的 1、2 问题,如果社区支持的比较快速的话,后面可能会考虑将 Flink 版本与社区版本对齐。
9. Flink on K8s的一些方案推荐
-
第一种方案,是平台自己去构建和管理任务的镜像。
-
二、Flink sql 实践和应用
1. 有赞 Flink sql 的发展历程
-
2020 年 4 月,开始支持实时数仓、有赞教育、美业、零售等相关实时需求。
-
2020 年 8 月,新版的实时平台才开始正式上线,目前主推 Flink sql 开发我们的实时任务。
2. 在 Flink sql 方面的一些实践
主要分为三个方面:
-
第一,Flink Connector 的实践包括:Flink sql 支持 Flink NSQ Connector、Flink sql 支持 Flink HA Hbase Sink 和维表、Flink sql 支持无密 MysqL Connector、Flink sql 支持标准输出(社区已经支持)、Flink sql 支持 Clickhouse Sink;
-
第二,平台层的实践包括:Flink sql 支持 UDF 以及 UDF 管理、支持任务从 Checkpoint 恢复、支持幂等函数、支持 Json 相关函数等、支持 Flink 运行相关参数配置,比如状态时间设置,聚合优化参数等等、Flink 实时任务血缘数据自动化采集、Flink 语法正确性检测功能;
3. 业务实践
-
第一个实践是我们内部的客服机器人实时看板。流程分为三层:
-
在上游,Druid 会消费这两个 topic 的数据,去进行一些指标的查询,最终提供给业务方使用。
-
第二个实践是实时用户行为中间层。用户在我们平台上面会去搜索、浏览、加入购物车等等,都会产生相应的事件。原先的方案是基于离线来做的。我们会把数据落库到 Hive 表,然后算法那边的同学会结合用户特征、机器学习的模型、离线的数据去生成一些用户评分预估,再把它输入到 HBase。
在这样的背景下面,会有如下诉求:当前的用户评分主要是基于离线任务,而算法同学希望结合实时的用户特征,更加及时、准确的提高推荐精准度。这其实就需要构建一个实时的用户行为中间层,把用户产生的事件输入到 Kafka 里面,通过 Flink sql 作业对这些数据进行处理,然后把相应的结果输出到 HBase 里面。算法的同学再结合算法模型,实时的更新模型里面的一些参数,最终实时的进行用户的评分预估,也会落库到 HBase,然后到线上使用。
用户行为中间层的构建流程分为三个步骤:
4. 在 HAHBase Connector 的实践
社区 HBase Connector 数据关联或者写入是单 HBase 集群使用,当 HBase 集群不可用时,实时任务数据的写入或者关联会受到影响,从而可能会影响到业务使用。至于怎么样去解决这个问题。首先,在 HBase 方面有两个集群,主集群和备集群。它们之间通过 WAL 进行主从的复制。Flink sql 作业先写入主集群,当主集群不可用的时候,自动降级到备集群,不会影响到线上业务的使用。
5. 无密 MysqL Connector 和指标扩展实践
左图是 Flink 无密 MysqL Sink 语法,解决的问题包括三点:然后是左下图,我们在 Flink 源码层面增加 Task 和 Operator 单条消息处理时间 Metric。目的是帮助业务方,根据消息处理时间的监控指标,排查和优化 Flink 实时任务。
6. Flink 任务血缘元数据自动化采集的实践
Flink 任务血缘元数据采集的流程如下图所示,平台启动实时任务后,根据当前任务是 Flink Jar 任务,还是 Flink sql 任务,分别走两条不同的路径,来获取任务的血缘数据,再把血缘数据上报元数据系统。这样做的价值有两点:
-
第二,更好的构建实时数仓。结合实时任务血缘图,提炼实时数据公共层,提升复用性,更好的构建实时数仓。
三、未来规划
最后是未来的规划,包括四点:-
第三,Flink sql 任务 k8s 化以及 K8s 云原生。Flink 底层计算资源统一为 k8s,降低运维成本,Flink k8s 云原生,更合理使用 K8s 资源。
-
第四,Flink 与数据湖以及 CDC 功能技术的调研。新技术的调研储备,为未来其他实时需求奠定技术基础。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。