第三章 RDD实现详解

3.1 概述

Spark的目标是为基于工作集的应用(即多个并行操作重用中间结果的应用)提供抽象,同时保持MapReduce及相关模型的优势特性。

即自动容错、位置感知性和可伸缩性。
RDD比数据流模型更容易编程,同时基于工作集的计算也具有良好的描述能力。

在这些模型中最难实现的是容错性。

一般来说,分布式数据集的容错性有两种:数据检查点和记录数据的更新。

我们面对的是大规模数据分析,数据检查点操作成本很高:需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源。所以我们选择记录更新的方式,但是,如果更新太多,记录更新的成本也不低。

因此,RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列转换记录下来(即Lineage),以便恢复丢失的分区。

3.2 什么是RDD

RDD是已读的、分区记录的集合。

RDD只能由数据集或者其他已有的RDD创建。
RDD含有如何从其他RDD衍生出本RDD的相关信息(即Lineage)

RDD有5个主要的属性

  • 1.一组分片(Partition),即数据集的基本组成单位。

    • 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD的时候制定RDD的分片个数,如果没有指定,则采用默认值。
      默认值就是程序所分配到的CPU Core的数目。Page18中描述了分区存储的计算模型,每个分配的存储是由BlockManager实现的。每个分区会被逻辑映射成BlockManager的一个Block,这个Block会被一个Task负责计算。
  • 2.一个计算每个分区的函数。

    • Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
  • 3.RDD之间的依赖关系。

    • RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而非全部计算。
  • 4.一个Partiioner,即RDD的分片函数。

    • Spark中实现了两种类型的分片函数,一个是基于Hash的HashPartitioner, 另外一个是基于范围的RangePartitioner。
      只有对于key-value的RDD, 才会有Partitioner,非k-v的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  • 5.一个列表,存储存取每个Partition的优先位置(preferred location)。

    • 对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的Block的位置。
      按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理的数据块的存储位置。

3.2.1 RDD的创建

  • RDD有两种方式进行创建

    • 1.有一个已经存在的Scala集合创建
    • 2.由外部存储系统的数据集创建,包括本地的文件系统,还有Hadoop支持的数据集,比如HDFS、Cassandra、HBase、Amazon S3等。
  • RDD支持两种操作:

    • 1.转换(transformation):从现有的数据集创建一个新的数据集。
    • 2.动作(action):在数据集上进行计算后,返回一个值给Driver程序。

3.2.2 RDD的转换

  • RDD中所有的转换都是惰性的,不会直接计算结果。只是记住这些应用到基础数据集(例如一个文件)上的转换动作。
    默认情况下,每一个转换过的RDD都会在他执行一个动作的时候被重新计算,但是使用persist或者cache方法,在内存中持久化一个RDD,下次查询的时候就不需要再次计算而直接访问即可。

    • map(func)

      • 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成。
    • filter(func)

      • 返回一个新的RDD,该数据集由经过func函数计算后返回值为true的输入元素组成
    • flatmap(func)

      • 类似map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)
    • mapPartitions(func)

      • 类似map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    • mapPartitionsWithSplit(func)

      • 类似于mapPartitions,但func带有一个整数参数表示分片的索引值。因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]
    • sample(withReplacement, fraction, seed)

      • 根据fraction指定的比例对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器的种子。
    • union(otherDataset)

      • 返回一个新的数据集,新数据集是由源数据集合参数数据集联合而成。
    • distinct([num Tasks])

      • 返回一个包含原数据集中所有不重复元素的新数据集。
    • groupByKey([num Tasks])

      • 在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集。
        注意:默认情况下,只有8个并行任务来做操作,但是可以传入一个可选的numTasks参数来改变它
    • reduceByKey(func, [num Tasks])

      • 在一个(K,V)对的数据集上调用,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。
        与groupByKey类似,reduce的任务个数是可以通过第二个可选参数来设置的。
    • sortByKey([ascending], [numTasks])

      • 在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)对数据集。升序或者降序有ascending布尔参数决定。
    • join(otherDataset, [numTasks])

      • 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))数据集。
    • cogroup(otherDataset, [numTasks])

      • 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,Seq[V], Seq[W])元组的数据集,这个操作也可以成为groupwith。
    • cartesian(otherDataset)

      • 笛卡尔积,在类型为T和U类型的数据集上调用,返回一个(T,U)对数据集(两两的元素对)

