重要概念

  • 架构

    • Standalone模式下才有MasterWorker的叫法。
    • Master:类似YARN中的ResourceManager。作用:①监控Worker,接收对Worker的注册;②接收Client提交的Application,调度等待的Application并向Worker提交。
    • Worker:类似YARN中的NodeManager,掌管着所在节点的资源信息。作用:①通过RegisterWorker注册到Master;②发送心跳信息;③根据Master发送的Application配置进程环境,启动ExecutorBackend(执行Task所需的临时进程)。
    • ExecutorSparkContext对象一旦连接到集群管理器,就可以获取到集群中每个节点上的Executor。Executor是一个进程(进程名:ExecutorBackend,运行在Work节点上),用来执行计算和为应用程序存储数据,然后Spark会发送应用程序代码(比如Jar包)到每个Ececutor,最后SparkContext对象发送任务到执行器开始执行程序。
    • Driver Program:运行应用程序的main()函数并创建SparkContext的线程。每个Spark应用程序都包括一个驱动程序,驱动程序负责把并行操作发布到集群中。Driver会将用户程序划分为不同的执行阶段,每个执行阶段由一组完全相同的Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后,Driver会向Executor发送Task。
  • 重要类:

    • SparkContext:负责连接到集群管理器。
  • RDD(弹性分布式数据集)

    • RDD类中的特点
      • 分区列表,即 一个RDD中有多个分区,每个分区中有多条数据,内部有方法getPartitions: Array[Partition],默认等于核心数
      • 切片
      • 依赖列表,每个RDD都会依赖于其他RDD,内部有方法getDependencies: Seq[Dependency[_]],窄依赖(NarrowDependency)分为OneToOneDependencyRangeDependency,宽依赖,没有对应的类,就只有ShuffleDependency。并且宽依赖一定会有shuffle阶段。
      • RDD是一个泛型类
  • 弹性:

    • 存储的弹性:内存和磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制,即出错时可能会切换Executor
    • 分片的弹性:可根据需要重新分片
      • 只读
    • RDD是只读的
    • RDD包括两类算子
      • transformation:将RDD进行转化,构建RDD的血缘关系
      • action:触发RDD的计算,得到结果或者保存文件系统,比如countcollectsavaAsTextFile
      • 依赖:RDDs通过操作算子开始进行转换,转换之后得到新的RDD包含从其他RDDs衍生所必须的信息,RDDs之间维护者这种血缘关系
      • 隐式转换:RDD的伴生类中有一些隐式转换,
  • 转换算子

    • 序列

      • 单值

        • mapmapPartitions的区别:map是对每个分区内的每个元素都执行,mapPartitions是对每个分区都执行一次
        • flatMap:一进多出,即flatMap中的方法返回的应该是一个可遍历的集合
        • glom:将分区中的所有元素合并成一个Array
      • 多值

        • zip:将两个RDD中索引相同的值合并为一个二元组
    • 键值对

      • 单值

        • partitionBy:重分区
        • reduceByKey:聚合算子,在shuffle之前聚合(即预聚合),并且分区内和分区间聚合逻辑相同
        • groupByKey:分组算子,
        • foldByKey:聚合算子,可以有0值,在分区内会先进行聚合,并且分区内和分区间聚合逻辑相同
        • aggregateByKey:聚合算子,三个参数,可以有0值,分区内调用func1,分区间调用func2
        • combineByKey:聚合算子,比aggregateByKey更加灵活
        • sortByKey:对key进行排序
        • mapValues:对value进行map操作
      • 多值

        • joinleftOuterJoinrightOuterJoin
  • 行动算子

    • collect
    • count
    • take
    • takeOrdered
    • countByKey
    • reduce:聚合,无0值,分区内和分区间相同
    • fold:聚合,有0值,分区内和分区间相同
    • aggregate:聚合,有0值,分区内和分区间不同
    • foreach:这个遍历是在executor上完成的,
  • 概念:

    • Application:一个Driver Program一个Application。基于Spark 构建的用户程序。 由集群上的驱动程序和执行程序组成。
    • Job:一个Application中,每个Job包含多个Stage,每个action会产生一个Job。由多个Task组成的并行计算,这些任务响应 Spark action而产生;你会在驱动程序的日志中看到这个术语。
    • Stage:每个宽依赖会产生一个新stage,也和分区器有关系(如果分区器相同,不产生新的Stage)。每个Job被划分为更小的task集,称为stage,彼此依赖(类似于MapReduce中的map和reduce阶段);您将在驱动程序日志中看到这个术语。
    • Task:一个Stage包含多个Task,Task是一个线程,是执行代码的最小单位,Task和分区数相等。将发送给一个Executor的工作单元去执行。
  • RDD缓存级别,聚合算子默认会进行缓存

  • 划分DAG:

    • 首先,根据依赖关系的不同将DAG划分为不同的阶段(Stage)。
      • 对于窄依赖,由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程里完成,窄依赖被Spark划分到同一个执行阶段;
      • 对于宽依赖,由于Shuffle的存在,只能在parent RDD(s)Shuffle处理完成后,才能开始接下来的计算,因此宽依赖就是Spark划分Stage的依据,即Spark根据宽依赖将DAG划分为不同的Stage。
    • 在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行的。
    • Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的。也就是说,Stage只有在它没有parent Stage或者parent Stage都已经执行完成后,才可以执行。
    • 举例
      • 图片说明
      • 图片说明
  • 分区算法:

    • 代码

      private object ParallelCollectionRDD {
        /**
         * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
         * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
         * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
         * is an inclusive Range, we use inclusive range for the last slice.
         */
        def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
          if (numSlices < 1) {
            throw new IllegalArgumentException("Positive number of slices required")
          }
          def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
            (0 until numSlices).iterator.map { i =>
              val start = ((i * length) / numSlices).toInt
              val end = (((i + 1) * length) / numSlices).toInt
              (start, end)
            }
          }
          seq match {
            case r: Range =>
              positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
                // If the range is inclusive, use inclusive range for the last slice
                if (r.isInclusive && index == numSlices - 1) {
                  new Range.Inclusive(r.start + start * r.step, r.end, r.step)
                }
                else {
                  new Range(r.start + start * r.step, r.start + end * r.step, r.step)
                }
              }.toSeq.asInstanceOf[Seq[Seq[T]]]
            case nr: NumericRange[_] =>
              // For ranges of Long, Double, BigInteger, etc
              val slices = new ArrayBuffer[Seq[T]](numSlices)
              var r = nr
              for ((start, end) <- positions(nr.length, numSlices)) {
                val sliceSize = end - start
                slices += r.take(sliceSize).asInstanceOf[Seq[T]]
                r = r.drop(sliceSize)
              }
              slices
              // 如果是一个Array,会将Array分割为多个Seq  
            case _ =>
              val array = seq.toArray 
              positions(array.length, numSlices).map { case (start, end) =>
                  array.slice(start, end).toSeq
              }.toSeq
          }
        }
      }