With the Flink 1.5.0 release, Flink gained an important improvement: the cluster deployment and job processing model was reworked according to FLIP-6, so that resources can be managed and tasks scheduled more effectively, and so that Flink integrates more cleanly with frameworks such as Yarn, Mesos, and Kubernetes.

In this article we walk through the startup flow of a Flink cluster. The analysis is based on the code of Flink 1.9-SNAPSHOT.

HA and Leader Election
Flink's internal components, such as the ResourceManager and the JobManager, can all be configured in HA mode. Cluster startup therefore involves a lot of leader election and leader-address retrieval, so we first introduce the HA-related concepts.

Leader addresses are obtained through the LeaderRetrievalListener and LeaderRetrievalService interfaces. A LeaderRetrievalService starts listening for the leader address with a given LeaderRetrievalListener, which is notified once a leader has been elected.

public interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;
    void stop() throws Exception;
}


public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
    void handleError(Exception exception);
}

The GatewayRetriever interface is used to obtain an RpcGateway. The abstract class LeaderGatewayRetriever extends LeaderRetriever and implements GatewayRetriever, so it 1) learns the leader address once the election completes and 2) can obtain the leader's RpcGateway.

RpcGatewayRetriever is the concrete implementation of LeaderGatewayRetriever: given the leader's address, it obtains the corresponding RpcGateway via RpcService.connect().

class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>> extends LeaderGatewayRetriever<T> {
    @Override
    protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
        return FutureUtils.retryWithDelay(
            () ->
                leaderFuture.thenCompose(
                    (Tuple2<String, UUID> addressLeaderTuple) ->
                        rpcService.connect(
                            addressLeaderTuple.f0,
                            fencingTokenMapper.apply(addressLeaderTuple.f1),
                            gatewayType)),
            retries,
            retryDelay,
            rpcService.getScheduledExecutor());
    }
}

Leader election is carried out jointly by a LeaderElectionService (the election service) and a LeaderContender (the object competing for leadership). Every successful election produces a unique leaderSessionID, which can be used as the fencing token for RpcGateway communication. When a LeaderContender wins the election, it is notified through LeaderContender#grantLeadership.

public interface LeaderElectionService {
    void start(LeaderContender contender) throws Exception;

    void stop() throws Exception;

    void confirmLeaderSessionID(UUID leaderSessionID);

    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

public interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    String getAddress();

    void handleError(Exception exception);
}

LeaderElectionService has several implementations, such as StandaloneLeaderElectionService, which needs no actual election process, and ZooKeeperLeaderElectionService, which is built on ZooKeeper and the Curator framework; see the corresponding source code for the implementation details.
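
To make the contract above concrete, here is a minimal sketch that wires a contender into the simplest implementation (it assumes the two interfaces shown above and a no-argument StandaloneLeaderElectionService constructor; the LoggingContender class is purely illustrative). With the standalone service there is nothing to compete against, so the contender is granted leadership right after start():

import java.util.UUID;

import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;

public class LeaderElectionExample {

    // illustrative contender that only logs the election callbacks
    static class LoggingContender implements LeaderContender {
        private final LeaderElectionService electionService;

        LoggingContender(LeaderElectionService electionService) {
            this.electionService = electionService;
        }

        @Override
        public void grantLeadership(UUID leaderSessionID) {
            System.out.println("Granted leadership with session id " + leaderSessionID);
            // a real component would first bring up its services, then confirm the session id
            electionService.confirmLeaderSessionID(leaderSessionID);
        }

        @Override
        public void revokeLeadership() {
            System.out.println("Leadership revoked");
        }

        @Override
        public String getAddress() {
            // address under which the contender can be reached, only used for logging here
            return "akka://flink/user/logging-contender";
        }

        @Override
        public void handleError(Exception exception) {
            exception.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        LeaderElectionService electionService = new StandaloneLeaderElectionService();
        // with the standalone implementation this call immediately grants leadership
        electionService.start(new LoggingContender(electionService));
        electionService.stop();
    }
}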

The HighAvailabilityServices interface provides access to all HA-related services (an abridged sketch of the interface follows the list below), including:

the election service and leader retrieval for the ResourceManager
the election service and leader retrieval for the Dispatcher
the registry of job scheduling status
services related to checkpoint recovery, the blob store, and so on
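
An abridged sketch of the interface (condensed from the 1.9 sources; shutdown methods and a few overloads are omitted, so the real interface contains some more methods):

public interface HighAvailabilityServices {

    // leader retrieval and election for the ResourceManager
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderElectionService getResourceManagerLeaderElectionService();

    // leader retrieval and election for the Dispatcher
    LeaderRetrievalService getDispatcherLeaderRetriever();
    LeaderElectionService getDispatcherLeaderElectionService();

    // leader retrieval and election for the JobManager of a specific job
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    // leader election for the WebMonitor / REST endpoint
    LeaderElectionService getWebMonitorLeaderElectionService();

    // registry that tracks the scheduling status of jobs
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    // checkpoint recovery, job graph persistence and blob storage
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
    BlobStore createBlobStore() throws IOException;
}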

Startup Flow of a MiniCluster

Let's start with the simplest case, the MiniCluster, to analyze Flink's startup flow and the interaction between the internal components. A MiniCluster can be regarded as an embedded Flink runtime in which all components run in separate local threads. The entry point that starts a MiniCluster is LocalStreamEnvironment.
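
For reference, a minimal job that goes down this path might look as follows (a sketch assuming the DataStream API of the same Flink version). Calling execute() on the local environment builds a MiniCluster, starts it, submits the generated JobGraph, and blocks until the job finishes:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalJobExample {

    public static void main(String[] args) throws Exception {
        // createLocalEnvironment() returns a LocalStreamEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        env.fromElements(1, 2, 3, 4)
            .filter(i -> i % 2 == 0)
            .print();

        // execute() starts an embedded MiniCluster and runs the job on it
        env.execute("local-job-example");
    }
}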

In MiniCluster#start, the startup flow consists of roughly three phases:

creating auxiliary services such as the RpcService, HighAvailabilityServices, and BlobServer
starting the TaskManagers
starting the Dispatcher, ResourceManager, and other components

Creating the HighAvailabilityServices

class MiniCluster {
    public void start() {
        //......
        ioExecutor = Executors.newFixedThreadPool(
                    Hardware.getNumberCPUCores(),
                    new ExecutorThreadFactory("mini-cluster-io"));
        haServices = createHighAvailabilityServices(configuration, ioExecutor);
        //......
    }

    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        LOG.info("Starting high-availability services");
        return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
            configuration,
            executor);
    }
}

HighAvailabilityServicesUtils is the utility class for creating HighAvailabilityServices. When no HA is configured, it creates an EmbeddedHaServices. EmbeddedHaServices provides no real high availability and is only suitable when all components (ResourceManager, TaskManager, JobManager, etc.) run in the same process. The election service that EmbeddedHaServices creates for each component is EmbeddedLeaderElectionService: as soon as a LeaderContender joins the election, that contender is elected leader.

Starting the TaskManagers

class MiniCluster {
    public void start() {
        //......
        startTaskManagers();
        //......
    }

    private void startTaskManagers() throws Exception {
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        for (int i = 0; i < numTaskManagers; i++) {
            startTaskExecutor();
        }
    }

    @VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();

            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

            taskExecutor.start();
            taskManagers.add(taskExecutor);
        }
    }
}

After the HighAvailabilityServices have been created, the TaskManagers can be started one by one. TaskManagerRunner#startTaskManager creates a TaskExecutor, which extends RpcEndpoint. The TaskExecutor needs to communicate with the ResourceManager and other components, and it obtains the corresponding service addresses through the HighAvailabilityServices.

In the TaskExecutor's startup callback, a series of services is started:

class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    public void onStart() throws Exception {
        try {
            // start the TaskExecutor services
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
        // a registration timeout is handed over to the FatalErrorHandler
        startRegistrationTimeout();
    }

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl());

            // start the job leader service
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

    /**
     * The listener for leader changes of the resource manager.
     */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            // we now know the ResourceManager's address and can establish a connection to it
            runAsync(
                () -> notifyOfNewResourceManagerLeader(
                    leaderAddress,
                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            onFatalError(exception);
        }
    }

    private final class JobLeaderListenerImpl implements JobLeaderListener {

        @Override
        public void jobManagerGainedLeadership(
            final JobID jobId,
            final JobMasterGateway jobManagerGateway,
            final JMTMRegistrationSuccess registrationMessage) {
            // establish a connection to the JobManager
            runAsync(
                () ->
                    establishJobManagerConnection(
                        jobId,
                        jobManagerGateway,
                        registrationMessage));
        }

        @Override
        public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

            runAsync(() ->
                closeJobManagerConnection(
                    jobId,
                    new Exception("Job leader for job id " + jobId + " lost leadership.")));
        }

        @Override
        public void handleError(Throwable throwable) {
            onFatalError(throwable);
        }
    }
}

When the ResourceManagerLeaderListener callback fires, the TaskExecutor tries to establish a connection to the ResourceManager; the connection is encapsulated as a TaskExecutorToResourceManagerConnection. Once the ResourceManager leader is known, the TaskExecutor can obtain the ResourceManager's RpcGateway and start the registration through the RPC call ResourceManager#registerTaskExecutor. After a successful registration, the TaskExecutor reports its resources (mainly its slots) to the ResourceManager.

class TaskExecutor {
    private void establishResourceManagerConnection(
            ResourceManagerGateway resourceManagerGateway,
            ResourceID resourceManagerResourceId,
            InstanceID taskExecutorRegistrationId,
            ClusterInformation clusterInformation) {
        // send the slot report
        final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
            getResourceID(),
            taskExecutorRegistrationId,
            taskSlotTable.createSlotReport(getResourceID()),
            taskManagerConfiguration.getTimeout());
        //......
        // the connection is established
        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
            resourceManagerGateway,
            resourceManagerResourceId,
            taskExecutorRegistrationId);

        stopRegistrationTimeout();
    }
}

Starting the DispatcherResourceManagerComponent

class MiniCluster {
    public void start() {
        //......
        dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
                    configuration,
                    dispatcherResourceManagreComponentRpcServiceFactory,
                    haServices,
                    blobServer,
                    heartbeatServices,
                    metricRegistry,
                    metricQueryServiceRetriever,
                    new ShutDownFatalErrorHandler()
                ));
        //......
    }

    protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
            Configuration configuration,
            RpcServiceFactory rpcServiceFactory,
            HighAvailabilityServices haServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
       //Session dispatcher, standalone resource manager
        SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
        return Collections.singleton(
            dispatcherResourceManagerComponentFactory.create(
                configuration,
                rpcServiceFactory.createRpcService(),
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                new MemoryArchivedExecutionGraphStore(),
                metricQueryServiceRetriever,
                fatalErrorHandler));
    }

    @Nonnull
    private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }
}

In MiniCluster mode, a SessionDispatcherResourceManagerComponent object is created. SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent and is used to start the Dispatcher, the ResourceManager, and the WebMonitorEndpoint; all of these components run in the same process. In MiniCluster mode the components started are a StandaloneDispatcher and a StandaloneResourceManager.

When the factory class creates the DispatcherResourceManagerComponent, it starts the Dispatcher, the ResourceManager, and the other components:

public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
    @Override
    public DispatcherResourceManagerComponent<T> create(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        //.......

        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getWebMonitorLeaderElectionService(),
                fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        resourceManager = resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                metricRegistry,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                jobManagerMetricGroup);

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = dispatcherFactory.createDispatcher(
                configuration,
                rpcService,
                highAvailabilityServices,
                resourceManagerGatewayRetriever,
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                archivedExecutionGraphStore,
                fatalErrorHandler,
                historyServerArchivist);

        log.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
                dispatcher,
                resourceManager,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint,
                jobManagerMetricGroup);
    }
}

In the ResourceManager's startup callback, it obtains the election service from the HighAvailabilityServices and thereby takes part in the election. It also starts the JobLeaderIdService, which manages the leader ids of the jobs registered with this ResourceManager.

abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements ResourceManagerGateway, LeaderContender {
    @Override
    public void onStart() throws Exception {
        try {
            startResourceManagerServices();
        } catch (Exception e) {
            final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startResourceManagerServices() throws Exception {
        try {
            leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

            initialize();
            // take part in the leader election
            leaderElectionService.start(this);
            jobLeaderIdService.start(new JobLeaderIdActionsImpl());

            registerSlotAndTaskExecutorMetrics();
        } catch (Exception e) {
            handleStartResourceManagerServicesException(e);
        }
    }
}

In the Dispatcher's startup callback, the Dispatcher likewise takes part in the election through the LeaderElectionService.

public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
    DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
    // gateway retriever for the resource manager, used to communicate with it
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;

    @Override
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
        } catch (Exception e) {
            final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startDispatcherServices() throws Exception {
        try {
            submittedJobGraphStore.start(this);
            leaderElectionService.start(this);

            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }
}

Submitting a JobGraph
A JobGraph is submitted through MiniCluster#executeJobBlocking, which also waits for the job to finish. The logic for submitting the JobGraph and requesting the job result is shown below; both are implemented as RPC calls:

class MiniCluster {
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        // obtain the DispatcherGateway through the Dispatcher's gateway retriever
        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

        // we have to allow queued scheduling in Flip-6 mode because we need to request slots
        // from the ResourceManager
        jobGraph.setAllowQueuedScheduling(true);

        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

        // submit the JobGraph to the Dispatcher via an RPC call
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());

        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }

    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
        return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT));
    }
}

After receiving the JobGraph submission request, the Dispatcher stores the submitted JobGraph in the SubmittedJobGraphStore (for failure recovery) and starts a JobManager for it:

class Dispatcher {
    private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
        final RpcService rpcService = getRpcService();

        // create the JobManagerRunner
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
            CheckedSupplier.unchecked(() ->
                jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler)),
            rpcService.getExecutor());

        return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
    }

    private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
        final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
        jobManagerRunner.getResultFuture().whenCompleteAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                // check if we are still the active JobManagerRunner by checking the identity
                //noinspection ObjectEquality
                if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
                    if (archivedExecutionGraph != null) {
                        jobReachedGloballyTerminalState(archivedExecutionGraph);
                    } else {
                        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                        if (strippedThrowable instanceof JobNotFinishedException) {
                            jobNotFinished(jobId);
                        } else {
                            jobMasterFailed(jobId, strippedThrowable);
                        }
                    }
                } else {
                    log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                }
            }, getMainThreadExecutor());

        // start the JobManager
        jobManagerRunner.start();

        return jobManagerRunner;
    }
}

The started JobManagerRunner competes for leadership; once it is elected leader, it starts a JobMaster.

public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
    public void start() throws Exception {
        try {
            // compete for leadership
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    // called when this runner has been elected leader
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }

            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    return startJobMaster(leaderSessionId);
                }
            });
    }

    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // start the JobMaster with the specific JobMasterId
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
            executor);
    }
}

After the JobMaster starts, it establishes a connection to the ResourceManager; the connection is encapsulated as a ResourceManagerConnection. Once the connection is established, the JobMaster can communicate with the ResourceManager via RPC calls:

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    private void startJobMasterServices() throws Exception {
        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
        scheduler.start(getMainThreadExecutor());

        //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
        // try to reconnect to previously known leader
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }
}

From this point on, the job enters the scheduling and execution phase.

Startup Flow of Standalone Cluster Mode
In standalone mode, the TaskManagers and the ResourceManager run in separate processes. A Standalone Cluster can be started in two ways, standalonesession mode and standalonejob mode; the difference between them lies in the Dispatcher implementation that is used.

Starting the JobManager
Note that the JobManager referred to here is the single process containing the Dispatcher, the ResourceManager, and other components, not the JobManagerRunner that the Dispatcher starts in order to execute a JobGraph. In the FLIP-6 implementation, the scheduling and execution of each JobGraph is actually handled by a dedicated JobMaster.

The entry class for a JobManager started in standalonesession mode is StandaloneSessionClusterEntrypoint, which extends SessionClusterEntrypoint; correspondingly, the entry class for standalonejob mode is StandaloneJobClusterEntryPoint, which extends JobClusterEntrypoint. Both derive from the common parent class ClusterEntrypoint and differ in the DispatcherResourceManagerComponent they produce.

Let's look at the startup process first; it is similar to starting the DispatcherResourceManagerComponent in MiniCluster mode:

abstract class ClusterEntrypoint {
    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {

            // initialize the RpcService, HighAvailabilityServices and other services
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // create the DispatcherResourceManagerComponentFactory; provided by the concrete subclass
            final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

            // create the DispatcherResourceManagerComponent and start the ResourceManager and Dispatcher
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

            // once DispatcherResourceManagerComponent#getShutDownFuture completes, shut down all services
            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }

}

The HighAvailabilityServices created here differ slightly from the MiniCluster case. Since the components no longer run in the same process, the services must be derived from the configuration: 1) with ZooKeeper-based HA configured, a ZooKeeperHaServices is created and the leader addresses are obtained through ZooKeeper; 2) without HA configured, a StandaloneHaServices is created and the RPC addresses of the components are read from the configuration file.
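
As a minimal sketch of how an entrypoint-style process derives its HA services from the configuration (the method and option names follow the 1.9 sources, but treat the exact signatures as an assumption):

import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;

public class HaServicesFromConfig {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // no HA configured: a StandaloneHaServices is created and the component
        // addresses are taken from the static configuration
        configuration.setString(HighAvailabilityOptions.HA_MODE, "NONE");
        configuration.setString(JobManagerOptions.ADDRESS, "localhost");
        configuration.setInteger(JobManagerOptions.PORT, 6123);
        configuration.setString(RestOptions.ADDRESS, "localhost");

        // with HA_MODE set to "zookeeper" (plus a quorum), a ZooKeeperHaServices
        // would be created instead and leader addresses would come from ZooKeeper
        HighAvailabilityServices haServices =
            HighAvailabilityServicesUtils.createHighAvailabilityServices(
                configuration,
                Executors.newSingleThreadExecutor(),
                AddressResolution.NO_ADDRESS_RESOLUTION);

        haServices.close();
    }
}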

In StandaloneSessionClusterEntrypoint, the factory that produces the DispatcherResourceManagerComponent is SessionDispatcherResourceManagerComponentFactory, which creates a SessionDispatcherResourceManagerComponent: the SessionDispatcherFactory creates a StandaloneDispatcher and the StandaloneResourceManagerFactory creates a StandaloneResourceManager.
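
The session entrypoint essentially only has to choose this factory. An abridged sketch (condensed from StandaloneSessionClusterEntrypoint; main() and argument parsing are omitted):

public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {

    public StandaloneSessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

    @Override
    protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        // session dispatcher + standalone resource manager, the same combination used by MiniCluster
        return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }
}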

In StandaloneJobClusterEntryPoint, the factory that produces the DispatcherResourceManagerComponent is JobDispatcherResourceManagerComponentFactory, which creates a JobDispatcherResourceManagerComponent: the StandaloneResourceManagerFactory creates a StandaloneResourceManager and the JobDispatcherFactory creates a MiniDispatcher. A MiniDispatcher is bound to exactly one JobGraph; once that JobGraph finishes, the MiniDispatcher is shut down and with it the JobManager process.
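
The shutdown path can be sketched roughly as follows (simplified from MiniDispatcher; the constructor and the distinction between detached and attached execution are omitted, and the field name is an assumption):

public class MiniDispatcher extends Dispatcher {

    private final CompletableFuture<ApplicationStatus> jobTerminationFuture;

    @Override
    protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        super.jobReachedGloballyTerminalState(archivedExecutionGraph);

        // the bound job has reached a terminal state; completing this future eventually
        // triggers the shutdown of the whole cluster entrypoint process
        jobTerminationFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
    }
}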

The internal startup of the Dispatcher and ResourceManager services is the same as in the MiniCluster case and will not be repeated here.

Starting the TaskManager
The entry point for starting a TaskManager is TaskManagerRunner. Its startup flow is basically the same as in MiniCluster mode, except that 1) it runs in a separate process and 2) the HighAvailabilityServices have to be created from the configuration file. TaskManagerRunner creates a TaskExecutor, and the TaskExecutor obtains the ResourceManager's address through the HighAvailabilityServices and establishes a connection to it.
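
An abridged sketch of that entry point (condensed from TaskManagerRunner; security context setup and error handling are omitted, and the ResourceID generation is simplified here):

public class TaskManagerRunner {

    public static void main(String[] args) throws Exception {
        // reads flink-conf.yaml plus dynamic properties from the command line
        final Configuration configuration = loadConfiguration(args);
        // simplified: a fresh ResourceID for this TaskManager process
        final ResourceID resourceId = new ResourceID(UUID.randomUUID().toString());

        runTaskManager(configuration, resourceId);
    }

    public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
        // the constructor builds the RpcService, HighAvailabilityServices, HeartbeatServices,
        // MetricRegistry and BlobCacheService, then calls startTaskManager to create the TaskExecutor
        final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);
        taskManagerRunner.start();
    }
}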

Startup Flow of a Yarn Cluster
The entry point for starting a Yarn cluster is FlinkYarnSessionCli: it first creates a YarnClusterDescriptor from the command-line arguments and then calls YarnClusterDescriptor#deploySessionCluster to trigger the deployment of the cluster.

The actual startup logic lives in AbstractYarnClusterDescriptor#deployInternal; essentially it submits the application to the Yarn cluster through the YarnClient and starts the ApplicationMaster:

abstract class AbstractYarnClusterDescriptor {
    protected ClusterClient<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {
        //.......

        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);

        //.....

        return createYarnClusterClient(
            this,
            validClusterSpecification.getNumberTaskManagers(),
            validClusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }
}

Depending on whether the cluster is started as a sessioncluster or a jobcluster, the entry class of the ApplicationMaster submitted to Yarn is YarnSessionClusterEntrypoint or YarnJobClusterEntrypoint respectively; the difference is that the Dispatcher is a StandaloneDispatcher in the former case and a MiniDispatcher in the latter. The concrete ResourceManager implementation is YarnResourceManager.

Unlike the Standalone Cluster described above, in a Flink cluster started in Yarn Cluster mode the TaskManagers are requested dynamically from Yarn's ResourceManager by the YarnResourceManager, driven by the JobMaster's requests. When the JobMaster requests resources from the ResourceManager and not enough resources are currently available, the YarnResourceManager requests new containers from the Yarn cluster's ResourceManager and starts TaskManagers in them:

class YarnResourceManager {
    // request a new container
    private void requestYarnContainer() {
        resourceManagerClient.addContainerRequest(getContainerRequest());

        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

        numPendingContainerRequests++;

        log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
            resource,
            numPendingContainerRequests);
    }

    // callback invoked when containers have been allocated
    @Override
    public void onContainersAllocated(List<Container> containers) {
        runAsync(() -> {
            final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
            final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

            for (Container container : containers) {
                log.info(
                    "Received new container: {} - Remaining pending container requests: {}",
                    container.getId(),
                    numPendingContainerRequests);

                if (numPendingContainerRequests > 0) {
                    removeContainerRequest(pendingRequestsIterator.next());

                    final String containerIdStr = container.getId().toString();
                    final ResourceID resourceId = new ResourceID(containerIdStr);

                    workerNodeMap.put(resourceId, new YarnWorkerNode(container));

                    try {
                        // Context information used to start a TaskExecutor Java process
                        ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
                            container.getResource(),
                            containerIdStr,
                            container.getNodeId().getHost());
                        // start the TaskManager
                        nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
                    } catch (Throwable t) {
                        log.error("Could not start TaskManager in container {}.", container.getId(), t);

                        // release the failed container
                        workerNodeMap.remove(resourceId);
                        resourceManagerClient.releaseAssignedContainer(container.getId());
                        // and ask for a new one
                        requestYarnContainerIfRequired();
                    }
                } else {
                    // return the excessive containers
                    log.info("Returning excess container {}.", container.getId());
                    resourceManagerClient.releaseAssignedContainer(container.getId());
                }
            }

            // if we are waiting for no further containers, we can go to the
            // regular heartbeat interval
            if (numPendingContainerRequests <= 0) {
                resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
            }
        });
    }

    // create the launch context for the TaskManager
    private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
            throws Exception {
        // init the ContainerLaunchContext
        final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

        final ContaineredTaskManagerParameters taskManagerParameters =
                ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);

        log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
                "JVM direct memory limit {} MB",
                containerId,
                taskManagerParameters.taskManagerTotalMemoryMB(),
                taskManagerParameters.taskManagerHeapSizeMB(),
                taskManagerParameters.taskManagerDirectMemoryLimitMB());

        Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

        log.debug("TaskManager configuration: {}", taskManagerConfig);

        ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
            flinkConfig,
            yarnConfig,
            env,
            taskManagerParameters,
            taskManagerConfig,
            currDir,
            YarnTaskExecutorRunner.class, // entry class
            log);

        // set a special environment variable to uniquely identify this container
        taskExecutorLaunchContext.getEnvironment()
                .put(ENV_FLINK_CONTAINER_ID, containerId);
        taskExecutorLaunchContext.getEnvironment()
                .put(ENV_FLINK_NODE_ID, host);
        return taskExecutorLaunchContext;
    }
}

Summary
This article briefly analyzed the startup flow of a Flink cluster and the communication between the different components, such as the ResourceManager, TaskExecutor, Dispatcher, and JobMaster.

Reposted from: https://blog.jrwang.me/2019/flink-source-code-bootstarp/