Source package: org.apache.spark.rdd

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

Return a new RDD that is reduced into numPartitions partitions.

This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Translation:

        Returns a new RDD reduced to numPartitions partitions. This results in a narrow dependency: for example, if you go from 1000 partitions to 100 partitions, no shuffle happens; instead, each of the 100 new partitions claims 10 of the current partitions. If a larger number of partitions is requested, the RDD keeps its current number of partitions. However, if you are doing a drastic coalesce, e.g. down to a single partition, the computation may run on fewer nodes than you would like (in other words, the parallelism is too low). To avoid this, pass true as the second argument, shuffle. This adds a shuffle step to the repartitioning, but it means the upstream partitions can still be executed in parallel (according to whatever the current partitioning is).
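A minimal spark-shell sketch of the behavior described above (the partition counts and the names bigRdd/narrowed are only illustrative, not taken from the Spark docs):

        val bigRdd = sc.parallelize(1 to 10000, 1000)        // RDD with 1000 partitions
        val narrowed = bigRdd.coalesce(100)                   // narrow dependency, no shuffle: each new partition claims ~10 old ones
        narrowed.partitions.length                            // 100
        bigRdd.coalesce(2000).partitions.length               // 1000 -- without shuffle the partition count cannot grow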

 

Note:

        With the second argument shuffle = true, you can actually coalesce to a larger number of partitions than before. For example, if you have a small number of partitions, say 100, calling coalesce(1000, shuffle = true) will produce 1000 partitions distributed across the cluster nodes using a HashPartitioner. This is very useful for increasing parallelism.
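A sketch of that case (the 1 to 1000000 data set is just a placeholder):

        val small = sc.parallelize(1 to 1000000, 100)               // 100 partitions
        small.coalesce(1000).partitions.length                      // 100  -- shuffle = false cannot increase the count
        small.coalesce(1000, shuffle = true).partitions.length      // 1000 -- data redistributed via a hash partitioner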

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.

If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.

Translation:

        Returns a new RDD with exactly numPartitions partitions. This can increase or decrease the level of parallelism of the RDD. Internally, it uses a shuffle to redistribute the data. If you are decreasing the number of partitions, consider using coalesce instead, which can avoid performing a shuffle.
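A small spark-shell sketch of that trade-off (the numbers are arbitrary):

        val rdd = sc.parallelize(1 to 100, 4)            // 4 partitions
        rdd.repartition(8).partitions.length             // 8 -- parallelism increased, a shuffle always happens
        rdd.repartition(2).partitions.length             // 2 -- works, but still shuffles
        rdd.coalesce(2).partitions.length                // 2 -- same result with a narrow dependency, no shuffle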

Reference: https://blog.csdn.net/dax1n/article/details/53431373

coalesce and repartition: repartitioning operators
    (*) Both repartition an RDD
    (*) Difference: coalesce does not shuffle by default (shuffle = false)
                       repartition always shuffles

    (*) Example (a lineage check follows below):
             val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
             Check the partition count: rdd1.partitions.length
             Repartition:  val rdd2 = rdd1.repartition(3)
                           val rdd3 = rdd1.coalesce(3,false)  --->  partition count: 2
                           val rdd4 = rdd1.coalesce(3,true)   --->  partition count: 3
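One way to confirm which of these calls actually shuffles is to inspect the lineage with toDebugString (the exact output differs across Spark versions, but a ShuffledRDD stage should only appear where a shuffle happened):

             rdd2.toDebugString    // contains a ShuffledRDD: repartition(3) always shuffles
             rdd3.toDebugString    // only a CoalescedRDD over rdd1: no shuffle, so the count stays at 2
             rdd4.toDebugString    // contains a ShuffledRDD again because shuffle = true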