Transformation算子
基本的初始化
java

static SparkConf conf = null;
    static JavaSparkContext sc = null;
    static {
         conf = new SparkConf();
         conf.setMaster("local").setAppName("TestTransformation");
         sc = new JavaSparkContext(conf);
    }

scala

private val conf: SparkConf = new SparkConf().setAppName("TestTransformation").setMaster("local")
  private val sparkContext = new SparkContext(conf)

一、map、flatMap、mapParations、mapPartitionsWithIndex
1.1 map
使用Java8编写

public static void map(){
        String[] names = {"张无忌","赵敏","周芷若"};
        List<String> list = Arrays.asList(names);
        JavaRDD<String> listRDD = sc.parallelize(list);

        JavaRDD<String> nameRDD = listRDD.map(name -> {
            return "Hello " + name;
        });

        nameRDD.foreach(name -> System.out.println(name));

    }

使用scala进行编写

def map(): Unit ={
    val list = List("张无忌", "赵敏", "周芷若")
    val listRDD = sc.parallelize(list)
    val nameRDD = listRDD.map(name => "Hello " + name)
    nameRDD.foreach(name => println(name))
  }

(4) 运行结果
图片说明

(5) 总结

可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。

1.2 flatMap
使用Java8进行编写

public static void flatMap(){
        List<String> list = Arrays.asList("张无忌 赵敏","宋青书 周芷若");
        JavaRDD<String> listRDD = sc.parallelize(list);

        JavaRDD<String> nameRDD = listRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .map(name -> "Hello " + name);

        nameRDD.foreach(name -> System.out.println(name));
    }

使用scala进行编写

def flatMap(): Unit ={
    val list = List("张无忌 赵敏","宋青书 周芷若")
    val listRDD = sc.parallelize(list)

    val nameRDD = listRDD.flatMap(line => line.split(" ")).map(name => "Hello " + name)
    nameRDD.foreach(name => println(name))
  }

(4) 运行结果
图片说明
(5) 总结
flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。

map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。

1.3 mapPartitions
使用Java8进行编写

public static void mapParations(){
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> listRDD = sc.parallelize(list, 2);

        listRDD.mapPartitions(iterator -> {
            ArrayList<String> array = new ArrayList<>();
            while (iterator.hasNext()){
                array.add("hello " + iterator.next());
            }
            return array.iterator();
        }).foreach(name -> System.out.println(name));
    }

使用scala进行编写

  def mapParations(): Unit ={
    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list,2)

    listRDD.mapPartitions(iterator => {
      val newList: ListBuffer[String] = ListBuffer()
      while (iterator.hasNext){
        newList.append("hello " + iterator.next())
      }
      newList.toIterator
    }).foreach(name => println(name))
  }

(4) 运行结果

图片说明

1.4 mapPartitionsWithIndex
每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是啥?

使用Java8编写

public static void mapPartitionsWithIndex() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        JavaRDD<Integer> listRDD = sc.parallelize(list, 2);
        listRDD.mapPartitionsWithIndex((index,iterator) -> {
            ArrayList<String> list1 = new ArrayList<>();
            while (iterator.hasNext()){
                list1.add(index+"_"+iterator.next());
            }
            return list1.iterator();
        },true)
                .foreach(str -> System.out.println(str));
    }

使用scala编写

def mapPartitionsWithIndex(): Unit ={
    val list = List(1,2,3,4,5,6,7,8)
    sc.parallelize(list).mapPartitionsWithIndex((index,iterator) => {
      val listBuffer:ListBuffer[String] = new ListBuffer
      while (iterator.hasNext){
        listBuffer.append(index+"_"+iterator.next())
      }
      listBuffer.iterator
    },true)
      .foreach(println(_))
  }

(4)运行结果

图片说明

二、reduce、reduceByKey
2.1 reduce
reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。
使用Java8编写

public static void reduce(){
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> listRDD = sc.parallelize(list);

        Integer result = listRDD.reduce((x, y) -> x + y);
        System.out.println(result);
    }

使用scala编写

 def reduce(): Unit ={
    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list)

    val result = listRDD.reduce((x,y) => x+y)
    println(result)
  }

(4)运行结果
图片说明
2.2 reduceByKey

reduceByKey仅将RDD中所有K,V对中K值相同的V进行合并。
使用Java8编写

