Scheduler 模块详解

4.1 模块概述

4.1.1 整体架构

  • 任务调度模块主要包含两大部分:
    1.DAGScheduler
    2.TaskScheduler
    他们负责将用户提交的计算任务按照DAG划分为不同的阶段并且将不同阶段的计算任务提交到集群进行最终的计算。

    • 整个过程用流程图表示(从上向下传递):
      RDD Objects
      build operator DAG,将DAG传给DAGScheduler

      • RDD Objects可以理解为用户实际代码中创建的RDD,这些代码逻辑上组成了一个DAG。
        Spark的易用性就体现在这部分,他提供了基于RDD的多种转换和动作,使Spark的用户可以在基本不增加用户学习成本的前提下使用比较复杂的拓扑来实现策略。
        用户在实际编程的时候可以认为他处理的数据都可以存到内存中,而无需关心最终在急群众运行的任务是否整个数据都可以装在都内存中,或者究竟需要多少节点参与运算。
    • DAGScheduler
      split graph into stages of tasks
      submit each as ready
      DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage(阶段),其中每个Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。
      而且DAG在不同的资源管理框架(Standalone、Mesos、YARN、Local、EC2)下的实现是相同的。
      将Tasks传给TaskScheduler。

    • TaskScheduler
      launch tasks via cluster manager
      retry failed or straggling tasks
      TaskScheduler通过Cluster Manager在集群中的某个Worker的Executor上启动任务。在Executor中运行的任务,如果缓存中没有计算结果,那么就需要开始计算,同时,计算的结果会回传到Driver或者保存在本地。
      在不同的资源管理框架下,TaskScheduler的实现方式有差别,最重要的实现就是org.apache.spark.scheduler.TaskSchedulerImpl。
      对于Local、Standalone和Mesos来说,他们的TaskScheduler就是TaskSchedulerImpl;对于YARN Cluster和YARN Client的TaskScheduler的实现是继承自TaskSchedulerImpl。

    • Worker
      execute tasks
      store and serve blocks

4.1.2 Scheduler 的实现概述

  • 任务调度模块涉及的最重要的三个类是:

    • 1.org.apache.spark.scheduler.DAGScheduler

      • 这个就是DAGScheduler的具体实现。
    • 2.org.apache.spark.scheduler.SchedulerBackend

      • org.apache.spark.scheduler.SchedulerBackend是一个trait(类似于java 的接口和抽象类),作用是分配当前可用的资源,具体就是向当前等待分配计算资源的Task分配计算资源(即Executor),并且在分配的Executor上启动Task,完成计算的调度过程。
        他使用reviveOffers完成上述的任务调度。reviveOffers可以说是他最重要的实现。org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend是SchedulerBackend的一个实现,同时YARN、Standalone、Mesos都是基于他加入了自身特有的逻辑。
    • 3.org.apache.spark.scheduler.TaskScheduler

      • org.apache.spark.scheduler.TaskScheduler也是一个trait,它的作用是为创建它的SparkContext调度任务,即从DAGScheduler接收不同Stage的任务,并且向集群提交这些任务,并为执行特别慢的任务启动备份任务。
        TaskScheduler是以后实现多种任务调度器的基础,不过当前的org.apache.spark.scheduler.TaskSchedulerImpl是唯一实现。
        每个SchedulerBackend都会对应一个TaskScheduler,而他们都被SparkContext持有。

        • TaskSchedulerImpl会在以下几种场景下调用org.apache.spark.scheduler.SchedulerBackend#reviveOffers:

          • 1.有新任务提交时
          • 2.有任务执行失败时
          • 3.计算节点(即Executor)不可用时
          • 4.某些任务执行过慢而需要为它重新分配资源时
  • 任务调度的逻辑流程(由上到下)

    • 生成Job
    • DAGScheduler划分Stage
    • DAGScheduler提交Stage
    • DAGScheduler为需要计算的Partition生成TaskSet
    • TaskScheduler提交计算任务
    • 调度器SchedulableBuilder调度任务
    • TaskScheduler为任务分配资源
    • SchedulerBackend将任务提交到Executor上运行

4.2 DAGScheduler 实现详解

4.2.1 DAGScheduler 的创建

  • TaskScheduler和DAGScheduler都是在SparkContext创建的时候创建的。其中TaskScheduler是通过org.apache.spark.SparkContext#createTaskScheduler创建的,而DAGScheduler是直接调用它的构造函数创建。只不过,DAGScheduler保存了TaskScheduler的引用,因此需要在TaskScheduler创建之后创建。

4.2.2 Job 的提交

  • 用户提交的Job最终会调用DAGScheduler的runJob,runjob又会调用submitJob。

    • submitJob首先会为Job生成一个Job ID,并且生成一个JobWaiter的实例来监听Job的执行状况。
    • JobWaiter会监听Job的执行状态,而Job是由多个Task组成的,因此只有Job的所有Task都成功完成,Job才会标记成功;任意一个Task失败都会标记Job失败。

