一个 Flink Job 提交到集群中运行时,会被调度为不同的 Task。在前面的文章中,我们已经介绍了 Flink 如何根据用户的编写的程序生成调度用的执行图,如何为 Task 分配计算资源,以及 Task 之间如何进行数据交换。在这篇文章中,我们将跟踪一个 Task 的完整的生命周期,进一步加深对 Flink 执行过程的理解。

Task 和 OperatorChain
在前面介绍如何生成 JobGraph 的文章中,我们已经了解了 Flink 会尽可能把能够 chaining 到一起的算子串联在一起,形成 OperatorChain,对应一个 JobVertex。

两个 Operator 能够串联在一起的条件包括:

class StreamingJobGraphGenerator {
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();

        return downStreamVertex.getInEdges().size() == 1 //下游节点只有一个输入
                && outOperator != null
                && headOperator != null
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //在同一个 slot 共享组中
                //上下游算子的 chainning 策略,要允许 chainning
                && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                    headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                //上下游算子之间的数据传输方式必须是FORWARD,而不能是REBALANCE等其它模式
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                //上下游算子的并行度要一致
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                // StreamExecutionEnvironment 配置允许 chainning
                && streamGraph.isChainingEnabled();
    }
}

JobGraph 在 JobManager 中进一步被转换为可供调度的并行化版本的 ExecutionGraph,其中 JobVertex 被展开为并行化版本的 ExecutionVertex,每一个 ExecutionVertex 对应 JobVertex 的一个并行子任务,它的每一次调度对应一个 Execution,即 TaskManager 中的一个 Task。所以,一个 Task 运行期间的主要处理逻辑对应一个 OperatorChain,这个 OperatorChain 可能包含多个 Operator,也可能只有一个 Operator。

OperatorChain 内部的数据传递
在前面的文章中,我们已经介绍过不同的 Task 之间如何通过网络栈进行数据交换,并对 Task 之间应对”反压”的机制进行了分析。现在我们知道,在一个 Task 内部同样可能包含多个不同的算子,这些算子处理数据的主要逻辑由用户提供的自定义函数(UDF)实现,那么上游算子处理之后的记录如何传递给下游算子呢?既然一个 Task 是一个独立的线程,多个算子的计算逻辑是依次执行的,那么很直观的想法就是直接通过函数调用的参数来数据。我们看下 Flink 内部是如何处理的。

首先,要看一下 Output 接口,Output 接口继承自 Collector 接口,用于接受 Operator 提交的数据:

public interface Output<T> extends Collector<T> {
    /**
     * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
     * operators.
     *
     * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
     * timestamp will be emitted in the future.
     */
    void emitWatermark(Watermark mark);

    /**
     * Emits a record the side output identified by the given {@link OutputTag}.
     *
     * @param record The record to collect.
     */
    <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);

    void emitLatencyMarker(LatencyMarker latencyMarker);
}

在 OperatorChain 内部还有一个 WatermarkGaugeExposingOutput 接口继承自 Output,它主要是额外提供了一个获取 watermark 值的方法:

public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
    Gauge<Long> getWatermarkGauge();
}

每一个 StreamOperator 都有一个 Output 成员,用于收集当前算子处理完的记录,比如在 StreamMap 中:

public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }    
}

在 StreamFilter 中:

public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        if (userFunction.filter(element.getValue())) {
            output.collect(element);
        }
    }
}

在 StreamFlatMap 中:

public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}

那么 Output 又是怎么处理算子提交的记录的呢?这就需要进一步看一下 Output 的具体实现类。

OperatorChain 的内部类 ChainingOutput 实现了 WatermarkGaugeExposingOutput 接口,它持有一个 OneInputStreamOperator, 即 OperatorChain 中当前算子的下游算子。当 ChainingOutput 接收到当前算子提交的数据时,直接将调用下游算子的 processElement 方法:

