Spark-单value算子总结

1. map算子(改变结构就用map)

先看map函数
 /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
功能说明

参数f是一个函数,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的

代码演示
	// 创建SparkConf
	val conf: SparkConf = new SparkConf()
	// 创建SparkContext,该对象是提交Spark App的入口
 	val sc: SparkContext = new SparkContext(conf)
 	// 参数(数据源,分区数(可选) )
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
    // map操作 元素乘以2
    val mapRdd: RDD[Int] = rdd.map(_*2) 
    mapRdd.collect().foreach(println)
结果:
2 
4 
6 
8
关于分区


图片中的说明:
先把一个数据拿过来以后进行 *2 操作
例如拿1 过来后 *2 = 2 后,1这个数据就离开这块区域
然后进行第二个数据的处理…

注意:map的分区数和RDD的分区数一致(看下面源码)

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
往下走
 override def getPartitions: Array[Partition] = firstParent[T].partitions
再往下走firstParent
/** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }

主要的是:firstParent[T].partitions 这里

2. mapPartitions() 以分区为单位执行Map

先看mapPartitions函数
def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }
功能说明

f: Iterator[T] => Iterator[U]:f函数把每个分区的数据分别放入到迭代器中(批处理)
preservesPartitioning: Boolean = false :是否保留RDD的分区信息
功能:一次处理一个分区数据

代码演示
	// 前面代码省略,直接从数据源开始
	val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
    val mapRdd = rdd.mapPartitions(_.map(_*2))
    mapRdd.collect().foreach(println)
结果:
2
4
6
8
关于分区


分区说明
每一个分区的数据会先到内存空间,然后才进行逻辑操作,整个分区操作完之后,拿到分区的数据才会释放掉。
从性能方面讲:批处理效率高
从内存方面:需要内存空间较大

3. mapPartitionsWithIndex()带分区号

先看mapPartitionsWithIndex函数
def mapPartitionsWithIndex[U: ClassTag](
		// Int表示分区编号
      f: (Int, Iterator[T]) => Iterator[U], 
      preservesPartitioning: Boolean = false): RDD[U]

功能说明

f: (Int, Iterator[T]) => Iterator[U]:f函数把每个分区的数据分别放入到迭代器中(批处理)并且加上分区号
preservesPartitioning: Boolean = false :是否保留RDD的分区信息
功能:比mapPartitions多一个整数参数表示分区号

代码演示
	val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
    val mapRdd = rdd.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
结果:
(0,1)
(0,2)
(1,3)
(1,4)

4. flatMap()扁平化

先看flatMap函数
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

功能说明

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中

代码演示
	val listRDD=sc.makeRDD(List(List(1,2),List(3,4),List(5,6),List(7)), 2)
    val mapRdd: RDD[Int]= listRDD.flatMap(item=>item)

    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
结果:
1
2
3
4
5
6
7

5. glom()分区转换数组

先看glom函数
 def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

功能说明

该操作将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致

代码演示(求两个分区中的最大值)
	val sc: SparkContext = new SparkContext(conf)
    val rdd = sc.makeRDD(1 to 4, 2)
    val mapRdd = rdd.glom().map(_.max)
    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
结果:
2
4

5. groupBy()分组

先看groupBy函数
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy[K](f, defaultPartitioner(this))
  }

功能说明

分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

代码演示 (按照元素模以2的值进行分组)
	val rdd = sc.makeRDD(1 to 4,2)
    val mapRdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
    // 打印修改后的RDD中数据
    mapRdd.collect().foreach(println)
结果:
(0,CompactBuffer(2, 4))
(1,CompactBuffer(1, 3))

6. sample()采样

先看sample函数
def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]

功能说明

withReplacement: true为有放回的抽样,false为无放回的抽样;
fraction表示:以指定的随机种子随机抽样出数量为fraction的数据;
seed表示:指定随机数生成器种子。
两个算法介绍:

 抽取数据不放回(伯努利算法)
 		val sampleRDD: RDD[Int] = dataRDD.sample(false, 0.5)
        伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
        具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
        第一个参数:抽取的数据是否放回,false:不放回
        第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
        第三个参数:随机数种子
 抽取数据放回(泊松算法)
 		val sampleRDD1: RDD[Int] = dataRDD.sample(true, 2)
 		第一个参数:抽取的数据是否放回,true:放回;false:不放回
        第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
        第三个参数:随机数种子
代码演示
	val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
    val mapRdd: RDD[Int] = dataRDD.sample(false, 0.5)
    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
结果:
3
5
6
	val sc: SparkContext = new SparkContext(conf)
    val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
    val mapRdd: RDD[Int] = dataRDD.sample(true, 2)
结果:
1
1
1
1
1
3
3
3
4
5
5
5
5
5
6
6

7. distinct()去重

先看distinct函数
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }

功能说明

对内部的元素去重,distinct后会生成与原RDD分区个数不一致的分区数
上面的函数还可以对去重后的修改分区个数

代码演示
	val sc: SparkContext = new SparkContext(conf)
    val distinctRdd: RDD[Int] = sc.makeRDD(List(1,2,1,5,2,9,6,1))
    distinctRdd.distinct(2).collect().foreach(println)
结果
6
2
1
9
5
distinct()实现的源码
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }
也就是这个玩意:
 map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

8. coalesce()合并分区

先看coalesce函数
def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T]
功能说明

功能说明:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率
默认false不执行shuffle

代码演示
	val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4),4)
    val mapRdd: RDD[Int] = rdd.coalesce(2)
    mapRdd.mapPartitionsWithIndex{
      (index,values)=>values.map((index,_))
    }.collect().foreach(println)
结果
(0,1)
(0,2)
(1,3)
(1,4)

总结:无shuffle

设置2个分区后的结果:
(0,1) (0,2) (1,3) (1,4) 
设置3个分区后的结果:
(0,1) (1,2) (2,3) (2,4) 
设置4个或者以上
(0,1) (1,2) (2,3) (3,4) 

设置shuffle

	val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4),4)
    val mapRdd: RDD[Int] = rdd.coalesce(2,true)
    mapRdd.mapPartitionsWithIndex{
      (index,values)=>values.map((index,_))
    }.collect().foreach(println)
结果
(0,1)
(0,2)
(0,3)
(0,4)
设置true后开启shuffle
 
设置1 ,2后的结果
(0,1) (0,2) (0,3) (0,4) 
设置3后的结果
(0,1) (1,2) (1,3) (2,4) 
设置4后的结果
(3,1) (3,2) (3,3) (3,4) 
....

源码:

for (i <- 0 until maxPartitions) {
  val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
  val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
  (rangeStart until rangeEnd).foreach{ j => groupArr(i).partitions += prev.partitions(j) }
}
解释说明:
    maxPartitions:传进来的新分区数
    prev.partitions:之前RDD的分区数
 
分区i
    开始 = 分区号*前一个分区数 / 新的分区数
    结束 =( 分区号+1)*前一个分区数 / 新的分区数

9. repartition()重新分区(执行Shuffle)

先看repartition函数
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }
功能说明

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

代码演示
	val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
    val mapRdd: RDD[Int] = rdd.repartition(8)
    mapRdd.mapPartitionsWithIndex{
      (index,values) =>values.map((index,_))
    }.collect().foreach(println)
结果
(6,1)
(6,3)
(6,5)
(6,7)
(7,2)
(7,4)
(7,6)
(7,8)
coalesce和repartition对比与区别
  1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
  2. repartition实际上是调用的coalesce,进行shuffle。源码如下:
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
    }
  3. coalesce一般为缩减分区,如果扩大分区,也不会增加分区总数,意义不大。
  4. repartition扩大分区执行shuffle,可以达到扩大分区的效果。

10. sortBy()排序

先看sortBy函数
def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
功能说明

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。

代码演示
	val rdd: RDD[Int] = sc.makeRDD(Array(1,2,5,3,6,2))
    val mapRdd = rdd.sortBy(item=>item) // 默认为true为正序,false为倒序
    // 打印修改后的RDD中数据
    mapRdd.collect().foreach(println)
结果
1
2
2
3
5
6
	val rdd: RDD[(Int,Int)] = sc.makeRDD(Array((5,2),(5,3),(6,2)))
    val mapRdd = rdd.sortBy(item=>item) // 默认为true为正序,false为倒序
    // 打印修改后的RDD中数据
    mapRdd.collect().foreach(println)
结果
(5,2)
(5,3)
(6,2)

单value看完了,点击下面看key-value类型的算子
Spark key-value类型算子总结(图解和源码)