Spark key-value类型算子

1. partitionBy()按照K重新分区

先看partitionBy函数
 def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }
功能说明

将RDD[K,V]中的K按照指定Partitioner重新进行分区;
如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程

图解
代码演示(按HashPartitioner分区)
	val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
    val mapRdd: RDD[(Int, String)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
    mapRdd.mapPartitionsWithIndex{
      (index,values)=>values.map((index,_))
    }.collect().foreach(println)
结果
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))
自定义分区规则

要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同

// main方法
	val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
    val mapRdd: RDD[(Int, String)] = rdd.partitionBy(new MyPartition(2))
    mapRdd.mapPartitionsWithIndex{
      (index,values)=>values.map((index,_))
    }.collect().foreach(println)
// 主要代码
class MyPartition(num:Int) extends Partitioner{
    override def numPartitions: Int = num
    override def getPartition(key: Any): Int = {
      if(key.isInstanceOf[Int]){
        val i: Int = key.asInstanceOf[Int]
        if(i%2==0){
          0
        }else{
          1
        }
      }else{
        0
      }
    }
  }
结果
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))

2. reduceByKey()按照K聚合V

先看reduceByKey函数
 def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
  
	def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
功能说明

该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数

代码演示
	val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
    val mapRdd: RDD[(String, Int)] = rdd.reduceByKey((v1,v2)=>v1+v2)
    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
结果
(a,6)
(b,7)

3. groupByKey()按照K重新分组

先看groupByKey函数
def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }
功能说明

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。
该操作可以指定分区器或者分区数(默认使用HashPartitioner)

代码演示
 	val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
    val mapRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey(2)
    // 打印修改后的RDD中数据
    mapRdd.mapPartitionsWithIndex{
   
      (index,values)=>values.map((index,_))
    }.collect().foreach(println)
结果
(0,(b,CompactBuffer(5, 2)))
(1,(a,CompactBuffer(1, 5)))

4. aggregateByKey()按照K处理分区内和分区间逻辑

先看aggregateByKey函数
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
   
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
功能说明

(1)zeroValue(初始值):给每一个分区中的每一种key一个初始值;
这个初始值的理解:
如何求最大值,所有的值为正数,设置为0,如果有负值,设置为 负无穷
这个初始值就是与第一个值进行比较,保证第一次对比下去
(2)seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value;
(3)combOp(分区间):函数用于合并每个分区中的结果。

代码演示
	val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val mapRdd: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_,_),_+_)
结果
(b,3)
(a,3)
(c,12)

5. foldByKey()分区内和分区间相同的aggregateByKey()

先看foldByKey函数
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
   
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
功能说明

参数zeroValue:是一个初始化值,它可以是任意类型
参数func:是一个函数,两个输入参数相同

代码演示
 	val list = List(("a",1),("a",1),("a",1),("b",1),("b",1),("b",1),("b",1),("a",1))
    val rdd = sc.makeRDD(list,2)
    rdd.foldByKey(0)(_+_).collect().foreach(println)
结果
(b,4)
(a,4)

6. combineByKey()转换结构后分区内和分区间操作

先看combineByKey函数
def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
   
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

功能说明

(1)createCombiner(转换数据的结构): combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
(2)mergeValue(分区内): 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
(3)mergeCombiners(分区间): 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的 mergeCombiners()方法将各个分区的结果进行合并。

针对相同K,将V合并成一个集合

代码演示(计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
 	val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
    val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
    input.combineByKey(
      (_,1),
      (acc:(Int,Int),v)=>(acc._1+v,acc._2+1),
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
    ).map(item=>item._2._1/item._2._2.toDouble).collect().foreach(println)
结果
95.33333333333333
91.33333333333333
reduceByKey、foldByKey、aggregateByKey、combineByKey对比

reduceByKey的底层源码

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
   
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

第一个初始值不变,也即不用直接给出初始值,分区内和分区间的函数相同

foldByKey的底层源码

combineByKeyWithClassTag[V]((v: V) => 
cleanedFunc(createZero(), v),
cleanedFunc, 
cleanedFunc, 
partitioner)

初始值和分区内和分区间的函数(逻辑)相同

aggregateByKey的底层源码

 combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, 
combOp, 
partitioner)

初始值和分区内处理逻辑一致
分区内和分区间的函数(逻辑)不相同

combineByKey的底层源码

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
   
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

把第个值变成特定的结构

7. sortByKey()排序

先看sortByKey函数
def sortByKey(
ascending: Boolean = true, 
numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
   
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

功能说明

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
ascending: Boolean = true 默认为升序
false为降序

代码演示(
	val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
    val mapRdd: RDD[(Int, String)] = rdd.sortByKey()
    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
(1,dd)
(2,bb)
(3,aa)
(6,cc)

8. mapValues()只对V进行操作

先看mapValues函数
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
   
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map {
    case (k, v) => (k, cleanF(v)) },
      preservesPartitioning = true)
  }

功能说明

针对于(K,V)形式的类型只对V进行操作

代码演示(
	val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
    val mapRdd: RDD[(Int, String)] = rdd.mapValues(_+">>>>")
    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
结果
(1,a>>>>)
(1,d>>>>)
(2,b>>>>)
(3,c>>>>)

9. join()连接

先看join函数
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
   
    join(other, defaultPartitioner(self, other))
  }

功能说明

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

代码演示(
	val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
    val mapRdd: RDD[(Int, (String, Int))] = rdd.join(rdd1)
    // 打印修改后的RDD中数据
     mapRdd.collect().foreach(println)
结果
(1,(a,4))
(2,(b,5))

10. cogroup()联合

先看cogroup函数
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
   
    cogroup(other, defaultPartitioner(self, other))
  }

功能说明

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

代码演示(
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"a"),(1,"b"),(3,"c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1,4),(2,5),(3,6)))
    rdd.cogroup(rdd1).collect().foreach(println)
结果
(1,(CompactBuffer(a, b),CompactBuffer(4)))
(2,(CompactBuffer(),CompactBuffer(5)))
(3,(CompactBuffer(c),CompactBuffer(6)))