3.2.3 RDD的动作

  • reduce(func)

    • 通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的并行执行。
  • collect()

    • 在驱动程序中,以数组的形式返回数据及的所有元素。通常在使用filter或者其他操作返回一个足够小的数据子集后再使用会比较有用。
  • count()

    • 返回数据集的元素的个数。
  • first()

    • 返回数据集的第一个元素(类似于take(1))
  • take(n)

    • 返回一个由数据集的前n个元素组成的数组
      注意 这个操作并非是并行执行,而是由驱动程序计算所有的元素。
  • takeSample(withReplacement, num, seed)

    • 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
  • saveAsTextFile(path)

    • 将数据及的元素以textfile的形式保存在本地文件系统——HDFS或者任何其他Hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行。
  • saveAsSequenceFile(path)

    • 将数据及的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是本地系统、HDFS或者任何其他Hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者可以隐式的转换为Writable的RDD(Spark包括了基本类型的转换,例如Int、Double、String等
  • countByKey()

    • 对(K,V)类型的RDD有效,返回一个(K,Int)对的map,表示每一个key对应的元素个数。
  • foreach(func)

    • 在数据集的每一个元素上,运行函数func进行更新。通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase

3.2.4 RDD的缓存

  • Spark速度快的原因之一,就是在不同操作中在内存中持久化一个数据集。
    当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其他动作(action)中重用。

    • persist()

    • cache()

      • 是persist()的快捷方法
  • 缓存有可能丢失,或者存储于内存的数据因为内存不足而被删除。
    RDD的缓存的容错机制保证了即时缓存丢失也能保证计算的正确执行。
    通过基于RDD的一系列的转换,丢失的数据会被重算。
    RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可。

3.2.5 RDD的检查点

  • checkpoint的必要性:
    RDD的缓存能够在第一次计算完成后,将计算结果保存在内存、本地文件系统或Tachyon中。
    如果缓存丢失了,需要重复计算,如果计算特别复杂或者计算耗时很久,那么缓存丢失对于整个job的影响是不容忽视的。
    为了避免缓存丢失带来的重新计算的开销,Spark引入checkpoint。
  • 缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。
  • 检查点是在计算完成后,重新建立一个Job来计算。为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。

3.3 RDD的转换和DAG的生成

Spark根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。

  • sc.textfile会创建一个HadoopRDD,并通过map转换成MapPartitionsRDD
  • map或者flatmap会生成一个MapPartitionsRDD
  • reduceByKey会先生成一个MapPartitionsRDD,起到map端combiner的作用,就是现在本Partition之内进行reduce聚合操作;然后会生成一个ShuffledRDD,从上一个RDD的输出读取数据,作为reducer的开端,重新划分Partition并将不同数据放置在不同分区;最后生成一个MapPartitionsRDD,起到reducer端的reduce作用,将重新划分过的Partition进行reduce聚合

3.3.1 RDD的依赖关系

  • 窄依赖:
    指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用

    • map
    • filter
    • union
    • 只从已知的特定的Partition进行操作的join
  • 宽依赖:
    指的是多个子RDD的Partition会依赖同一个parent RDD的Partition

    • groupBy
    • 所有Partition进行操作的join

3.3.2 DAG的生成

  • 原始的RDD通过一系列转换就形成了DAG。
    RDD之间的依赖关系,包含了RDD由哪些parent RDD转换而来和他依赖parent RDD的哪些Partitions,是DAG的重要属性。

  • 借助这些依赖关系,DAG认为这些RDD之间形成了Lineage(血统)。
    借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。

  • Spark根据DAG生成计算任务。

    • 首先,根据依赖关系的不同将DAG划分为不同的Stage。
      对于窄依赖,由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程里完成,窄依赖被Spark划分到同一个Stage中。
      对于宽依赖,由于Shuffle的存在,只能在parent RDD Shuffle处理完成之后,才能开始接下来的计算,因此宽依赖就是Spark换粉Stage的依据。
      即Spark根据宽依赖将DAG划分成不同的Stage。
    • 在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行的,Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的,也就是说,Stage只有在他没有parent Stage或者parent Stage都已经执行完成之后,才可以执行。

3.3.3 Word Count的RDD转换和DAG划分的逻辑视图

  • RDD在创建子RDD的时候,会通过Dependency来定义他们之间的关系,通过Dependency,子RDD也可以获得他的parent RDD和parent RDD的Partition。
  • 需要强调的是在进行转换操作reduceByKey时会触发Shuffle,在Shuffle之前,有一个本地的聚合过程,比如某一分片含有(e,1) (e,1)的两个元素,Shuffle之前会被聚合成(e, 2)。
  • Page32

3.4 RDD的计算

3.4.1 Task简介

  • 原始的RDD经过一系列的转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。
    计算节点执行计算逻辑的部分称为Executor。

  • Executor在准备好Task的运行环境之后,会通过调用org.apache.spark.scheduler.Task#run来进行计算。

  • Spark的Task分为两种

    • 1.org.apache.spark.scheduler.ShuffleMapTask
    • 2.org.apache.spark.scheduler.ResultTask
  • 简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleTask。
    生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务的执行。

3.4.2 Task的执行起点

  • org.apache.spark.scheduler.Task#run会调用ShuffleMapTask或者ResultTask的runTask;
    runTask会调用RDD的org.apache.spark.rdd.RDD#iterator。计算由此开始。

    • iterator中实现这样一个逻辑:
      如果存储级别不是None,那么先检查是否有缓存,如果没有则要进行计算;
      如果存储级别是None,检查是否有checkpoint,如果有checkpoint,直接读取结果;否则直接进行计算
      注意 checkpoint并不只是存储级别为None的时候才会用到的

3.4.3 缓存的处理

  • 如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。
  • 存储级别从用户的角度看就是缓存保存到不同的存储位置,比如内存、硬盘、tachyon;还有缓存的数据是否需要序列化等。详细的内容在第八章。
  • RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。

3.4.4 checkpoint的处理

  • 在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。

  • 首先在job结束之后,会判断是否需要checkpoint。如果需要,就调用org.apache.spark.rdd.RDDCheckpointData#doCheckpoint。

    • doCheckpoint首先为数据创建一个目录;
      然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;
      接着创建一个org.apache.spark.rdd.CheckpointRDD;
      最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。
    • 数据的写入实现在org.apache.spark.rdd.CheckpointRDD$writeToFile
    • 被checkpoint之后的RDD的依赖变为CheckpointRDD

3.4.5 RDD的计算逻辑

  • RDD的计算逻辑在org.apache.spark.rdd.RDD#compute实现。
    每个特定的RDD都会实现compute。
    比如前面提到的CheckpointRDD的compute就是直接读取checkpoint数据。HadoopRDD就是直接读取制定Partition的数据。MapPartitionsRDD就是将用户的转换逻辑作用到指定的Partition上。

3.5 RDD的容错机制

RDD实现了基于Lineage的容错机制。RDD的转换关系,构成了compute chain,可以把这个compute chain认为是RDD之间演化的Lineage。在部分结果丢失时,只需要根据这个Lineage重算即可。

内部实现上,DAG被Spark划分为不同的Stage,Stage之间的依赖关系可以认为就是Lineage。关于DAG的划分可以参考第四章。

提到Lineage的容错机制,不得不提Tachyon。

Yachyon包含两个维度的容错。
一个是Tachyon集群的元数据的容错,它采用了类似HDFS的Name Node的元数据容错机制,即将元数据保存到一个Image文件,并且保存了元数据变化的编辑日志(EditLog)。
另一个是Tachyon保存的数据的容错机制,这个机制类似于RDD的Lineage,Tachyon会保留生成文件数据的Lineage,在数据丢失时会通过这个Lineage来恢复数据。
如果是Spark数据,那么在数据丢失时Tachyon会启动Spark的Job来重算这部分内容。
如果是Hadoop产生的数据,那么重新启动相应的MapReduce Job就可以。

Tachyon还处于开发阶段,但并不影响Spark使用Tachyon。如果Spark保存到Tachyon的部分数据丢失,Spark会根据自有的容错机制来重算这部分数据。

3.6 小结

RDD是Spark最基本,也是最根本的数据抽象。

RDD是只读的、分区记录的集合。

RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称为转换,如map、filter、groupByKey、join。

RDD不需要物化。RDD含有如何从其他RDD衍生出本RDD的相关信息(即Lineage),据此RDD部分分区数据丢失时可以通过物理存储的数据计算出相应的RDD分区。