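When a user starts a cluster with the session CLI command, the Flink cluster startup script first invokes the main() method of a concrete subclass of the ClusterEntrypoint abstract class, which starts and runs the corresponding type of cluster environment.
In other words, ClusterEntrypoint (CE for short) is the entry point of the whole cluster, and its concrete subclasses carry the main() method. In runtime management, all services are triggered and started through the CE class, which in turn creates and initializes the core components.
Let's first look at the inheritance hierarchy of the CE abstract class in the figure below.
As the hierarchy shows, ClusterEntrypoint falls into two categories: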
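- SessionClusterEntrypoint
  - Only one cluster is set up, and it can run multiple jobs at the same time. This gives better resource utilization, but if the cluster goes down, many jobs are affected.
- JobClusterEntrypoint
The concrete implementations differ by deployment: standalone corresponds to local mode, while Mesos and YARN correspond to the different schedulers used in cluster mode.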
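To make the trade-off between the two categories concrete, here is a toy model. It is only a sketch: the classes ToyClusterEntrypoint, ToySessionCluster and ToyJobCluster and their methods are hypothetical and do not exist in Flink, and the "one job per job cluster" capacity is our assumption about how a per-job cluster is used.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical stand-ins, not Flink's real entry points.
abstract class ToyClusterEntrypoint {
    private final List<String> runningJobs = new ArrayList<>();

    /** How many jobs a single cluster of this kind may host at once. */
    abstract int capacity();

    /** Returns true if the job was accepted by this cluster. */
    boolean submit(String jobName) {
        if (runningJobs.size() >= capacity()) {
            return false;
        }
        return runningJobs.add(jobName);
    }

    /** Simulates a cluster crash and returns the jobs that went down with it. */
    List<String> crash() {
        List<String> lost = new ArrayList<>(runningJobs);
        runningJobs.clear();
        return lost;
    }
}

class ToySessionCluster extends ToyClusterEntrypoint {
    @Override
    int capacity() { return Integer.MAX_VALUE; } // one cluster shared by many jobs
}

class ToyJobCluster extends ToyClusterEntrypoint {
    @Override
    int capacity() { return 1; } // assumption: a cluster dedicated to a single job
}
```

In this model a crash of a ToySessionCluster takes every submitted job with it, while a ToyJobCluster can lose at most one job.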
Now, starting from the main() method of StandaloneSessionClusterEntrypoint, let's see how ClusterEntrypoint starts a cluster.
public static void main(String[] args) {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    EntrypointClusterConfiguration entrypointClusterConfiguration = null;
    final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

    try {
        entrypointClusterConfiguration = commandLineParser.parse(args);
    } catch (FlinkParseException e) {
        LOG.error("Could not parse command line arguments {}.", args, e);
        commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
        System.exit(1);
    }

    Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

    StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

    // after the configuration steps above, start the cluster via runClusterEntrypoint of the CE abstract class
    ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
The last line shows that, after the configuration and logging setup, main() calls the runClusterEntrypoint method of ClusterEntrypoint. Let's see what this method does.
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster(); // ⭐ this line starts the cluster
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
        final int returnCode;

        if (throwable != null) {
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        } else {
            returnCode = applicationStatus.processExitCode();
        }

        LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
        System.exit(returnCode);
    });
}
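The line marked with ⭐ above calls ClusterEntrypoint.startCluster() to continue the startup. Once the cluster terminates, the callback registered through clusterEntrypoint.getTerminationFuture().whenComplete() receives the final status and exits the process with the corresponding return code.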
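The way runClusterEntrypoint turns the termination future into an exit code can be tried out in isolation. The following is a minimal sketch, not Flink code: ToyStatus, its exit codes and the value of RUNTIME_FAILURE_RETURN_CODE are assumptions for illustration, and we use handle() to return the code instead of calling System.exit() so that the logic is easy to test.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: ToyStatus and the constant below are simplified stand-ins for the Flink types.
class ExitCodeSketch {
    static final int RUNTIME_FAILURE_RETURN_CODE = 2; // assumed value, for illustration only

    enum ToyStatus {
        SUCCEEDED(0), FAILED(1); // assumed exit codes, not Flink's real ones

        private final int exitCode;

        ToyStatus(int exitCode) { this.exitCode = exitCode; }

        int processExitCode() { return exitCode; }
    }

    /** Maps the outcome of the termination future to a process exit code. */
    static CompletableFuture<Integer> exitCodeOf(CompletableFuture<ToyStatus> terminationFuture) {
        return terminationFuture.handle((status, throwable) ->
            throwable != null ? RUNTIME_FAILURE_RETURN_CODE : status.processExitCode());
    }

    public static void main(String[] args) {
        CompletableFuture<ToyStatus> termination = new CompletableFuture<>();
        CompletableFuture<Integer> exitCode = exitCodeOf(termination);
        // The callback only fires once the cluster reports its final status.
        termination.complete(ToyStatus.FAILED);
        System.out.println("exit code: " + exitCode.join());
    }
}
```

The point to notice is that the callback is registered right after startup but runs much later, when the future completes, either normally with a status or exceptionally with a throwable.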
Next, let's see what startCluster() does.
public void startCluster() throws ClusterEntrypointException {
    LOG.info("Starting {}.", getClass().getSimpleName());

    try {
        configureFileSystems(configuration); // configure the file systems

        SecurityContext securityContext = installSecurityContext(configuration);

        securityContext.runSecured((Callable<Void>) () -> {
            runCluster(configuration); // ⭐ continue the startup inside the security context
            return null;
        });
    } catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

        try {
            // clean up any partial state
            shutDownAsync(
                ApplicationStatus.FAILED,
                ExceptionUtils.stringifyException(strippedThrowable),
                false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            strippedThrowable.addSuppressed(e);
        }

        throw new ClusterEntrypointException(
            String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
            strippedThrowable);
    }
}
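The runSecured call in startCluster() above hands the rest of the startup to the security context as a Callable. Here is a minimal sketch of that idea, assuming a simplified interface: ToySecurityContext, TracingContext and SecuredRunSketch are hypothetical names of ours, and the tracing context merely records when the callable runs instead of installing any real credentials.

```java
import java.util.concurrent.Callable;

// Sketch only: ToySecurityContext is a hypothetical, simplified interface.
class SecuredRunSketch {
    interface ToySecurityContext {
        <T> T runSecured(Callable<T> securedCallable) throws Exception;
    }

    /** A context that records when the callable runs "inside" it. */
    static final class TracingContext implements ToySecurityContext {
        final StringBuilder trace = new StringBuilder();

        @Override
        public <T> T runSecured(Callable<T> securedCallable) throws Exception {
            trace.append("enter;");
            try {
                return securedCallable.call();
            } finally {
                trace.append("exit;");
            }
        }
    }

    private boolean started;

    /** Stand-in for the real runCluster(configuration). */
    void runCluster() { started = true; }

    boolean isStarted() { return started; }

    /** The entry point decides what to run; the context decides how it is run. */
    void startCluster(ToySecurityContext context) throws Exception {
        context.runSecured((Callable<Void>) () -> {
            runCluster();
            return null;
        });
    }

    public static void main(String[] args) throws Exception {
        SecuredRunSketch entrypoint = new SecuredRunSketch();
        TracingContext context = new TracingContext();
        entrypoint.startCluster(context);
        System.out.println(context.trace + " started=" + entrypoint.isStarted());
    }
}
```

The design choice here is that the entry point does not need to know which security mechanism is in use; any implementation of the context can wrap the same startup logic.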
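Note the line marked with ⭐ in startCluster(): runCluster() is invoked inside securityContext.runSecured(), so the startup continues within the installed SecurityContext rather than being called directly by ClusterEntrypoint. Let's continue with runCluster().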
private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
        // ⭐ initialize the basic services the runtime cluster needs, such as HAServices and the common RpcService
        initializeServices(configuration);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

        // ⭐ create the cluster component clusterComponent
        // ⭐ it contains the ResourceManager, the Dispatcher and the WebMonitorEndpoint
        clusterComponent = dispatcherResourceManagerComponentFactory.create(
            configuration,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            archivedExecutionGraphStore,
            new AkkaQueryServiceRetriever(
                metricQueryServiceActorSystem,
                Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
            this);

        clusterComponent.getShutDownFuture().whenComplete(
            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                if (throwable != null) {
                    shutDownAsync(
                        ApplicationStatus.UNKNOWN,
                        ExceptionUtils.stringifyException(throwable),
                        false);
                } else {
                    // This is the general shutdown path. If a separate more specific shutdown was
                    // already triggered, this will do nothing
                    shutDownAsync(
                        applicationStatus,
                        null,
                        true);
                }
            });
    }
}
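This step initializes a number of services and then starts the cluster components by calling create() on the DispatcherResourceManagerComponentFactory. Let's continue with create().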
@Override
public DispatcherResourceManagerComponent<T> create(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<U> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    JobManagerMetricGroup jobManagerMetricGroup = null;
    T dispatcher = null;

    try {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            DispatcherGateway.class,
            DispatcherId::fromUuid,
            10,
            Time.milliseconds(50L));

        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            ResourceManagerGateway.class,
            ResourceManagerId::fromUuid,
            10,
            Time.milliseconds(50L));

        final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
            "DispatcherRestEndpoint");

        final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
        final MetricFetcher metricFetcher = updateInterval == 0
            ? VoidMetricFetcher.INSTANCE
            : MetricFetcherImpl.fromConfiguration(
                configuration,
                metricQueryServiceRetriever,
                dispatcherGatewayRetriever,
                executor);

        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            blobServer,
            executor,
            metricFetcher,
            highAvailabilityServices.getWebMonitorLeaderElectionService(),
            fatalErrorHandler); // ⭐ create the WebMonitorEndpoint

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start(); // ⭐ start the WebMonitorEndpoint

        final String hostname = getHostname(rpcService);

        jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
            metricRegistry,
            hostname,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

        resourceManager = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup); // ⭐ create the ResourceManager

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServicePath(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist); // ⭐ create the Dispatcher

        log.debug("Starting ResourceManager.");
        resourceManager.start(); // ⭐ start the ResourceManager
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");
        dispatcher.start(); // ⭐ start the Dispatcher
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);

    } catch (Exception exception) {
        // clean up all started components
        if (dispatcherLeaderRetrievalService != null) {
            try {
                dispatcherLeaderRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        if (resourceManagerRetrievalService != null) {
            try {
                resourceManagerRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        if (webMonitorEndpoint != null) {
            terminationFutures.add(webMonitorEndpoint.closeAsync());
        }

        if (resourceManager != null) {
            terminationFutures.add(resourceManager.closeAsync());
        }

        if (dispatcher != null) {
            terminationFutures.add(dispatcher.closeAsync());
        }

        final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

        try {
            terminationFuture.get();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (jobManagerMetricGroup != null) {
            jobManagerMetricGroup.close();
        }

        throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
    }
}
The places where components are created and started are marked with ⭐ in the code above.
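The catch block of create() is worth a second look: whatever has already been created is closed again, and errors raised during cleanup are attached to the original exception instead of replacing it. Below is a minimal sketch of that pattern under simplified assumptions. ToyComponent, startAll and this firstOrSuppressed are hypothetical helpers written for this post (the last one only imitates what the real helper appears to do), and the cleanup is synchronous rather than based on futures.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: ToyComponent and the helpers below are simplified, hypothetical stand-ins.
class CleanupSketch {
    interface ToyComponent {
        void start() throws Exception;
        void close() throws Exception;
    }

    /** Keeps the earlier exception as primary and attaches the newer one as suppressed. */
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /** Starts components in order; on failure closes every component touched so far. */
    static void startAll(List<ToyComponent> components) throws Exception {
        List<ToyComponent> created = new ArrayList<>();
        try {
            for (ToyComponent component : components) {
                created.add(component); // remembered before start(), so a failed start is cleaned up too
                component.start();
            }
        } catch (Exception exception) {
            for (ToyComponent component : created) {
                try {
                    component.close();
                } catch (Exception e) {
                    // a cleanup failure must not hide the original startup failure
                    exception = firstOrSuppressed(e, exception);
                }
            }
            throw new Exception("Could not start all components.", exception);
        }
    }
}
```

A caller that catches the wrapping exception can then inspect getCause() for the startup failure and getCause().getSuppressed() for anything that went wrong during cleanup.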