public static void reduceByKey(){
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<String, Integer>("武当", 99),
                new Tuple2<String, Integer>("少林", 97),
                new Tuple2<String, Integer>("武当", 89),
                new Tuple2<String, Integer>("少林", 77)
        );
        JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);

        JavaPairRDD<String, Integer> resultRDD = listRDD.reduceByKey((x, y) -> x + y);
        resultRDD.foreach(tuple -> System.out.println("门派: " + tuple._1 + "->" + tuple._2));
    }

使用scala编写

def reduceByKey(): Unit ={
    val list = List(("武当", 99), ("少林", 97), ("武当", 89), ("少林", 77))
    val mapRDD = sc.parallelize(list)

    val resultRDD = mapRDD.reduceByKey(_+_)
    resultRDD.foreach(tuple => println("门派: " + tuple._1 + "->" + tuple._2))
  }

(4)运行结果
图片说明

三、union,join和groupByKey **
**3.1 union

当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。
使用Java8编写

public static void union(){
        final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
        final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
        final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        final JavaRDD<Integer> rdd2 = sc.parallelize(list2);

        rdd1.union(rdd2).foreach(num -> System.out.println(num));
    }

使用scala编写

def union(): Unit ={
    val list1 = List(1,2,3,4)
    val list2 = List(3,4,5,6)
    val rdd1 = sc.parallelize(list1)
    val rdd2 = sc.parallelize(list2)
    rdd1.union(rdd2).foreach(println(_))
  }

(4)运行结果
图片说明

3.2 groupByKey
union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:
使用Java8编写

public static void groupByKey(){
        List<Tuple2<String,String>> list = Arrays.asList(
                new Tuple2("武当", "张三丰"),
                new Tuple2("峨眉", "灭绝师太"),
                new Tuple2("武当", "宋青书"),
                new Tuple2("峨眉", "周芷若")
        );
        JavaPairRDD<String, String> listRDD = sc.parallelizePairs(list);

        JavaPairRDD<String, Iterable<String>> groupByKeyRDD = listRDD.groupByKey();
        groupByKeyRDD.foreach(tuple -> {
            String menpai = tuple._1;
            Iterator<String> iterator = tuple._2.iterator();
            String people = "";
            while (iterator.hasNext()){
                people = people + iterator.next()+" ";
            }
            System.out.println("门派:"+menpai + "人员:"+people);
        });
    }

使用scala编写

def groupByKey(): Unit ={
    val list = List(("武当", "张三丰"), ("峨眉", "灭绝师太"), ("武当", "宋青书"), ("峨眉", "周芷若"))
    val listRDD = sc.parallelize(list)
    val groupByKeyRDD = listRDD.groupByKey()
    groupByKeyRDD.foreach(t => {
      val menpai = t._1
      val iterator = t._2.iterator
      var people = ""
      while (iterator.hasNext) people = people + iterator.next + " "
      println("门派:" + menpai + "人员:" + people)
    })
  }

(4)运行结果
图片说明

3.3 join
join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合
使用Java8编写

public static void join(){
        final List<Tuple2<Integer, String>> names = Arrays.asList(
                new Tuple2<Integer, String>(1, "东方不败"),
                new Tuple2<Integer, String>(2, "令狐冲"),
                new Tuple2<Integer, String>(3, "林平之")
        );
        final List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 99),
                new Tuple2<Integer, Integer>(2, 98),
                new Tuple2<Integer, Integer>(3, 97)
        );

        final JavaPairRDD<Integer, String> nemesrdd = sc.parallelizePairs(names);
        final JavaPairRDD<Integer, Integer> scoresrdd = sc.parallelizePairs(scores);

        final JavaPairRDD<Integer, Tuple2<String, Integer>> joinRDD = nemesrdd.join(scoresrdd);
        joinRDD.foreach(tuple -> System.out.println("学号:"+tuple._1+" 姓名:"+tuple._2._1+" 成绩:"+tuple._2._2));
    }

使用scala编写

def join(): Unit = {
    val list1 = List((1, "东方不败"), (2, "令狐冲"), (3, "林平之"))
    val list2 = List((1, 99), (2, 98), (3, 97))
    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)

    val joinRDD = list1RDD.join(list2RDD)
    joinRDD.foreach(t => println("学号:" + t._1 + " 姓名:" + t._2._1 + " 成绩:" + t._2._2))

  }

(4)运行结果

图片说明

四、sample、cartesian **
**4.1 sample

使用Java8编写