class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
    protected final OneInputStreamOperator<T, ?> operator; //这是下游算子

    @Override
    public void collect(StreamRecord<T> record) {
        if (this.outputTag != null) {
            // we are only responsible for emitting to the main input
            return;
        }
        pushToOperator(record);
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        //如果有 OutputTag, 则要求 OutputTag 匹配才会转发记录
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            return;
        }
        pushToOperator(record);
    }

    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;
            numRecordsIn.inc();
            //直接调用下游算子的 processElement 方法
            operator.setKeyContextElement1(castRecord);
            operator.processElement(castRecord);
        }
        catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
}

通过在 ChainingOutput 中保存下游 StreamOperator 的引用,ChainingOutput 直接将对象的引用传递给下游算子。但是 ExecutionConfig 有一个配置项,即 objectReuse,在默认情况下会禁止对象重用。如果不允许对象重用,则不会使用 ChainingOutput,而是会使用 CopyingChainingOutput。顾名思义,它和 ChainingOutput 的区别在于,它会对记录进行拷贝后传递给下游算子。

BroadcastingOutputCollector 封装了一组 Output, 即 Output<StreamRecord<t>>[] outputs, 在接收到 StreamRecord 时,会将消息提交到所有的 内部所有的 Output 中。BroadcastingOutputCollector 主要用在当前算子有多个下游算子的情况下。与此对应的还有一个 CopyingBroadcastingOutputCollector。</t>

DirectedOutput 基于 OutputSelector<out>[] outputSelectors 选择要转发的目标 Output,主要是在 split/select 的情况下使用。与 DirectedOutput 对应的也有一个 CopyingDirectedOutput。</out>

对于位于 OperatorChain 末尾的算子,它处理过的记录需要被其它 Task 消费,因此它的记录需要被写入 ResultPartition 。因此,Flink 提供了 RecordWriterOutput,它也实现了 WatermarkGaugeExposingOutput, 但是它是通过 RecordWriter 输出接收到的消息记录。RecordWriter 是 ResultPartitionWriter 的一层包装,提供了将记录序列化到 buffer 中的功能。

Task 的生命周期
下面我们将进一步对 Task 运行的生命周期进行分析。

任务调度
当 JobGraph 被提交到 JobMaster 之后,首先会生成 ExecutionGraph,这是任务调度时使用的调度执行图。然后通过 ExecutionGraph#scheduleForExecution 方法开始调度各个子任务。

class ExecutionGraph {
    public void scheduleForExecution() throws JobException {
        assertRunningInJobMasterMainThread();
        final long currentGlobalModVersion = globalModVersion;
        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            final CompletableFuture<Void> newSchedulingFuture;
            switch (scheduleMode) {
                //调度任务
                case LAZY_FROM_SOURCES: //只运行 source,其它的子任务由source进行通知
                    newSchedulingFuture = scheduleLazy(slotProvider);
                    break;
                case EAGER: //所有的子任务都立即进行调度,这是 streaming 模式采用的方式
                    newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                    break;
                default:
                    throw new JobException("Schedule mode is invalid.");
            }
            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;
                newSchedulingFuture.whenComplete(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null && !(throwable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(ExceptionUtils.stripCompletionException(throwable));
                        }
                    });
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }
}

在调度执行的时候,首先所有的子任务都需要先向 Scheduler 申请 slot 资源(关于计算资源的管理可以参考前面的文章),当所有需要调度的子任务都分配到 slot 资源后,才正式开始调度任务:

class ExecutionGraph {
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
        assertRunningInJobMasterMainThread();
        checkState(state == JobStatus.RUNNING, "job is not running currently");

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
        final Set<AllocationID> allPreviousAllocationIds =
            Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());
        // allocate the slots (obtain all their futures
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // these calls are not blocking, they only return futures
            Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
                slotProvider,
                queued,
                LocationPreferenceConstraint.ALL,
                allPreviousAllocationIds,
                timeout);
            allAllocationFutures.addAll(allocationFutures);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        // 等待所有需要调度的子任务都分配到资源
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        return allAllocationsFuture.thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        //启动 Execution
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })
            .exceptionally(......)
    }
}

