Executor模块详解
Executor模块负责运行Task计算任务,并将结果会传到Driver。Spark支持多种资源调度框架,这些资源框架在为计算任务分配资源后,最终都会使用Executor模块完成最终的计算。
每个Spark的Application都是从SparkContext开始的,他通过Cluster Manager和Worker上的Executor建立联系,由每个Executor完成Application的部分计算任务。不同的Cluster Master,即资源调度框架的实现模式会有区别,但是任务的划分和调度都是由运行SparkContext端的Driver完成的,资源调度框架在为Application分配资源后,将Task分配到计算的物理单元Executor去处理。
6.1 Standalone模式的Executor分配详解
Standalone模式下集群启动时,Worker会向Master注册,使得Master可以感知进而管理整个集群;Master通过借助ZooKeeper,可以简单实现高可用性;而应用方通过SparkContext这个与集群的交互接口,在创建SparkContext的时候就完成的Application的注册,由Master为其分配Executor;在应用方创建了RDD并且在这个RDD进行了很多的转换后,出发了Action,通过DAGScheduler将DAG划分为不同的Stage,并将Stage转换为TaskSet交给TaskScheduler;再由TaskSchedulerImpl通过SparkDeploySchedulerBackend的reviveOffers,最终向ExecutorBackend发送LaunchTask的消息;ExecutorBackend收到消息后,启动Task,开始在集群中启动计算。
SparkContext是用户应用和Spark集群的交换的主要接口,用户应用一般首先要创建他。如果使用SparkShell,不必显式地创建他,系统会自动创建一个名为sc的SparkContext实例。
创建SparkContext实例,主要工作是设置一些参数,比如Executor使用到的内存的大小。如果系统的配置文件已经设置,那么久直接读取该配置;否则读取环境变量。如果都没有设置,那么取默认值为512M。
除了加载这些集群的参数,他完成了TaskScheduler和DAGScheduler的创建。
6.1.1 SchedulerBackend创建AppClient
- SparkDeploySchedulerBackend是Standalone模式的SchedulerBackend。在org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend#start中会创建AppClient,而AppClient可以向Standalone的Master注册Application,然后Master会通过Application的信息为他分配Worker,包括每个Worker上使用CPU Core的数目等。
- SparkDeploySchedulerBackend准备好Application相关的信息后,创建AppClient;AppClient通过调用SparkDeploySchedulerBackend的某些接口来通知Driver某些状态更新;通过ApplicationDescription中携带的Driver Actor的URL,Executor可以通过AKKA和Driver进行通信。
- 子主题 3
6.1.2 AppClient向Master注册Application
- AppClient是Application和Master的交互接口。它包含了一个actor负责了所有与Master 的交互。
Master与actor主要的消息如下: - 1)Master成功注册Application
- 2)Master删除Application
- 3)添加Executor
- 4)Master Executor状态更新的消息
- 5)Master变换的消息
- 6)停止AppClient
- 如果超过20s没有接收到注册成功的消息,那么会重新注册;如果重试超过3次仍未成功,那么本次的注册就失败了。事实上,除非连接不上Master,否则注册都会成功。
6.1.3 Master根据AppClient的提交选择Worker
- org.apache.spark.deploy.master.Master#schedule为处于待分配资源的Application分配资源。在每次有新的Application加入或者新的资源加入时都会调用schedule进行调度。为Application分配资源选择Worker(Executor),现在有两种策略:
- 1)尽量打散,即将一个Application尽可能多地分配到不同的节点。默认。
- 2)尽量集中,即一个Application尽量分配到尽可能少的节点。CPU密集型而内存占用比较少的Application适合使用这种策略。
- 对于同一个Application,他在一个Worker上只能拥有一个Executor;这并不代表一个物理节点只能部署一个Executor;可以通过在一个物理节点上部署多个Worker来完成。
6.1.4 Worker根据Master的资源分配结果创建Executor
6.2 Task的执行
Executor会为Task生成一个TaskRunner(继承自Runnable);TaskRunner的run中会执行Task。
TaskRunner最终会被放到一个ThreadPool中去执行。
6.2.1依赖环境的创建和分发
- 在Driver端封装Task时,会将Task依赖的文件封装到Task中。
在Executor端,恢复Task的依赖和Task本身。
从序列化的Task中获得依赖的文件和依赖的位置信息后,Executor会下载Task的运行依赖,完成后才具备执行Task的能力。
接下来,Executor会通过TaskRunner执行任务。
6.2.2任务执行
- runTask在不同的Task会有不同的实现。
- 1)org.apache.spark.scheduler.ResultTask
对于最后一个Stage,会根据生成结果的Partition来生成与Partition数量相同的ResultTask,然后ResultTask会将计算的结果汇报到Driver端。 - 2)org.apache.spark.scheduler.ShuffleMapTask
对于非最后的Stage,会根据每个Stage的Partition数量来生成ShuffleMapTask。ShuffleMapTask会根据下游Task的Partition数量和Shuffle的策略来生成一系列文件。
6.2.3任务结果的处理
- 在Executor运行Task时,得到的计算结果会存入org.apache.spark.scheduler.DirectTaskResult,但是这个serializedDirectResult并不是直接回传到Driver,在将结果回传到Driver时,会根据结果的大小使用不同的策略:
- 1)结果大于1GB,那么会直接丢弃这个结果。可以通过spark.driver.maxResultSize来进行设置。
- 2)对于较大的结果,将其以taskId为key存入org.apache.spark.storage.BlockManager;如果结果不大,则直接回传给Driver。那么如何判定这个阈值?这里的回传是直接通过AKKA的消息传递机制。因此结果首先不能超过该机制设置的消息的最大值。这个最大值是通过spark.akka.framesize设置的,单位是MByte,默认是10MB。
除此之外还有200KB的预留空间,因此阈值是conf.getInt(“spark.akka.framesize”, 10) * 1024 * 1024 - 200 * 1024 - 3)其他情况则直接通过AKKA回传到Driver
6.2.4 Driver端的处理
- 根据Task的State是FINISHED还是FAILED、KILLED或者LOST进行不同处理;如果是ShuffleMapTask,那么他需要将结果通过某种机制告诉下游的Stage,以便其可以作为下游Stage的输入。
6.3 参数设置
6.3.1 spark.executor.memory
- 它配置了Executor可以最多使用的内存的大小。一个集群的内存资源总归是有限的,这些内存会被很多应用所共享。因此设置合理的内存值是非常必要的,过大则会导致部分任务由于分配不到资源而等待,延长应用的执行时间;过小则会产生频繁的垃圾回收和读写外部磁盘。
- Executor的内存是被其内部所有的任务所共享的,而每个Executor上可以支持的任务的数量取决于Executor所持有的CPU Core的数量。因此为了评估一个Executor占用多少内存是合适的,需要了解每个任务的数据规模的大小和计算过程中所需要的临时内存空间的大小(困难)。如果需要比较准确的评估数据集的大小的话,可以将RDD cache到内存中,从BlockManager的日志中可以看到每个Cache分区的大小。
- 如果内存比较紧张,就需要合理规划每个分区任务的数据规模,例如采用更多的分区,用增加任务数量(进而需要更多的批次来运算所有的任务)的方式减少每个任务所需要的处理的数据大小。
6.3.2 日志相关
- 如果spark.eventLog.enabled设置为true(默认为false),那么需要设置日志写入目录spark.eventLog.dir,这样日志就可以保存到本地,方便调试和问题追踪。
但是随着时间推移,节点的日志必须需要一个清除机制,否则日志很容易写满磁盘。
6.3.3 spark.executor.heartbeatInterval
- Executor和Driver之间心跳的间隔,单位是毫秒。心跳主要是Executor向Driver汇报运行状态和Executor上报Task的统计信息。