Shuffle模块详解

Shuffle翻译成中文就是洗牌,之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上,并且由不同节点的计算单元处理。

一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。

除非非常复杂的计算逻辑,否则为了容错而持久化中间的数据是没有太大收益的,毕竟中间某个过程出错了可以从头开始计算。
但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果。

接下来会分析每个ShuffleMapTask结束时,数据是如何持久化(即Shuffle Write)以使得下游的Task可以获取到其需要处理的数据的(即Shuffle Read)。

注意Spark 0.8后,Shuffle Write会将数据持久化到硬盘,虽然之后Shuffle Write不断进行演进优化,但是数据落地到本地文件系统的实现并没有改变。

7.1 Hash Based Shuffle Write

在Spark1.0以前,Spark只支持Hash Based Shuffle。因为在很多运算场景中并不需要排序,因此多余的排序只能使性能变差,比如Hadoop的MapReduce就是这么实现的,也就是Reducer拿到的数据都是已经排好序的。

实际上Spark的实现很简单:每个ShuffleMapTask根据Key的哈希值,计算出每个Key需要写入的Partition,然后将数据单独写入一个文件,这个Partition实际上就对应了下游的一个ShuffleMapTask或者ResultTask。因此下游的Task在计算时会通过网络(如果该Task与上游的ShuffleMapTask运行在同一个节点,那么此时就是一个本地的硬盘读写)读取这个文件并进行计算。

7,1,1 Basic Shuffle Writter实现解析

  • 在Executor上执行ShuffleMapTask时,最终会调用org.apache.spark.scheduler.ShuffleMapTask的runTask,核心逻辑比较简单,总结如下:
  • 1)从SparkEnv中获得shuffleManager,就如前面提到的,Spark除了支持Hash和Sort Based Shuffle外,还支持external的Shuffle Service,用户可以通过实现几个类就可以使用自定义的Shuffle。
  • 2)从manager里获取Writer。
  • 3)调用rdd开始运算,运算结果通过Writer进行持久化。

7.1.2 存在的问题

  • 由于每个ShuffleMapTask需要为每个下游的Task创建一个单独的文件,因此文件的数量就是number(shuffle_map_task) * number(following_task)。如果ShuffleMapTask是1000,下游的Task是500,那么理论上会产生500 000个文件(对于size为0的文件Spark有特殊的处理)。生产环境中Task的数量是机会更多,因此有以下问题:
  • 1)每个节点可能会同时打开多个文件,每次打开文件都会占用一定内存。
  • 2)从整体的角度,打开多个文件对于系统来说意味着随机读,尤其是每个文件比较小但是数量非常多的情况。如果集群依赖的是固态硬盘,也许情况会改善很多,但是随机写的性能肯定不如顺序写的。

7.1.3 Shuffle Consolidate Writer

  • 为了解决Shuffle产生文件过多的问题,Spark0.8.1中加入了Shuffle Consolidate Files机制。它的主要目标是减少Shuffle过程产生的文件。若使用这个功能,则需要将spark.shuffle.consolidateFiles设置为true。
  • 对于运行在一个Core 的ShuffleMapTask,第一个ShuffleMapTask会创建一个文件,之后的就会将数据追加到这个文件上而不是新建一个文件。因此文件的数量就变成了number(cores) * number(following_task)。
    当然如果每个Core都只运行一个ShuffleMapTask,那么就和原来一样,但是ShuffleMapTask明显多于Core数量或者说每个Core都会运行多个ShuffleMapTask,所以这个实现能够显著减少文件的数量。

7.1.4 小结

  • Shuffle Consolidate机制虽然在某些场景下缓解了Shuffle过程产生文件过多的问题,但是还没有彻底解决7.1.2中的,同时打开多个文件造成Writer Handler内存使用过大的问题和产生多个文件造成的随机读从而影响性能的问题。
    实际上,通过上面的分析,要解决Basic Shuffle Write的问题还是比较困难的。毕竟对于一个已经在很多生产环境中使用的平台推倒重来要冒很大的风险。
    Spark采用了一个很聪明的做法,即建立另外一套Shuffle机制,让用户自己选择。Spark1.0建立了Shuffle Pluggable的框架,通过实现一个接口来很容易的实现第三方的external Shuffle Service。而Spark1.1实现了Sort Based Shuffle;Spark1.2里,Sort Based Shuffle取代Hash Based Shuffle成为Shuffle 的默认选项。