4.2.3 Stage的划分

  • org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted首先会根据RDD创建finalStage。
    finalStage,顾名思义,就是最后的那个Stage。然后创建ActiveJob之后提交计算任务。

  • 用户提交的计算任务是一个由RDD组成的DAG,如果RDD在转换的时候需要做Shuffle,那么这个Shuffle的过程就将这个DAG分成了不同的阶段(Stage)。由于Shuffle的存在,不同的Stage是不能并行计算的,因为后面Stage的计算需要前面Stage的Shuffle 的结果。而一个Stage由一组完全独立的计算任务(即Task)组成,每个Task的运算逻辑完全相同,只不过每个Task都会处理其所对应的Partition。
    其中Partition的数量和Task的数量是一致的,即一个Partition会被该Stage的一个Task处理。

  • 1.划分依据

    • 对于窄依赖,由于RDD每个Partition依赖固定数量的parent RDD的Partition,因此可以通过一个Stage来处理这些Partition,而且这些Partition相互独立,所以这些Task可以并行执行。
    • 对于宽依赖,由于需要Shuffle,因此只有所有的parent RDD的Partition Shuffle完成,新的Partition才会形成,这样接下来的Task才可以继续处理。
      因此,宽依赖可以认为是DAG的分界线,或者说Spark根据宽依赖将Job划分成不同的阶段(Stage)。
  • 2.划分过程

    • Page50
    • Stage的划分是从最后一个RDD开始的,也就是出发Action的那个RDD。假设RDD_G = RDD_B.leftOuterJoin(RDD_F) ,那么划分就是从G开始。
      RDD_G依赖两个RDD,一个是RDD_B,另一个是RDD_F。其中先处理RDD_B还是RDD_F是随机的。这里我们首先处理RDD_B,由于leftOuterJoin操作以前面RDD为基准,所以RDD_B和RDD_G之间是窄依赖,和RDD_F是宽依赖。
      因此RDD_B和RDD_G可以划分到一个Stage中(Stage 3)。
      再处理RDD_F,由于这个依赖是宽依赖,所以RDD_F和RDD_G被划分到不同的Stage(Stage 2 和Stage 3),其中RDD_F所在的Stage 2是RDD_G所在的Stage 3 的parent Stage。
    • 接下来处理RDD_B的依赖,由于RDD_B = RDD_A.groupByKey(),所以RDD_A与RDD_B之间是宽依赖,他们属于不同的Stage(Stage1 和Stage3)。
      这样,RDD_B和RDD_G同属于Stage 3,而这个Stage直接的parent Stage有两个,就是RDD_A和RDD_F分别属于的两个Stage(即Stage 1和Stage 2)。
    • 最后,这个DAG被划分成三个Stage,即RDD_A所在的Stage 1,RDD_C、RDD_D、RDD_E和RDD_F所在的Stage 2(RDD_F = RDD_C.map(func).union(RDD_E) )和RDD_B及RDD_G所在的Stage 3.
      其中Stage 1和 Stage 2是相互独立的,可以并发执行;Stage 3依赖于Stage 1和 Stage 2,只有这两个Stage完成计算,他才可以开始计算。
    • 那么Stage是如何计算的?以Stage 1为例,由于RDD_A有三个Partition,因此他会生成三个org.apache.spark.scheduler.ShuffleMapTask,这些Task会将结果写入到三个Partition,实现细节在第七章。
      Stage 2同样也是由ShuffleMapTask组成,但是Stage 3是由三个org.apache.spark.scheduler.ResultTask组成的。
  • 3.实现细节

    • handleJobSubmitted通过调用org.apache.spark.scheduler.DAGScheduler#newStage来创建finalStage,即上文的Stage 3。

4.2.4 任务的生成

4.3 任务调度实现详解

每个TaskScheduler都对应一个SchedulerBackend。

  • TaskScheduler负责Application的不同的Job之间的调度,在Task执行失败时启动重试机制,并且为执行速度慢的Task启动备份的任务。
  • SchedulerBackend负责与Cluster Manager交互,取得该Application分配到的资源,并且将这些资源传给TaskScheduler,由TaskScheduler为Task最终分配计算资源。

4.3.1 TaskScheduler 的创建

  • TaskScheduler和DAGScheduler都是在SparkContext创建的时候创建的。
  • org.apache.spark.SparkContext#createTaskScheduler会根据传入的Master的URL的规则判断集群的部署方式(或者说资源管理方式),比如是Standalone、Mesos、YARN或者Local。
    根据不同的部署方式,生成不同的TaskScheduler和SchedulerBackend。
  • org.apache.spark.scheduler.SchedulerBackend是一个trait,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的Task分配计算资源(Executor),并且在分配的Executor上启动Task,完成计算的调度过程。
    他使用reviveOffers完成上述的任务调度。reviveOffers可以说是它的最重要的实现。

