文章目录
概要
上一篇文章Spark RPC之Master启动并提供服务介绍了standalone模式下Master端的实现,接着我们看下Worker端的实现,以及Worker如何向Master启动,注册及发送心跳
1. Class Worker
查看Worker,和Master一样,Worker也是RpcEndpoint的子类,所以接下来查看RpcEndpoint生命周期的四个方法: onStart -> receive(receiveAndReply)* -> onStop。
1.1 Class Worker之onStart()
- 如果配置spark.shuffle.service.enabled=true,启动独立的shuffle service。
- 启动Worker的webUI,默认端口8081。
- 本篇的第一个重要部分,向Master注册以及发送心跳信息。但是,这部分,在前面已经写过了。,这里只是简单列一下流程,具体代码请参考请参考Master注册机制原理剖析
Worker端:
Master端:
receive()
函数中接收到Worker发来的注册消息–RegisterWorker
- 判断一下当前的Master是否是
standby Master
- 判断Worker是否已经注册过
cotain(id)
- Master 如果决定接收注册的工人,首先会创建
WorkerInfo
对象来保存注册的 Worker 的信息- 接着就是注册此Worker:
- 先过滤掉状态为DEAD
的Worker,对于状态为UNKNowN
的Worker,使用removeWorker
清理掉旧的Worker信息(包括清理该worker下的 Executors 和driver),替换为新的Worker信息
- 然后将worker加入内存缓存中- 使用
persistenceEngine()
将 Worker信息持久化send()
通知Worker注册成功- 调用
Schedule()
进行调度
Worker端:
- 等待接收Master返回的Response:
Case RegisterWorkerResponse => handleRegisterResponse(msg)
- 在
handleRegisterResponse
中,如果Case RegisteredWorker,那么
- 将当前状态修改为Registered
- 将修改MasterRef为当前的Master
- 定时使用masterRef.send()向Master发送HeartBeat。Master每60s查看Worker连接情况,Worker端每15s发送一次心跳(参考Spark Rpc之Master实现)
- 如果设置spark.worker.cleanup.enabled=true,清除Worker的工作目录。
- 通过masterRef向Master发送自己的WorkerLatestState,主要之让Master去判断与Worker相关的Executor和Driver是否应该继续运行,如果不,那么Masster会通知Worker去KillExecutor、KillDriver
- 针对Master的消息进行Kill.
1.2 Class Worker之receive()
1.3 Class Worker之receiveAndReply()
只处理Worker状态查询。
1.4 Class Worker之onStop()
相比于Master,多了executors、drivers和shuffleService的关闭。
2. Object Worker
启动程序和Master
几乎一致,但至少传入一个参数,MasterURL
,格式为spark://host:port
。
2.1 Object Worker之main()
2.2 Object Worker之startRpcEnvAndEndpoint()
总结
上一篇Spark RPC之Master启动并提供服务讲述了Master是如何接受Worker请求注册的信息和心跳机制
,本篇文章讲解了Worker
端对应的行为,如下
Worker
的onStart
方法中如何发送注册信息给Master
- 在注册成功后,
Worker
在处理Master
返回信息时,启动定时任务,每15s发送心跳
完整流程如下
至此,standalone模式
下,spark如何使用rpc完成Worker注册和心跳机制
就介绍完了。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。