7.2 Shuffle Pluggable框架

首先介绍一下需要实现的接口,如果要实现新的Shuffle机制,那么需要实现这些接口

7.2.1 org.apache.spark.shuffle.ShuffleManager

  • Driver和每个Executor都会持有一个ShuffleManage,这个ShuffleManager可以通过配置项spark.shuffle.manager指定,并且由SparkEnv创建。Driver中的ShuffleManager负责注册Shuffle的元数据,比如shuffleId、MapTask的数量等。
    Executor中的ShuffleManager则负责读和写Shuffle的数据。

7.2.2 org.apache.spark.shuffle.ShuffleWriter

  • ShuffleMapTask通过ShuffleWriter将Shuffle数据写入本地。这个Writer主要通过ShuffleBlockManager来写入数据,因此它的功能是比较轻量级的。

7.2.3 org.apache.spark.shuffle.ShuffleBlockManager

  • 主要使用从本地读取Shuffle数据的功能。

7.2.4 org.apache.spark.shuffle.ShuffleReader

  • ShuffleReader实现了下游Task如何读取上游ShuffleMapTask的Shuffle输出逻辑。

7.2.5 如何开发自己的Shuffle机制

  • 通过Hash Based Shuffle和Sort Based Shuffle,可以得出使用Spark Pluggable框架开发一个第三方的Shuffle Service是比较容易的;这个容易是指功能实现方面。但是这个实现必须要考虑超大规模数据场景下的性能问题以及资源消耗问题。

7.3 Sort Based Write

Spark1.2.0中,Spark Core的一个重要的升级就是讲默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager从Hash换成了Sort。

那么Sort取代Hash的原因是什么呢?

正如前面提到的,Hash Based Shuffle的每个Mapper都要为每个Reducer写一个文件,共Reducer读取,即需要产生M*R个文件,如果Mapper和Reducer数量大,则产生的文件会非常多。

Hash Based Shuffle设计的目标之一就是避免不需要的排序(Hadoop MapReduce被人诟病的地方,很多不需要Sort的地方Sort导致了不必要的开销)。但是它在处理超大规模数据集的时候,产生了大量的Disk IO和内存的消耗,这无疑很影响性能。Hash Based Shuffle也在不断的优化中,正如前面讲到的spark引入的File Consolidate在一定程度上解决了这个问题,为了更好的解决这个问题,spark1.1引入了Sort Based Shuffle。

首先,每个ShuffleMapTask不会为每个Reducer生成一个单独的文件;相反,他会将所有的结果写到一个文件里,同时会生成一个Index文件,Reducer可以通过这个Index文件取得他所需要处理的数据。避免产生大量文件的直接受益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件给系统带来的压力。

实现详解:

ShuffleMapTask会按照key相对应的Partition ID进行Sort,其中属于同一个Partition 的key不会Sort。因为对于不需要Sort的操作来说,这个Sort是负收益的;要知道之前Spark刚开始使用Hash Based 的Shuffle而不是Sort Based就是为了避免Hadoop MapReduce对于所有计算都会Sort的性能损耗。对于那些需要Sort的运算,比如sortByKey,这个Sort在Spark1.2.0里还是由Reducer完成的。

如果这个过程内存不够用了,那么这些已经排序的内容会被写入到外部存储,然后在结束的时候将这些不同的文件进行归并排序。

为了便于下游的Task获取到其所需要的Partition,这里会生成一个Index文件,去记录不同的Partition的位置信息。

7.4 Shuffle Map Task运算结果的处理

ShuffleMapTask运算结果的处理分为两个部分,一个是在Executor端直接处理Task结果的;另一个是Driver端在接到Task运行结束的消息时对Shuffle Write的结果进行处理,从而在调度下游的Task时,使其可以得到需要的数据。

7.4.1 Executor端的处理

  • 得到的结果会存在org.apache.spark.scheduler.DirectTaskResult
  • 结果的大小会影响直接返回Driver还是存在BlockManager还是丢弃。

7.4.2 Driver端的处理

  • Task的不同State会有不同的处理,例如FINISHED、LOST等等

7.5 Shuffle Read

回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出;而下边界,要么是需要写入到本地文件系统(需要Shuffle),以供child Stage读取,要么是最后一个Stage,需要输出结果。