Execution 是 ExecutionVertex 的一次执行,在调度的时候会先生成对任务的描述 TaskDeploymentDescription, TaskDeploymentDescription 包含了对输入的描述 InputGateDeploymentDescriptor, 对输出的描述 ResultPartitionDeploymentDescriptor,以及保存了这个 Task 中运行的所有算子运行时信息的 TaskInformation 和 JobInformation。生成了 TaskDeploymentDescription 通过 RPC 调用提交给 TaskExecutor 执行。

class Execution {
    public void deploy() throws JobException {
        ......
        try {
            // race double check, did we fail/cancel and do we need to release the slot?
            if (this.state != DEPLOYING) {
                slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
                return;
            }

            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskRestore,
                attemptNumber);

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

            // We run the submission in the future executor so that the serialization of large TDDs does not block
            // the main thread and sync back to the main thread once submission is completed.
            CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                    (ack, failure) -> {
                        // only respond to the failure case
                        if (failure != null) {
                            if (failure instanceof TimeoutException) {
                                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                                markFailed(new Exception(
                                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                        + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                            } else {
                                markFailed(failure);
                            }
                        }
                    },
                    jobMasterMainThreadExecutor);
        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }

}

启动 Task 线程
当 TaskDeploymentDescription 被提交到 TaskExecutor 后,TaskExecutor 会据此创建一个 Task 对象,并在构造函数中完成一些初始化操作,如根据 InputGateDeploymentDescriptor 创建 InputGate,根据 ResultPartitionDeploymentDescriptor 创建 ResultPartition。

Task 实现了 Runnable 接口,每个 Task 都会在一个单独的线程中运行,Task 的启动流程包括:

首先完成状态的初始化 ExecutionState.CREATED -> ExecutionState.DEPLOYING
任务装配
创建一个用户加载用户代码的类加载器
通过反序列化得到 ExecutionConfig,从 ExecutionConfig 中可以的到所有算子相关的信息
向网络栈中注册 Task,为 ResultPartition 和 InputGate 分配缓冲池
初始化用户代码,通过反射得到 AbstractInvokable(StreamTask) 实例
执行任务
状态转换 ExecutionState.DEPLOYING -> ExecutionState.RUNNING
调用 AbstractInvokable.invoke() 启动任务
如下:

class Task {
    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            ...... //handle other state
        }

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            ......
            userCodeClassLoader = createUserCodeClassloader();
            final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);

            network.registerTask(this);
            for (ResultPartition partition : producedPartitions) {
                taskEventDispatcher.registerPartition(partition.getPartitionId());
            }

            Environment env = new RuntimeEnvironment(.......)
            // now load and instantiate the task's invokable code
            // nameOfInvokableClass 是 JobVertex 的 invokableClassName,
            // 每一个 StreamNode 在添加的时候都会有一个 jobVertexClass 属性
            // 对于一个 operator chain,就是 head operator 对应的 invokableClassName,见 StreamingJobGraphGenerator.createChain
            // 通过反射创建 AbstractInvokable 对象
            // 对于 Stream 任务而言,就是 StreamTask 的子类,SourceStreamTask、OneInputStreamTask、TwoInputStreamTask 等
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            // 运行
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // 正常结束
            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }
            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        } catch (Throwable t) {
            ......
        } finally {
            ......
        }
    }
}

StreamTask
AbstractInvokable 是 Task 执行的主要逻辑,也是所有被执行的任务的基类,包括 Streaming 模式和 Batch 模式。在 Streaming 模式下,所有任务都继承自 StreamTask,包括 StreamTask 的子类包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask, 以及用于迭代模式下的 StreamIterationHead 和 StreamIterationTail。