4.3.2 Task的提交概述

  • DAGScheduler完成了对Stage的划分后,会按照顺序逐个将Stage通过调用下面的函数提交到TaskScheduler:
    org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
    由submitTasks开始Task级别的资源调度。最终,这些Task会被分配Executor,运行在Worker上的Executor完成任务的最终执行。这个过程详细的调用堆栈如下:

    • 1.org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks

      • 主要是将保存这组任务的TaskSet加入到一个TaskSetManager中。TaskSetManager会根据数据的就近原则(locality aware)为Task分配计算资源,监控Task的执行状态并采取必要的措施,比如失败重试,慢任务的推测性执行。
    • 2.org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager

      • schedulableBuilder是Application级别的调度器,支持FIFO(默认)和FAIR。
        schedulableBuilder会确定TaskSetManager的调度顺序,然后由TaskSetManager根据就近原则来确定Task运行在哪个Executor上。
    • 3.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers

    • 4.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers

    • 5.org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers

      • 相应CoarseGrainedSchedulerBackend的资源调度请求,为每个Task具体分配资源。它的输入是一个Executor的列表,输出是org.apache.spark.scheduler.TaskDescription的二维数组。TaskDescription包含了TaskID、Executor ID和Task执行环境的依赖信息等。
    • 6.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks

      • 将上个调用得到的tasks发送到Executor
    • 7.org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask

    • 8.org.apache.spark.executor.Executor#launchTask

    • 这个简化的调用栈只是为了说明任务是如何从TaskScheduler开始,最终提交到Executor上去的。
      其中,调用栈16都是在Driver端的,78就到了Executor上,8最终完成了任务的执行。7~8可以参阅第六章。

4.3.3 任务调度具体实现

  • 1.FIFO

    • FIFO的调度顺序:
      首先保证Job ID较小的被先调度,如果是同一个Job,那么Stage小的先被调度(同一个Job,可能多个Stage可以并行执行)。
  • 2.FAIR

    • 首先是挂到rootPool下面的pool先确定调度顺序,然后在每个pool内部使用相同的算法来确定TaskSetManager的调度顺序。确定了调度顺序,TaskScheduler就可以按照这个顺序将Task发送到Executor进行执行了。

4.3.4 Task 运算结果的处理

  • 1.Driver收到Executor的任务执行结果

    • Task在Executor执行完成时,会通过向Driver发送StatusUpdate的消息来通知Driver任务的状态更新为TaskState.FINISHED。Driver首先会将任务的状态更新通知TaskScheduler,然后会在这个Executor上重新分配新的计算任务。
      一个Task的状态只有是TaskState.FINISHED才标记他成功执行;其余的状态包括TaskState.FAILED、TaskState.KILLED和TaskState.LOST都是执行失败。

    • Executor在将结果回传到Driver时,会根据结果的大小使用不同的策略:

      • 1.如果结果大于1GB,那么直接丢弃这个结果。
      • 2.对于较大的结果,将其以tid为key存入org.apache.spark.storage.BlockManager;如果结果不大,则直接回传给Driver。
      • 3.其他的直接通过AKKA回传到Driver。
  • 2.处理任务成功执行的机制

  • 3.处理任务执行失败的容错机制

    • 如果失败没有超过阈值(默认4),那么会重新提交任务。
      TaskSetManager首先会根据失败的原因来采取不同的动作,比如如果是因为Task的结果发序列化失败,那么说明任务的执行是有问题的,这种任务即使重试也不会成功,因此这个TaskSetManager会直接失败。
      如果这些任务需要重试,那么他会重新将这些任务标记为等待调度。

4.4 Word Count调度计算过程详解

Page73

4.5 小结

要理解Stage,首先要搞明白什么是Task。Task是集群上运行的基本单位。一个Task负责处理RDD的一个Partition。RDD的多个Partition会分别由不同的Task去处理。当然,这些Task的处理逻辑是完全一致的。这一组Task就组成了一个Stage。

有两种Task:

1.org.apache.spark.scheduler.ShuffleMapTask
2.org.apache.spark.scheduler.ResultTask

ShuffleMapTask根据Task的partitionner将计算结果放到不同的bucket中。而ResultTask是将计算结果发送回Driver Application。一个Job包含了多个Stage,而Stage是由一组完全相同的Task组成的。最后的Stage包含了一组ResultTask。

在用户触发了一个动作后,比如count、collect,SparkContext会通过runJob的函数开始进行任务提交。最后会通过DAG的事件处理器传递到DAGScheduler本身的handleJobSubmitted,他首先会划分Stage,提交Stage,然后提交Task。至此,Task就开始在集群上运行了。

一个Stage的开始就是从外部存储或shuffle结果中读取数据;一个Stage的结束就是由于发生shuffle或者生成结果时。

在DAGScheduler将用户提交的应用划分成不同的Stage后,TaskScheduler模块负责为Stage的Task分配计算资源,这个计算资源的分配实际上是Cluster Manager职责。