这里的Stage,在运行时就是可以以流水线的方式运行的一组Task,除了最后一个Stage对应的是ResultTask,其余的Stage都是ShuffleMapTask。

而除了需要从外部存储读取数据和RDD已经做过cache或者checkpoint的Task,一般Task都是从ShuffledRDD的Shuffle Read开始的。

7.5.1 整体流程

  • Shuffle Read的整体架构。org.apache.spark.rdd.ShuffledRDD#compute开始,通过调用org.apache.spark.shuffle.ShuffleManager的getReader方法,获取到org.apache.spark.shuffle.ShuffleReader,然后调用其read方法进行读取。

7.5.2 数据读取策略的划分

  • org.apache.spark.storage.ShuffleBlockFetcherIterator会通过splitLocalRemoteBlocks划分数据的读取策略:如果数据在本地,那么可以直接从BlockManager中获取;如果需要从其他节点上获取,则需要通过网络。由于Shuffle的数据量可能很大,因此这里的网络读取分为以下几种策略:
  • 1)每次最多启动5个线程到最多5个节点上读取数据。
  • 2)每次请求的数据大小不会超过spark.reducer.maxMbInFlight(默认48MB)的五分之一。
  • 这样的原因有以下几点:
  • 1)避免占用目标机器过多带宽,在千兆网卡为主流的今天,带宽还是比较重要的。
  • 2)请求数据可以平行化,这样请求数据的时间可以大大减少。请求数据的总时间就是请求中耗时最长的,这样可以缓解一个节点出现网络拥塞时的影响。

7.5.3 本地读取

  • fetchLocalBlock()负责本地Block的获取。

7.5.4 远程读取

  • 现在支持两种远程读取的方式,一种是netty,一种是nio,可以通过spark.shuffle.blockTransferService来进行设置。

7.6 性能调优

7.6.1 spark.shuffle.manager

  • 如果需要Hash Based Shuffle,只需将spark.shuffle.manager设置成 ’ hash’ 即可。
    如果对于性能有比较苛刻的要求,那么就要理解这两种不同的Shuffle机制的原理,结合具体的应用场景进行选择。
  • Hash Based Shuffle,就是根据Hash的结果,将各个Reducer Partition的数据写到单独的文件中去,写数据时不会有排序的操作。如果Reducer的Partition比较多,会产生大量的磁盘文件,会带来大量内存消耗和随机读问题。
  • Sort Based Shuffle会根据实际情况对数据采用不同的方式进行Sort。这个排序可能仅仅是按照Reducer的Partition进行排序,保证一个ShuffleMapTask对应的不同的Reducer Partition的数据都可以写到一个数据文件,通过一个offset来标记不同Reducer Partition的分界。因此ShuffleMapTask仅仅会生成一个数据文件(还有一个Index索引文件),从而避免了HashBasedShuffle文件数量过多的问题。
  • 选择Hash还是Sort,取决于内存、排序和文件操作等因素的综合影响。
  • 对于不需要进行排序且Shuffle产生的文件不是特别多时,选择Hash;因为Sort Based Shuffle会按照Reducer的Partition进行排序。
  • 而Sort Based Shuffle的优势就在于可扩展性,它的出现实际上很大程度上是解决了Hash Based Shuffle的可扩展性的问题。由于Sort Based Shuffle还在演进中,因此它的性能可能会得到不断的改善。
  • 默认的Sort可以满足大部分场景的需要。

7.6.2 spark.shuffle.spill

  • 这个参数的默认值是true,用于指定Shuffle的过程中如果内存中的数据超过阈值时是否需要将部分数据写入外部存储。如果设置为false,那么这个过程就会一直使用内存,会有内存溢出的风险,因此只有在确定内存足够使用是,才设置成false。

7.6.3 spark.shuffle.memoryFraction和spark.shuffle.safetyFraction

  • 在启用了spark.shuffle.spill的情况下,memeoryFraction决定了当Shuffle过程中使用的内存达到总内存比例的多少开始spill。spark1.2.0中是0.2。通过这个参数可以设置Shuffle占用内存的大小,他直接影响了写入到外部存储的频率和垃圾回收的频率。
  • spark.shuffle.safetyFraction作为一个保险系数降低实际Shuffle过程所需要的内存值,可以降低实际内存超过用户配置值的风险。