public static void sample(){
        ArrayList<Integer> list = new ArrayList<>();
        for(int i=1;i<=100;i++){
            list.add(i);
        }
        JavaRDD<Integer> listRDD = sc.parallelize(list);

        JavaRDD<Integer> sampleRDD = listRDD.sample(false, 0.1, 0);
        sampleRDD.foreach(num -> System.out.print(num + " "));
    }

使用scala编写

  def sample(): Unit ={
    val list = 1 to 100
    val listRDD = sc.parallelize(list)
    listRDD.sample(false,0.1,0).foreach(num => print(num + " "))
  }

(4)运行结果
图片说明

4.2 cartesian
cartesian是用于求笛卡尔积的
使用Java8编写

 public static void cartesian(){
        List<String> list1 = Arrays.asList("A", "B");
        List<Integer> list2 = Arrays.asList(1, 2, 3);
        JavaRDD<String> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        list1RDD.cartesian(list2RDD).foreach(tuple -> System.out.print(tuple._1 + "->" + tuple._2));
    }

使用scala编写

def cartesian(): Unit ={
    val list1 = List("A","B")
    val list2 = List(1,2,3)
    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)
    list1RDD.cartesian(list2RDD).foreach(t => println(t._1 +"->"+t._2))
  }

(4)运行结果
图片说明

五、filter、distinct、intersection
5.1 filter
使用Java8编写

    public static void filter(){
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> listRDD = sc.parallelize(list);
        JavaRDD<Integer> filterRDD = listRDD.filter(num -> num % 2 ==0);
        filterRDD.foreach(num -> System.out.print(num + " "));
    }

使用scala编写

  def filter(): Unit ={
    val list = List(1,2,3,4,5,6,7,8,9,10)
    val listRDD = sc.parallelize(list)
    listRDD.filter(num => num % 2 ==0).foreach(print(_))
  }

(4)运行结果
图片说明

5.2 distinct
使用Java8编写

 public static void distinct(){
        List<Integer> list = Arrays.asList(1, 1, 2, 2, 3, 3, 4, 5);
        JavaRDD<Integer> listRDD  = (JavaRDD<Integer>) sc.parallelize(list);
        listRDD.distinct().foreach(num -> System.out.println(num));
    }

使用scala编写

 def distinct(): Unit ={
    val list = List(1,1,2,2,3,3,4,5)
    sc.parallelize(list).distinct().foreach(println(_))
  }

(4)运行结果
图片说明
5.3 intersection

使用Java8编写

 public static void intersection() {
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
        List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        list1RDD.intersection(list2RDD).foreach(num ->System.out.println(num));
    }

使用scala编写

def intersection(): Unit ={
    val list1 = List(1,2,3,4)
    val list2 = List(3,4,5,6)
    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)
    list1RDD.intersection(list2RDD).foreach(println(_))
  }

(4)运行结果
图片说明

六、coalesce、repartition、repartitionAndSortWithinPartitions
6.1 coalesce
分区数由多 -》 变少

使用Java8编写

public static void coalesce() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> listRDD = sc.parallelize(list, 3);
        listRDD.coalesce(1).foreach(num -> System.out.println(num));
    }

使用scala编写

def coalesce(): Unit = {
    val list = List(1,2,3,4,5,6,7,8,9)
    sc.parallelize(list,3).coalesce(1).foreach(println(_))
  }

(4)运行结果
图片说明

6.2 replication
进行重分区,解决的问题:本来分区数少 -》 增加分区数
使用Java8编写

public static void replication(){
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> listRDD = sc.parallelize(list, 1);
        listRDD.repartition(2).foreach(num -> System.out.println(num));
    }

使用scala编写

def replication(): Unit ={
    val list = List(1,2,3,4)
    val listRDD = sc.parallelize(list,1)
    listRDD.repartition(2).foreach(println(_))
  }

(4)运行结果
图片说明

6.3 repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。
使用Java8编写

public static void repartitionAndSortWithinPartitions(){
        List<Integer> list = Arrays.asList(1, 4, 55, 66, 33, 48, 23);
        JavaRDD<Integer> listRDD = sc.parallelize(list, 1);
        JavaPairRDD<Integer, Integer> pairRDD = listRDD.mapToPair(num -> new Tuple2<>(num, num));
        pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(2))
                .mapPartitionsWithIndex((index,iterator) -> {
                    ArrayList<String> list1 = new ArrayList<>();
                    while (iterator.hasNext()){
                        list1.add(index+"_"+iterator.next());
                    }
                    return list1.iterator();
                },false)
                .foreach(str -> System.out.println(str));
    }

