第三章 RDD实现详解
3.1 概述
Spark的目标是为基于工作集的应用(即多个并行操作重用中间结果的应用)提供抽象,同时保持MapReduce及相关模型的优势特性。
即自动容错、位置感知性和可伸缩性。
RDD比数据流模型更容易编程,同时基于工作集的计算也具有良好的描述能力。
在这些模型中最难实现的是容错性。
一般来说,分布式数据集的容错性有两种:数据检查点和记录数据的更新。
我们面对的是大规模数据分析,数据检查点操作成本很高:需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源。所以我们选择记录更新的方式,但是,如果更新太多,记录更新的成本也不低。
因此,RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列转换记录下来(即Lineage),以便恢复丢失的分区。
3.2 什么是RDD
RDD是已读的、分区记录的集合。
RDD只能由数据集或者其他已有的RDD创建。
RDD含有如何从其他RDD衍生出本RDD的相关信息(即Lineage)
RDD有5个主要的属性
-
1.一组分片(Partition),即数据集的基本组成单位。
- 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD的时候制定RDD的分片个数,如果没有指定,则采用默认值。
默认值就是程序所分配到的CPU Core的数目。Page18中描述了分区存储的计算模型,每个分配的存储是由BlockManager实现的。每个分区会被逻辑映射成BlockManager的一个Block,这个Block会被一个Task负责计算。
- 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD的时候制定RDD的分片个数,如果没有指定,则采用默认值。
-
2.一个计算每个分区的函数。
- Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
-
3.RDD之间的依赖关系。
- RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而非全部计算。
-
4.一个Partiioner,即RDD的分片函数。
- Spark中实现了两种类型的分片函数,一个是基于Hash的HashPartitioner, 另外一个是基于范围的RangePartitioner。
只有对于key-value的RDD, 才会有Partitioner,非k-v的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
- Spark中实现了两种类型的分片函数,一个是基于Hash的HashPartitioner, 另外一个是基于范围的RangePartitioner。
-
5.一个列表,存储存取每个Partition的优先位置(preferred location)。
- 对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的Block的位置。
按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理的数据块的存储位置。
- 对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的Block的位置。
3.2.1 RDD的创建
-
RDD有两种方式进行创建
- 1.有一个已经存在的Scala集合创建
- 2.由外部存储系统的数据集创建,包括本地的文件系统,还有Hadoop支持的数据集,比如HDFS、Cassandra、HBase、Amazon S3等。
-
RDD支持两种操作:
- 1.转换(transformation):从现有的数据集创建一个新的数据集。
- 2.动作(action):在数据集上进行计算后,返回一个值给Driver程序。
3.2.2 RDD的转换
-
RDD中所有的转换都是惰性的,不会直接计算结果。只是记住这些应用到基础数据集(例如一个文件)上的转换动作。
默认情况下,每一个转换过的RDD都会在他执行一个动作的时候被重新计算,但是使用persist或者cache方法,在内存中持久化一个RDD,下次查询的时候就不需要再次计算而直接访问即可。-
map(func)
- 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成。
-
filter(func)
- 返回一个新的RDD,该数据集由经过func函数计算后返回值为true的输入元素组成
-
flatmap(func)
- 类似map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)
-
mapPartitions(func)
- 类似map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
-
mapPartitionsWithSplit(func)
- 类似于mapPartitions,但func带有一个整数参数表示分片的索引值。因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]
-
sample(withReplacement, fraction, seed)
- 根据fraction指定的比例对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器的种子。
-
union(otherDataset)
- 返回一个新的数据集,新数据集是由源数据集合参数数据集联合而成。
-
distinct([num Tasks])
- 返回一个包含原数据集中所有不重复元素的新数据集。
-
groupByKey([num Tasks])
- 在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集。
注意:默认情况下,只有8个并行任务来做操作,但是可以传入一个可选的numTasks参数来改变它
- 在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集。
-
reduceByKey(func, [num Tasks])
- 在一个(K,V)对的数据集上调用,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。
与groupByKey类似,reduce的任务个数是可以通过第二个可选参数来设置的。
- 在一个(K,V)对的数据集上调用,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。
-
sortByKey([ascending], [numTasks])
- 在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)对数据集。升序或者降序有ascending布尔参数决定。
-
join(otherDataset, [numTasks])
- 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))数据集。
-
cogroup(otherDataset, [numTasks])
- 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,Seq[V], Seq[W])元组的数据集,这个操作也可以成为groupwith。
-
cartesian(otherDataset)
- 笛卡尔积,在类型为T和U类型的数据集上调用,返回一个(T,U)对数据集(两两的元素对)
-
3.2.3 RDD的动作
-
reduce(func)
- 通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的并行执行。
-
collect()
- 在驱动程序中,以数组的形式返回数据及的所有元素。通常在使用filter或者其他操作返回一个足够小的数据子集后再使用会比较有用。
-
count()
- 返回数据集的元素的个数。
-
first()
- 返回数据集的第一个元素(类似于take(1))
-
take(n)
- 返回一个由数据集的前n个元素组成的数组
注意 这个操作并非是并行执行,而是由驱动程序计算所有的元素。
- 返回一个由数据集的前n个元素组成的数组
-
takeSample(withReplacement, num, seed)
- 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
-
saveAsTextFile(path)
- 将数据及的元素以textfile的形式保存在本地文件系统——HDFS或者任何其他Hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行。
-
saveAsSequenceFile(path)
- 将数据及的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是本地系统、HDFS或者任何其他Hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者可以隐式的转换为Writable的RDD(Spark包括了基本类型的转换,例如Int、Double、String等
-
countByKey()
- 对(K,V)类型的RDD有效,返回一个(K,Int)对的map,表示每一个key对应的元素个数。
-
foreach(func)
- 在数据集的每一个元素上,运行函数func进行更新。通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase
3.2.4 RDD的缓存
-
Spark速度快的原因之一,就是在不同操作中在内存中持久化一个数据集。
当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其他动作(action)中重用。-
persist()
-
cache()
- 是persist()的快捷方法
-
-
缓存有可能丢失,或者存储于内存的数据因为内存不足而被删除。
RDD的缓存的容错机制保证了即时缓存丢失也能保证计算的正确执行。
通过基于RDD的一系列的转换,丢失的数据会被重算。
RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可。
3.2.5 RDD的检查点
- checkpoint的必要性:
RDD的缓存能够在第一次计算完成后,将计算结果保存在内存、本地文件系统或Tachyon中。
如果缓存丢失了,需要重复计算,如果计算特别复杂或者计算耗时很久,那么缓存丢失对于整个job的影响是不容忽视的。
为了避免缓存丢失带来的重新计算的开销,Spark引入checkpoint。 - 缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。
- 检查点是在计算完成后,重新建立一个Job来计算。为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。
3.3 RDD的转换和DAG的生成
Spark根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。
- sc.textfile会创建一个HadoopRDD,并通过map转换成MapPartitionsRDD
- map或者flatmap会生成一个MapPartitionsRDD
- reduceByKey会先生成一个MapPartitionsRDD,起到map端combiner的作用,就是现在本Partition之内进行reduce聚合操作;然后会生成一个ShuffledRDD,从上一个RDD的输出读取数据,作为reducer的开端,重新划分Partition并将不同数据放置在不同分区;最后生成一个MapPartitionsRDD,起到reducer端的reduce作用,将重新划分过的Partition进行reduce聚合
3.3.1 RDD的依赖关系
-
窄依赖:
指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用- map
- filter
- union
- 只从已知的特定的Partition进行操作的join
-
宽依赖:
指的是多个子RDD的Partition会依赖同一个parent RDD的Partition- groupBy
- 所有Partition进行操作的join
3.3.2 DAG的生成
-
原始的RDD通过一系列转换就形成了DAG。
RDD之间的依赖关系,包含了RDD由哪些parent RDD转换而来和他依赖parent RDD的哪些Partitions,是DAG的重要属性。 -
借助这些依赖关系,DAG认为这些RDD之间形成了Lineage(血统)。
借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。 -
Spark根据DAG生成计算任务。
- 首先,根据依赖关系的不同将DAG划分为不同的Stage。
对于窄依赖,由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程里完成,窄依赖被Spark划分到同一个Stage中。
对于宽依赖,由于Shuffle的存在,只能在parent RDD Shuffle处理完成之后,才能开始接下来的计算,因此宽依赖就是Spark换粉Stage的依据。
即Spark根据宽依赖将DAG划分成不同的Stage。 - 在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行的,Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的,也就是说,Stage只有在他没有parent Stage或者parent Stage都已经执行完成之后,才可以执行。
- 首先,根据依赖关系的不同将DAG划分为不同的Stage。
3.3.3 Word Count的RDD转换和DAG划分的逻辑视图
- RDD在创建子RDD的时候,会通过Dependency来定义他们之间的关系,通过Dependency,子RDD也可以获得他的parent RDD和parent RDD的Partition。
- 需要强调的是在进行转换操作reduceByKey时会触发Shuffle,在Shuffle之前,有一个本地的聚合过程,比如某一分片含有(e,1) (e,1)的两个元素,Shuffle之前会被聚合成(e, 2)。
- Page32
3.4 RDD的计算
3.4.1 Task简介
-
原始的RDD经过一系列的转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。
计算节点执行计算逻辑的部分称为Executor。 -
Executor在准备好Task的运行环境之后,会通过调用org.apache.spark.scheduler.Task#run来进行计算。
-
Spark的Task分为两种
- 1.org.apache.spark.scheduler.ShuffleMapTask
- 2.org.apache.spark.scheduler.ResultTask
-
简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleTask。
生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务的执行。
3.4.2 Task的执行起点
-
org.apache.spark.scheduler.Task#run会调用ShuffleMapTask或者ResultTask的runTask;
runTask会调用RDD的org.apache.spark.rdd.RDD#iterator。计算由此开始。- iterator中实现这样一个逻辑:
如果存储级别不是None,那么先检查是否有缓存,如果没有则要进行计算;
如果存储级别是None,检查是否有checkpoint,如果有checkpoint,直接读取结果;否则直接进行计算
注意 checkpoint并不只是存储级别为None的时候才会用到的
- iterator中实现这样一个逻辑:
3.4.3 缓存的处理
- 如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。
- 存储级别从用户的角度看就是缓存保存到不同的存储位置,比如内存、硬盘、tachyon;还有缓存的数据是否需要序列化等。详细的内容在第八章。
- RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。
3.4.4 checkpoint的处理
-
在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。
-
首先在job结束之后,会判断是否需要checkpoint。如果需要,就调用org.apache.spark.rdd.RDDCheckpointData#doCheckpoint。
- doCheckpoint首先为数据创建一个目录;
然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;
接着创建一个org.apache.spark.rdd.CheckpointRDD;
最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。 - 数据的写入实现在org.apache.spark.rdd.CheckpointRDD$writeToFile
- 被checkpoint之后的RDD的依赖变为CheckpointRDD
- doCheckpoint首先为数据创建一个目录;
3.4.5 RDD的计算逻辑
- RDD的计算逻辑在org.apache.spark.rdd.RDD#compute实现。
每个特定的RDD都会实现compute。
比如前面提到的CheckpointRDD的compute就是直接读取checkpoint数据。HadoopRDD就是直接读取制定Partition的数据。MapPartitionsRDD就是将用户的转换逻辑作用到指定的Partition上。
3.5 RDD的容错机制
RDD实现了基于Lineage的容错机制。RDD的转换关系,构成了compute chain,可以把这个compute chain认为是RDD之间演化的Lineage。在部分结果丢失时,只需要根据这个Lineage重算即可。
内部实现上,DAG被Spark划分为不同的Stage,Stage之间的依赖关系可以认为就是Lineage。关于DAG的划分可以参考第四章。
提到Lineage的容错机制,不得不提Tachyon。
Yachyon包含两个维度的容错。
一个是Tachyon集群的元数据的容错,它采用了类似HDFS的Name Node的元数据容错机制,即将元数据保存到一个Image文件,并且保存了元数据变化的编辑日志(EditLog)。
另一个是Tachyon保存的数据的容错机制,这个机制类似于RDD的Lineage,Tachyon会保留生成文件数据的Lineage,在数据丢失时会通过这个Lineage来恢复数据。
如果是Spark数据,那么在数据丢失时Tachyon会启动Spark的Job来重算这部分内容。
如果是Hadoop产生的数据,那么重新启动相应的MapReduce Job就可以。