7.6.4 spark.shuffle.sort.bypassMergeThreshold

  • 这个配置的默认值是200,用于设置Reducer的Partition的数目少于多少的时候,Sort Based Shuffle内部不使用归并排序的方式处理数据,而是直接将每个Partition写入单独的文件。这个方式和Hash Based的方式类似,区别就是在最后这些文件还是会合并成一个单独的文件,并通过一个Index索引文件来标记不同Partition的位置信息。从Reducer来看,数据文件和索引文件的格式和内部是否做过归并排序是完全相同的。
  • 这个可以看做Sort Based Shuffle在Shuffle量较小的饿时候对于Hash Based Shuffle的一种折中。当然了他和Hash Based Shuffle一样,也存在打开文件过多导致内存占用增加的问题。

7.6.5 spark.shuffle.blockTransferService

  • 在Spark1.2.0中默认为netty,之前的版本为nio。
    它主要是用于在各个Executor之间传输Shuffle数据。netty的实现更加简洁,但实际用户并不关心。

7.6.6 spark.shuffle.consolidateFiles

  • 这个配置的默认值是false。主要是为了解决在Hash Based Shuffle过程中产生过多文件的问题。如果为true,那么对于同一个Core上运行的Shuffle Map Task不会产生一个新的Shuffle文件而是重用原来的。但是每个ShuffleMapTask还是需要产生下游Task数量的文件,因此并没有减少同时打开文件的数量。

7.6.7 spark.shuffle.compress 和 spark.shuffle.spill.compress

  • 这两个参数默认都是true,都是用来设置Shuffle过程中是否对Shuffle数据进行压缩,其中前者针对最终写入文件系统的输出文件;后者针对在处理过程中需要写入到外部存储的中间数据,即针对最终的Shuffle输出文件。

  • 1.设置spark.shuffle.compress

    • 如果下游的Task通过网络获取上游的ShuffleMapTask的结果的网络IO成为瓶颈,那么就需要考虑设置为true:通过压缩来减少网络IO。缺点是增加了上游的压缩时间和下游的解压时间。
      如果网络时瓶颈,比如集群普遍使用的千兆网卡,设置true;如果计算是CPU密集型的,那么设置false更好。
  • 2.设置spark.shuffle.spill.comoress

    • 如果设置成true,代表处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回进行merge的时候,要进行解压。同样增加了压缩和解压的时间。
      在Disk IO成为瓶颈的时候,设置为true;如果硬盘是SSD,设置为false

7.6.8 spark.reducer.maxMbInFlight

  • 这个参数用于限制一个Reducer Task向其他的Executor请求Shuffle数据时所占用的最大内存数,尤其是如果网卡是千兆或者以下时。默认值是48M。

7.7 小结

在Spark0.6和0.7时,Shuffle的结果都需要先存储到内存中(有可能要写入磁盘),因此在大数据量的情况下,发生GC和OOM(out of memory,内存用尽)的概率非常大。因此Spark0.8的时候,Shuffle 的每个结果都会直接写入磁盘,并且为下游的每个Task都生成一个单独的文件。这样解决了Shuffle的结果都需要存入内存的问题,但是又引入了另外一个问题:生成的小文件过多,尤其在每个文件的数据量不大而文件特别多的时候,大量的随机读会严重影响性能。Spark0.8.1为了解决0.8中的问题,引入了File Consolidation机制,这在一定程度上解决了这个问题。

由此可见,HaseBasedShuffle在可扩展性方面的确有局限性。而Spark1.0中引入的Shuffle Pluggable框架,为加入新的Shuffle机制和引入第三方的Shuffle机制奠定了基础。

在Spark1.1的时候,引入的Sort Based Shuffle;并且在Spark1.2.0的时候,Sort Based Shuffle成为Shuffle的默认选项。

但是,随着内存成本不断下降和容量的不断上升,Spark Core会在未来重新将Shuffle过程在内存中进行吗?我认为不太可能也没有必要,如果用户对于性能有比较苛刻的要求而Shuffle的过程的确是性能优化的重点时,可以尝试以下实现方式:

1)Worker的节点采用固态硬盘

2)Worker的Shuffle结果保存到RAM Disk上

3)根据自己的应用场景,实现自己的Shuffle机制。