使用scala编写

def repartitionAndSortWithinPartitions(): Unit ={
    val list = List(1, 4, 55, 66, 33, 48, 23)
    val listRDD = sc.parallelize(list,1)
    listRDD.map(num => (num,num))
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitionsWithIndex((index,iterator) => {
        val listBuffer: ListBuffer[String] = new ListBuffer
        while (iterator.hasNext) {
          listBuffer.append(index + "_" + iterator.next())
        }
        listBuffer.iterator
      },false)
      .foreach(println(_))

  }

(4)运行结果
图片说明

七、cogroup、sortBykey、aggregateByKey
7.1 cogroup
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。
使用Java8编写
public static void cogroup(){

        List<Tuple2<Integer, String>> list1 = Arrays.asList(
                new Tuple2<Integer, String>(1, "www"),
                new Tuple2<Integer, String>(2, "bbs")
        );

        List<Tuple2<Integer, String>> list2 = Arrays.asList(
                new Tuple2<Integer, String>(1, "cnblog"),
                new Tuple2<Integer, String>(2, "cnblog"),
                new Tuple2<Integer, String>(3, "very")
        );

        List<Tuple2<Integer, String>> list3 = Arrays.asList(
                new Tuple2<Integer, String>(1, "com"),
                new Tuple2<Integer, String>(2, "com"),
                new Tuple2<Integer, String>(3, "good")
        );

        JavaPairRDD<Integer, String> list1RDD = sc.parallelizePairs(list1);
        JavaPairRDD<Integer, String> list2RDD = sc.parallelizePairs(list2);
        JavaPairRDD<Integer, String> list3RDD = sc.parallelizePairs(list3);

        list1RDD.cogroup(list2RDD,list3RDD).foreach(tuple ->
                System.out.println(tuple._1+" " +tuple._2._1() +" "+tuple._2._2()+" "+tuple._2._3()));
    }

使用scala编写

def cogroup(): Unit ={
    val list1 = List((1, "www"), (2, "bbs"))
    val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))
    val list3 = List((1, "com"), (2, "com"), (3, "good"))

    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)
    val list3RDD = sc.parallelize(list3)

    list1RDD.cogroup(list2RDD,list3RDD).foreach(tuple =>
      println(tuple._1 + " " + tuple._2._1 + " " + tuple._2._2 + " " + tuple._2._3))
  }

(4)运行结果
图片说明

7.2 sortBykey
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
    : RDD[(K, V)] =
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:
使用Java8编写

public static void sortByKey(){
        List<Tuple2<Integer, String>> list = Arrays.asList(
                new Tuple2<>(99, "张三丰"),
                new Tuple2<>(96, "东方不败"),
                new Tuple2<>(66, "林平之"),
                new Tuple2<>(98, "聂风")
        );
        JavaPairRDD<Integer, String> listRDD = sc.parallelizePairs(list);
        listRDD.sortByKey(false).foreach(tuple ->System.out.println(tuple._2+"->"+tuple._1));
    }

使用scala编写

def sortByKey(): Unit ={
    val list = List((99, "张三丰"), (96, "东方不败"), (66, "林平之"), (98, "聂风"))
    sc.parallelize(list).sortByKey(false).foreach(tuple => println(tuple._2 + "->" + tuple._1))
  }

(4)运行结果
图片说明

7.3 aggregateByKey
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。
使用Java8编写

public static void aggregateByKey() {
        List<String> list = Arrays.asList("you,jump", "i,jump");
        JavaRDD<String> listRDD = sc.parallelize(list);
        listRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator())
                .mapToPair(word -> new Tuple2<>(word,1))
                .aggregateByKey(0,(x,y)-> x+y,(m,n) -> m+n)
                .foreach(tuple -> System.out.println(tuple._1+"->"+tuple._2));
    }

使用scala编写

def aggregateByKey(): Unit ={
    val list = List("you,jump", "i,jump")
    sc.parallelize(list)
      .flatMap(_.split(","))
      .map((_, 1))
      .aggregateByKey(0)(_+_,_+_)
      .foreach(tuple =>println(tuple._1+"->"+tuple._2))
  }

(4)运行结果
图片说明

转发自:https://www.cnblogs.com/qingyunzong/p/8922135.html