With the release of Flink 1.5.0, Flink gained an important improvement: the cluster deployment and job processing model was reworked according to FLIP-6, in order to manage resources and schedule tasks more effectively and to integrate more cleanly with frameworks such as Yarn, Mesos and Kubernetes.
In this article we analyze the startup process of a Flink cluster. The analysis is based on the code of Flink 1.9-SNAPSHOT.
HA and Leader Election
Flink's internal components, such as the ResourceManager and the JobManager, can all be configured in HA mode. Starting a Flink cluster therefore involves a great deal of leader election and leader address retrieval, so we first introduce the HA-related concepts.
Obtaining the leader address is done through the two interfaces LeaderRetrievalListener and LeaderRetrievalService. A LeaderRetrievalService starts listening for the leader address and notifies the listener once a leader has been elected.
```java
public interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;

    void stop() throws Exception;
}

public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

    void handleError(Exception exception);
}
```
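As a minimal illustration of how this callback interface is used (a hypothetical listener, not taken from the Flink code base; the class name and log output are made up), a component that only wants to track the current leader address could look like this:

```java
import java.util.UUID;

import javax.annotation.Nullable;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

// Hypothetical listener, for illustration only.
public class LoggingLeaderListener implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID) {
        // called whenever a (new) leader has been elected
        System.out.println("Current leader: " + leaderAddress + " (session " + leaderSessionID + ")");
    }

    @Override
    public void handleError(Exception exception) {
        exception.printStackTrace();
    }

    public static void watch(LeaderRetrievalService retrievalService) throws Exception {
        // start listening; the service calls back on every leader change
        retrievalService.start(new LoggingLeaderListener());
    }
}
```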
The GatewayRetriever interface is used to obtain an RpcGateway. The abstract class LeaderGatewayRetriever extends both LeaderRetriever and GatewayRetriever, so it 1) is notified of the leader address once the election completes, and 2) can obtain the leader's RpcGateway.
RpcGatewayRetriever is the concrete implementation of LeaderGatewayRetriever; given the leader's address, it obtains the leader's RpcGateway via RpcService.connect().
```java
class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>> extends LeaderGatewayRetriever<T> {
    @Override
    protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
        return FutureUtils.retryWithDelay(
            () -> leaderFuture.thenCompose(
                (Tuple2<String, UUID> addressLeaderTuple) -> rpcService.connect(
                    addressLeaderTuple.f0,
                    fencingTokenMapper.apply(addressLeaderTuple.f1),
                    gatewayType)),
            retries,
            retryDelay,
            rpcService.getScheduledExecutor());
    }
}
```
Leader election is carried out jointly by a LeaderElectionService (the election service) and a LeaderContender (the candidate). Every successful election produces a unique leaderSessionID, which can be used as the fencing token for RpcGateway communication. When a LeaderContender wins the election, it is notified through LeaderContender#grantLeadership.
```java
public interface LeaderElectionService {
    void start(LeaderContender contender) throws Exception;

    void stop() throws Exception;

    void confirmLeaderSessionID(UUID leaderSessionID);

    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

public interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    String getAddress();

    void handleError(Exception exception);
}
```
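To make the contract between the two interfaces concrete, here is a hypothetical minimal contender (not part of Flink; the class name and address are made up) that simply confirms the session id it is granted — the same pattern Flink's real contenders such as the ResourceManager and the Dispatcher follow in grantLeadership:

```java
import java.util.UUID;

import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;

// Hypothetical contender, for illustration only.
public class SimpleContender implements LeaderContender {

    private final LeaderElectionService electionService;

    public SimpleContender(LeaderElectionService electionService) {
        this.electionService = electionService;
    }

    public void run() throws Exception {
        // join the election; grantLeadership() is called if this contender wins
        electionService.start(this);
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        // accept leadership and publish the session id, so that
        // LeaderRetrievalListeners can be notified of the new leader
        electionService.confirmLeaderSessionID(leaderSessionID);
    }

    @Override
    public void revokeLeadership() {
        // stop acting as leader
    }

    @Override
    public String getAddress() {
        return "akka://flink/user/dummy"; // placeholder address
    }

    @Override
    public void handleError(Exception exception) {
        exception.printStackTrace();
    }
}
```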
LeaderElectionService has several implementations, for example StandaloneLeaderElectionService, which skips the election process entirely, and ZooKeeperLeaderElectionService, which is built on ZooKeeper and the Curator framework; see the corresponding source code for the implementation details.
The HighAvailabilityServices interface provides access to all HA-related services (a simplified excerpt follows the list), including:
- the election service and leader retrieval for the ResourceManager
- the election service and leader retrieval for the Dispatcher
- the registry of job scheduling statuses
- services related to checkpoint recovery, the blob store, and so on
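A simplified excerpt of the interface (the method list is abridged from the 1.9 code base; signatures are slightly simplified and the web monitor and shutdown-related methods are omitted):

```java
public interface HighAvailabilityServices {
    // leader retrieval / election for the ResourceManager
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderElectionService getResourceManagerLeaderElectionService();

    // leader retrieval / election for the Dispatcher
    LeaderRetrievalService getDispatcherLeaderRetriever();
    LeaderElectionService getDispatcherLeaderElectionService();

    // leader retrieval / election for the JobManager of a specific job
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    // registry that tracks the scheduling status of jobs
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    // checkpoint recovery, job graph persistence, blob storage
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
    BlobStore createBlobStore() throws IOException;
}
```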
Startup Process of the MiniCluster
We start with the simplest case, the MiniCluster, to analyze Flink's startup process and the interaction between its internal components. The MiniCluster can be viewed as an embedded Flink runtime in which all components run in separate local threads. The entry point for starting a MiniCluster is LocalStreamEnvironment.
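For reference, a minimal user program like the following sketch (the job itself is arbitrary) already exercises this path: LocalStreamEnvironment#execute translates the program into a JobGraph and hands it to an in-process MiniCluster.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MiniClusterExample {
    public static void main(String[] args) throws Exception {
        // createLocalEnvironment() returns a LocalStreamEnvironment when the
        // program is not started through a cluster client
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        env.fromElements(1, 2, 3).print();

        // execute() builds the JobGraph, starts a MiniCluster, submits the
        // JobGraph to it and blocks until the job finishes
        env.execute("mini-cluster-example");
    }
}
```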
In MiniCluster#start, the startup roughly consists of three phases:
- create auxiliary services such as the RpcService, HighAvailabilityServices and BlobServer
- start the TaskManagers
- start the Dispatcher, ResourceManager and related components
Creating the HighAvailabilityServices
```java
class MiniCluster {
    public void start() {
        //......
        ioExecutor = Executors.newFixedThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("mini-cluster-io"));
        haServices = createHighAvailabilityServices(configuration, ioExecutor);
        //......
    }

    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        LOG.info("Starting high-availability services");
        return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
            configuration,
            executor);
    }
}
```
HighAvailabilityServicesUtils is a utility class for creating HighAvailabilityServices. When HA is not configured, it creates an EmbeddedHaServices. EmbeddedHaServices does not provide real high availability; it is intended for the case where all components (ResourceManager, TaskManager, JobManager, ...) run in the same process. The election service it creates for each component is an EmbeddedLeaderElectionService: as soon as a LeaderContender joins the election, that contender becomes the leader.
Starting the TaskManager
```java
class MiniCluster {
    public void start() {
        //......
        startTaskManagers();
        //......
    }

    private void startTaskManagers() throws Exception {
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();

        for (int i = 0; i < numTaskManagers; i++) {
            startTaskExecutor();
        }
    }

    @VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();

            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

            taskExecutor.start();
            taskManagers.add(taskExecutor);
        }
    }
}
```
After the HighAvailabilityServices have been created, the TaskManagers can be started one by one. TaskManagerRunner#startTaskManager creates a TaskExecutor, which extends RpcEndpoint and implements TaskExecutorGateway. The TaskExecutor needs to communicate with the ResourceManager and other components, and it obtains their addresses through the HighAvailabilityServices.
In the TaskExecutor's start callback, a series of services is started:
```java
class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    public void onStart() throws Exception {
        try {
            // start the TaskExecutor services
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }

        // a registration timeout is handed over to the FatalErrorHandler
        startRegistrationTimeout();
    }

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl());

            // start the job leader service
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

    /**
     * The listener for leader changes of the resource manager.
     */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            // the ResourceManager leader address is known, establish a connection to it
            runAsync(
                () -> notifyOfNewResourceManagerLeader(
                    leaderAddress,
                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            onFatalError(exception);
        }
    }

    private final class JobLeaderListenerImpl implements JobLeaderListener {
        @Override
        public void jobManagerGainedLeadership(
                final JobID jobId,
                final JobMasterGateway jobManagerGateway,
                final JMTMRegistrationSuccess registrationMessage) {
            // establish a connection to the JobManager
            runAsync(
                () -> establishJobManagerConnection(
                    jobId,
                    jobManagerGateway,
                    registrationMessage));
        }

        @Override
        public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

            runAsync(() ->
                closeJobManagerConnection(
                    jobId,
                    new Exception("Job leader for job id " + jobId + " lost leadership.")));
        }

        @Override
        public void handleError(Throwable throwable) {
            onFatalError(throwable);
        }
    }
}
```
When the ResourceManagerLeaderListener callback fires, the TaskExecutor tries to establish a connection to the ResourceManager; the connection is encapsulated as a TaskExecutorToResourceManagerConnection. Once the ResourceManager leader is known, the TaskExecutor obtains the ResourceManager's RpcGateway and then starts the registration process via the RPC call ResourceManager#registerTaskExecutor. After successful registration, the TaskExecutor reports its resources (mainly its slots) to the ResourceManager.
```java
class TaskExecutor {
    private void establishResourceManagerConnection(
            ResourceManagerGateway resourceManagerGateway,
            ResourceID resourceManagerResourceId,
            InstanceID taskExecutorRegistrationId,
            ClusterInformation clusterInformation) {

        // send the SlotReport to the ResourceManager
        final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
            getResourceID(),
            taskExecutorRegistrationId,
            taskSlotTable.createSlotReport(getResourceID()),
            taskManagerConfiguration.getTimeout());

        //......

        // the connection is established
        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
            resourceManagerGateway,
            resourceManagerResourceId,
            taskExecutorRegistrationId);

        stopRegistrationTimeout();
    }
}
```
Starting the DispatcherResourceManagerComponent
```java
class MiniCluster {
    public void start() {
        //......
        dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
            configuration,
            dispatcherResourceManagreComponentRpcServiceFactory,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            metricQueryServiceRetriever,
            new ShutDownFatalErrorHandler()
        ));
        //......
    }

    protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
            Configuration configuration,
            RpcServiceFactory rpcServiceFactory,
            HighAvailabilityServices haServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // session dispatcher, standalone resource manager
        SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
        return Collections.singleton(
            dispatcherResourceManagerComponentFactory.create(
                configuration,
                rpcServiceFactory.createRpcService(),
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                new MemoryArchivedExecutionGraphStore(),
                metricQueryServiceRetriever,
                fatalErrorHandler));
    }

    @Nonnull
    private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }
}
```
In MiniCluster mode, a SessionDispatcherResourceManagerComponent is created. SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent and is used to start the Dispatcher, the ResourceManager and the WebMonitorEndpoint, all running in the same process. In MiniCluster mode, the concrete implementations are StandaloneDispatcher and StandaloneResourceManager.
When the factory creates the DispatcherResourceManagerComponent, it starts the Dispatcher, the ResourceManager and the other components:
```java
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
    @Override
    public DispatcherResourceManagerComponent<T> create(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        //.......

        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            blobServer,
            executor,
            metricFetcher,
            highAvailabilityServices.getWebMonitorLeaderElectionService(),
            fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        resourceManager = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup);

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);

        log.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);
    }
}
```
In the ResourceManager's start callback, it obtains the election service from the HighAvailabilityServices and joins the leader election. It also starts the JobLeaderIdService, which manages the leader ids of the jobs registered with this ResourceManager.
```java
abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements ResourceManagerGateway, LeaderContender {

    @Override
    public void onStart() throws Exception {
        try {
            startResourceManagerServices();
        } catch (Exception e) {
            final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startResourceManagerServices() throws Exception {
        try {
            leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

            initialize();

            // join the leader election
            leaderElectionService.start(this);
            jobLeaderIdService.start(new JobLeaderIdActionsImpl());

            registerSlotAndTaskExecutorMetrics();
        } catch (Exception e) {
            handleStartResourceManagerServicesException(e);
        }
    }
}
```
In the Dispatcher's start callback, the Dispatcher likewise joins the leader election through the LeaderElectionService.
```java
public abstract class Dispatcher
        extends FencedRpcEndpoint<DispatcherId>
        implements DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {

    // gateway retriever for the resource manager, used to communicate with it
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;

    @Override
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
        } catch (Exception e) {
            final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startDispatcherServices() throws Exception {
        try {
            submittedJobGraphStore.start(this);
            leaderElectionService.start(this);

            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }
}
```
Submitting a JobGraph
A JobGraph is submitted through MiniCluster#executeJobBlocking, which waits until the job has finished. The logic for submitting the JobGraph and requesting the job result is shown below; both are implemented as RPC calls:
```java
class MiniCluster {
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        // obtain the DispatcherGateway through the Dispatcher's gateway retriever
        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

        // we have to allow queued scheduling in Flip-6 mode because we need to request slots
        // from the ResourceManager
        jobGraph.setAllowQueuedScheduling(true);
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

        // submit the JobGraph to the Dispatcher via an RPC call
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());
        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }

    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
        return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT));
    }
}
```
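For completeness, here is a rough sketch of how a test or embedded application could drive this path directly (the MiniClusterConfiguration values are arbitrary, and the JobGraph is assumed to be produced elsewhere, e.g. from a StreamGraph):

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterSubmitSketch {
    // submits the given JobGraph to an embedded MiniCluster and waits for the result
    public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph) throws Exception {
        MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(2)
            .build();

        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();
            // internally: submitJob() via the DispatcherGateway, then requestJobResult()
            return miniCluster.executeJobBlocking(jobGraph);
        }
    }
}
```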
After receiving a job submission, the Dispatcher stores the submitted JobGraph in the SubmittedJobGraphStore (for failure recovery) and starts a JobManager for it:
```java
class Dispatcher {
    private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
        final RpcService rpcService = getRpcService();

        // create the JobManagerRunner
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
            CheckedSupplier.unchecked(() ->
                jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler)),
            rpcService.getExecutor());

        return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
    }

    private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
        final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
        jobManagerRunner.getResultFuture().whenCompleteAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                // check if we are still the active JobManagerRunner by checking the identity
                //noinspection ObjectEquality
                if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
                    if (archivedExecutionGraph != null) {
                        jobReachedGloballyTerminalState(archivedExecutionGraph);
                    } else {
                        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                        if (strippedThrowable instanceof JobNotFinishedException) {
                            jobNotFinished(jobId);
                        } else {
                            jobMasterFailed(jobId, strippedThrowable);
                        }
                    }
                } else {
                    log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                }
            }, getMainThreadExecutor());

        // start the JobManager
        jobManagerRunner.start();

        return jobManagerRunner;
    }
}
```
The started JobManagerRunner competes for leadership; once elected leader, it starts a JobMaster.
```java
public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
    public void start() throws Exception {
        try {
            // compete for leadership
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    // called when this runner has been elected leader
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }

            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    return startJobMaster(leaderSessionId);
                }
            });
    }

    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // start the JobMaster with the leader session id as its JobMasterId
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
            executor);
    }
}
```
After the JobMaster starts, it establishes a connection to the ResourceManager; the connection is encapsulated as a ResourceManagerConnection. Once the connection is established, the JobMaster can communicate with the ResourceManager via RPC:
```java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    private void startJobMasterServices() throws Exception {
        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
        scheduler.start(getMainThreadExecutor());

        //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
        // try to reconnect to previously known leader
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }
}
```
From here on, the job enters the scheduling and execution phase.
Startup Process in Standalone Cluster Mode
In standalone mode, the TaskManager, the ResourceManager and the other components each run in their own process. A standalone cluster can be started in two ways, standalonesession mode and standalonejob mode; the difference lies in the Dispatcher implementation that is used.
Starting the JobManager
Note that the JobManager here refers to the single process containing the Dispatcher, the ResourceManager and related components, not to the JobManagerRunner that the Dispatcher starts to execute a JobGraph. In the FLIP-6 design, the scheduling and execution of each JobGraph is actually handled by a dedicated JobMaster.
The entry class of a JobManager started in standalonesession mode is StandaloneSessionClusterEntrypoint, which extends SessionClusterEntrypoint; correspondingly, the entry class for standalonejob mode is StandaloneJobClusterEntryPoint, which extends JobClusterEntrypoint. Both derive from the common base class ClusterEntrypoint and differ in the DispatcherResourceManagerComponent they create.
Let us first look at the startup, which is essentially the same as starting the DispatcherResourceManagerComponent in MiniCluster mode:
```java
abstract class ClusterEntrypoint {
    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            // initialize the RpcService, HighAvailabilityServices and other services
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // create the DispatcherResourceManagerComponentFactory; provided by the concrete subclass
            final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

            // create the DispatcherResourceManagerComponent, which starts the ResourceManager and the Dispatcher
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

            // once DispatcherResourceManagerComponent#getShutDownFuture completes, shut down all services
            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }
}
```
The HighAvailabilityServices created here differ slightly from the MiniCluster case: since the components no longer run in the same process, the services must be derived from the configuration. 1) If ZooKeeper-based HA is configured, a ZooKeeperHaServices is created, which obtains the leader addresses through ZooKeeper. 2) If HA is not configured, a StandaloneHaServices is created, which reads the RPC addresses of the components from the configuration file.
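As a rough sketch of the relevant configuration (the option constants come from HighAvailabilityOptions and JobManagerOptions; the host names, ZooKeeper quorum and storage path are placeholder values), the decision is driven by entries like these:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;

public class HaConfigSketch {
    public static Configuration zookeeperHaConfig() {
        Configuration conf = new Configuration();
        // "high-availability: zookeeper" leads to ZooKeeperHaServices being created
        conf.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181");
        conf.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "hdfs:///flink/ha");
        return conf;
    }

    public static Configuration standaloneConfig() {
        Configuration conf = new Configuration();
        // with HA disabled (the default "NONE"), StandaloneHaServices is used and the
        // leader addresses are taken directly from the static configuration
        conf.setString(JobManagerOptions.ADDRESS, "jobmanager-host");
        conf.setInteger(JobManagerOptions.PORT, 6123);
        return conf;
    }
}
```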
In StandaloneSessionClusterEntrypoint, the factory that creates the DispatcherResourceManagerComponent is SessionDispatcherResourceManagerComponentFactory, which produces a SessionDispatcherResourceManagerComponent: the SessionDispatcherFactory creates a StandaloneDispatcher and the StandaloneResourceManagerFactory creates a StandaloneResourceManager.
In StandaloneJobClusterEntrypoint, the factory is JobDispatcherResourceManagerComponentFactory, which produces a JobDispatcherResourceManagerComponent: the StandaloneResourceManagerFactory creates a StandaloneResourceManager and the JobDispatcherFactory creates a MiniDispatcher. A MiniDispatcher is bound to a single JobGraph; once that JobGraph finishes, the MiniDispatcher is shut down, which in turn terminates the JobManager process.
The internal startup of the Dispatcher and ResourceManager services is the same as in the MiniCluster case and is not repeated here.
Starting the TaskManager
The entry point for starting a TaskManager is TaskManagerRunner. Its startup is essentially the same as in MiniCluster mode, except that 1) it runs in its own process, and 2) the HighAvailabilityServices have to be created from the configuration file. TaskManagerRunner creates a TaskExecutor, which obtains the ResourceManager's address through the HighAvailabilityServices and establishes a connection to it.
Startup Process of a Yarn Cluster
The entry point for starting a Yarn cluster is FlinkYarnSessionCli: it first creates a YarnClusterDescriptor from the command-line arguments and then calls YarnClusterDescriptor#deploySessionCluster to trigger the deployment.
The actual startup logic is in AbstractYarnClusterDescriptor#deployInternal, which essentially submits the application to the Yarn cluster through the YarnClient and starts the ApplicationMaster:
```java
abstract class AbstractYarnClusterDescriptor {
    protected ClusterClient<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {

        //.......

        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);

        //.....

        return createYarnClusterClient(
            this,
            validClusterSpecification.getNumberTaskManagers(),
            validClusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }
}
```
Corresponding to the sessioncluster and jobcluster startup modes, the entry classes of the ApplicationMaster submitted to Yarn are YarnSessionClusterEntrypoint and YarnJobClusterEntrypoint respectively; the difference is that the Dispatcher is a StandaloneDispatcher in the former and a MiniDispatcher in the latter. The concrete ResourceManager implementation is YarnResourceManager.
Unlike the standalone cluster described above, a Flink cluster running in Yarn mode starts its TaskManagers dynamically: the YarnResourceManager requests them from Yarn's ResourceManager based on the JobMaster's requests. When the JobMaster requests resources from the (Flink) ResourceManager and not enough resources are currently available, the YarnResourceManager asks the Yarn ResourceManager for new containers and starts TaskManagers in them:
```java
class YarnResourceManager {
    // request a new container
    private void requestYarnContainer() {
        resourceManagerClient.addContainerRequest(getContainerRequest());

        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

        numPendingContainerRequests++;

        log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
            resource,
            numPendingContainerRequests);
    }

    // callback invoked when containers have been allocated
    @Override
    public void onContainersAllocated(List<Container> containers) {
        runAsync(() -> {
            final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
            final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

            for (Container container : containers) {
                log.info(
                    "Received new container: {} - Remaining pending container requests: {}",
                    container.getId(),
                    numPendingContainerRequests);

                if (numPendingContainerRequests > 0) {
                    removeContainerRequest(pendingRequestsIterator.next());

                    final String containerIdStr = container.getId().toString();
                    final ResourceID resourceId = new ResourceID(containerIdStr);

                    workerNodeMap.put(resourceId, new YarnWorkerNode(container));

                    try {
                        // Context information used to start a TaskExecutor Java process
                        ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
                            container.getResource(),
                            containerIdStr,
                            container.getNodeId().getHost());

                        // start the TaskManager
                        nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
                    } catch (Throwable t) {
                        log.error("Could not start TaskManager in container {}.", container.getId(), t);

                        // release the failed container
                        workerNodeMap.remove(resourceId);
                        resourceManagerClient.releaseAssignedContainer(container.getId());
                        // and ask for a new one
                        requestYarnContainerIfRequired();
                    }
                } else {
                    // return the excessive containers
                    log.info("Returning excess container {}.", container.getId());
                    resourceManagerClient.releaseAssignedContainer(container.getId());
                }
            }

            // if we are waiting for no further containers, we can go to the
            // regular heartbeat interval
            if (numPendingContainerRequests <= 0) {
                resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
            }
        });
    }

    // create the launch context for the TaskManager container
    private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
            throws Exception {
        // init the ContainerLaunchContext
        final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

        final ContaineredTaskManagerParameters taskManagerParameters =
            ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);

        log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
            "JVM direct memory limit {} MB",
            containerId,
            taskManagerParameters.taskManagerTotalMemoryMB(),
            taskManagerParameters.taskManagerHeapSizeMB(),
            taskManagerParameters.taskManagerDirectMemoryLimitMB());

        Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

        log.debug("TaskManager configuration: {}", taskManagerConfig);

        ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
            flinkConfig,
            yarnConfig,
            env,
            taskManagerParameters,
            taskManagerConfig,
            currDir,
            YarnTaskExecutorRunner.class, // entry class of the TaskManager process
            log);

        // set a special environment variable to uniquely identify this container
        taskExecutorLaunchContext.getEnvironment()
            .put(ENV_FLINK_CONTAINER_ID, containerId);
        taskExecutorLaunchContext.getEnvironment()
            .put(ENV_FLINK_NODE_ID, host);
        return taskExecutorLaunchContext;
    }
}
```
Summary
This article briefly analyzed the startup process of a Flink cluster and the communication between its components, such as the ResourceManager, TaskExecutor, Dispatcher and JobMaster.
Reposted from: https://blog.jrwang.me/2019/flink-source-code-bootstarp/