每一个 StreamNode 在添加到 StreamGraph 的时候都会有一个关联的 jobVertexClass 属性,这个属性就是该 StreamNode 对应的 StreamTask 类型;对于一个 OperatorChain 而言,它所对应的 StreamTask 就是其 head operator 对应的 StreamTask。

StreamTask 完整的生命周期包括:

创建状态存储后端,为 OperatorChain 中的所有算子提供状态
加载 OperatorChain 中的所有算子
所有的 operator 调用 setup
task 相关的初始化操作
所有 operator 调用 initializeState 初始化状态
所有的 operator 调用 open
run 方法循环处理数据
所有 operator 调用 close
所有 operator 调用 dispose
通用的 cleanup 操作
task 相关的 cleanup 操作

主要代码如下:

abstract class StreamTask {
    @Override
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            // -------- Initialize ---------
            //创建状态存储后端
            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());
                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            //创建 OperatorChain,会加载每一个 operator,并调用 setup 方法
            operatorChain = new OperatorChain<>(this, recordWriters);
            headOperator = operatorChain.getHeadOperator();

            // 和具体 StreamTask 子类相关的初始化操作
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {
                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.
                //状态初始化
                initializeState();
                //open
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            //开始处理数据,这里通常是个循环
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();
                // make sure no new timers can come
                timerService.quiesce();
                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();
            // make sure all buffered data is flushed
            operatorChain.flushOutputs();
            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        } finally {
            //cleanup
        }

    }
}

OperatorChain
前面已经提了很多次 OperatorChain,下面我们就看下 OperatorChain 是如何加载的,主要逻辑都在 OperatorChain 的构造函数中:

public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
    public OperatorChain(
            StreamTask<OUT, OP> containingTask,
            List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters) {
        final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        final StreamConfig configuration = containingTask.getConfiguration();

        //head operator
        headOperator = configuration.getStreamOperator(userCodeClassloader);

        //OperatorChain 内部所有的 operator 的配置
        // we read the chained configs, and the order of record writer registrations by output name
        Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

        // create the final output stream writers
        // we iterate through all the out edges from this job vertex and create a stream output
        // 所有的输出边,这是对外输出,不包含内部 operator 之间的的数据传输
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
        // from here on, we need to make sure that the output writers are shut down again on failure
        boolean success = false;
        try {
            //对外输出的 RecordWriterOutput
            for (int i = 0; i < outEdgesInOrder.size(); i++) {
                StreamEdge outEdge = outEdgesInOrder.get(i);

                RecordWriterOutput<?> streamOutput = createStreamOutput(
                    recordWriters.get(i),
                    outEdge,
                    chainedConfigs.get(outEdge.getSourceId()),
                    containingTask.getEnvironment());

                this.streamOutputs[i] = streamOutput;
                streamOutputMap.put(outEdge, streamOutput);
            }

            // we create the chain of operators and grab the collector that leads into the chain
            List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
            //这里会递归调用,为 OperatorChain 内部的所有的 Operator 都创建 output
            this.chainEntryPoint = createOutputCollector(
                containingTask,
                configuration,
                chainedConfigs,
                userCodeClassloader,
                streamOutputMap,
                allOps);

            if (headOperator != null) {
                //chainEntryPoint 是 headOperator 的 output
                WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
                //header operator 调用 setup 方法
                headOperator.setup(containingTask, configuration, output);

                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
            }

            // add head operator to end of chain
            allOps.add(headOperator);
            this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
            success = true;
        }
        finally {
            // make sure we clean up after ourselves in case of a failure after acquiring
            // the first resources
            if (!success) {
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output != null) {
                        output.close();
                    }
                }
            }
        }
    }

    //创建 output collector
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators) {
        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
        // create collectors for the network outputs
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            @SuppressWarnings("unchecked")
            RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        // Create collectors for the chained outputs
        // OperatorChain 内部 Operator 之间的边
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

            //创建当前节点的下游节点,并返回当前节点的 output
            //createChainedOperator 在创建 operator 的时候,会调用 createOutputCollector 为 operator 创建 output
            //随意会形成递归调用关系,所有的 operator 以及它们的 output 都会被创建出来
            WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
                containingTask,
                chainedOpConfig,
                chainedConfigs,
                userCodeClassloader,
                streamOutputs,
                allOperators,
                outputEdge.getOutputTag());
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        // if there are multiple outputs, or the outputs are directed, we need to
        // wrap them as one output
        List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);

        if (selectors == null || selectors.isEmpty()) {
            // simple path, no selector necessary
            //只有一个输出
            if (allOutputs.size() == 1) {
                return allOutputs.get(0).f0;
            }
            else {
                //不止有一个输出,需要使用 BroadcastingOutputCollector 进行封装
                // send to N outputs. Note that this includes the special case
                // of sending to zero outputs
                @SuppressWarnings({"unchecked", "rawtypes"})
                Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
                for (int i = 0; i < allOutputs.size(); i++) {
                    asArray[i] = allOutputs.get(i).f0;
                }

                // This is the inverse of creating the normal ChainingOutput.
                // If the chaining output does not copy we need to copy in the broadcast output,
                // otherwise multi-chaining would not work correctly.
                if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                    return new CopyingBroadcastingOutputCollector<>(asArray, this);
                } else  {
                    return new BroadcastingOutputCollector<>(asArray, this);
                }
            }
        }
        else {
            // selector present, more complex routing necessary
            // 存在 selector,用 DirectedOutput 进行封装
            // This is the inverse of creating the normal ChainingOutput.
            // If the chaining output does not copy we need to copy in the broadcast output,
            // otherwise multi-chaining would not work correctly.
            if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                return new CopyingDirectedOutput<>(selectors, allOutputs);
            } else {
                return new DirectedOutput<>(selectors, allOutputs);
            }
        }
    }

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators,
            OutputTag<IN> outputTag) {
        // create the output that the operator writes to first. this may recursively create more operators
        // 为当前 Operator 创建 output
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
            containingTask,
            operatorConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators);

        // now create the operator and give it the output collector to write its output to
        //从 StreamConfig 中取出当前 Operator
        OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);

        chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);

        allOperators.add(chainedOperator);

        //这里是在为当前 operator 前向的 operator 创建 output
        //所以当前 operator 被传递给前一个 operator 的 output,这样前一个 operator 的输出就可以直接调用当前 operator
        WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
        }
        else {
            TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
        }

        // wrap watermark gauges since registered metrics must be unique
        chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
        chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);

        return currentOperatorOutput;
    }
}

这里的主要逻辑其实就是递归地创建 OpeartorChain 内部所有的 StreamOperator,并为每一个 StreamOperator 创建 Output collecto,结合本文上面对 Output 的介绍应该就很容易理解了。

几类不同的 StreamTask
StreamTask 的 init 方法和 run 方法等都是在子类中自行实现的。下面我们先主要看先 SourceStramTask, OneInputStreamTask 和 TwoInputStreamTask。对于在迭代场景下使用的 StreamIterationHead 和 StreamIterationTail 这里先不加以介绍了,留在后面分析迭代任务的实现时再进行说明。

SourceStreamTask
顾名思义,SourceStreamTask 负责为下游任务生成数据,因此它没有输入,只负责对外输出记录。

public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
    extends StreamTask<OUT, OP> {
    protected void init() {
        // we check if the source is actually inducing the checkpoints, rather
        // than the trigger
        SourceFunction<?> source = headOperator.getUserFunction();
        // 如果用户提供的 SourceFunction 是 ExternallyInducedSource,则需要创建一个 CheckpointTrigger 对象提供给 ExternallyInducedSource
        if (source instanceof ExternallyInducedSource) {
            externallyInducedCheckpoints = true;

            ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger() {
                @Override
                public void triggerCheckpoint(long checkpointId) throws FlinkException {
                    // TODO - we need to see how to derive those. We should probably not encode this in the
                    // TODO -   source's trigger message, but do a handshake in this task between the trigger
                    // TODO -   message from the master, and the source's trigger notification
                    final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
                    final long timestamp = System.currentTimeMillis();

                    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

                    try {
                        SourceStreamTask.super.triggerCheckpoint(checkpointMetaData, checkpointOptions, false);
                    }
                    catch (RuntimeException | FlinkException e) {
                        throw e;
                    }
                    catch (Exception e) {
                        throw new FlinkException(e.getMessage(), e);
                    }
                }
            };

            ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
        }
    }

    @Override
    protected void run() throws Exception {
        // 对source而言,就是调用 head operator 的 run 方法
        //head operator 是一个 StreamSource,最终会调用用户提供的 SourceFunction 的 run 方法,一般是一个循环
        //head operator 通过 Output 将数据传递给下游的算子
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
}

OneInputStreamTask
对于 OneInputStreamTask,它的主要执行逻辑就是不断循环调用 StreamInputProcessor.processInpt() 方法。

StreamInputProcessor 从缓冲区中读取记录或 watermark 等消息,然后调用 streamOperator.processElement(record) 交给 head operator 进行处理,并依次将处理结果交给下游算子。

public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
    public void init() throws Exception {
        //创建一个 StreamInputProcessor
        StreamConfig configuration = getConfiguration();

        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        int numberOfInputs = configuration.getNumberOfInputs();

        if (numberOfInputs > 0) {
            InputGate[] inputGates = getEnvironment().getAllInputGates();

            inputProcessor = new StreamInputProcessor<>(
                    inputGates,
                    inSerializer,
                    this,
                    configuration.getCheckpointMode(),
                    getCheckpointLock(),
                    getEnvironment().getIOManager(),
                    getEnvironment().getTaskManagerInfo().getConfiguration(),
                    getStreamStatusMaintainer(),
                    this.headOperator,
                    getEnvironment().getMetricGroup().getIOMetricGroup(),
                    inputWatermarkGauge);
        }
        headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
        // wrap watermark gauge since registered metrics must be unique
        getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
    }

    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
        //循环调用 StreamInputProcessor.processInput 方法
        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
}

public class StreamInputProcessor<IN> {
    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        //这里虽然是一个while循环,但其实只会处理一条记录,因为单条记录可能需要多个 buffer 传输
        while (true) {
            if (currentRecordDeserializer != null) {
                //反序列化
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    //如果buffer里面的数据已经被消费了,则归还buffer
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    //得到了一条完整的记录
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        //是一条正常的记录,调用 operator 的处理方法,最终会调用用户自定义的函数的处理方法
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        //处理完一条记录,结束本次调用
                        return true;
                    }
                }
            }

            //获取下一个 BufferOrEvent,这是个阻塞的调用
            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    //如果是Buffer,要确定是哪个 channel 的,然后用对应 channel 的反序列化器解析
                    //不同channel在反序列化的时候不能混淆
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                //表明上游结束了
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
}

TwoInputStreamTask
TwoInputStreamTask 和 OneInputStreamTask 的处理逻辑类似,只是要对两个上游的输入分别调用 TwoInputStreamOperator.processElement1 和 TwoInputStreamOperator.processElement2 进行处理。这里就不再赘述了。

小节
Task 是 Flink 任务调度的最小单位。本文简要地介绍了 Task 的生命周期以及数据的处理的基本模式。通过 StreamTask -> StreamOperator -> User-define-function 这样的封装,用户自定义的数据处理逻辑最终得以调度执行。

转发自:https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#operatorchain-%E5%86%85%E9%83%A8%E7%9A%84%E6%95%B0%E6%8D%AE%E4%BC%A0%E9%80%92