推荐
Spark重点难点知识总结(一)https://blog.csdn.net/shuhaojie/article/details/75123953
Spark重点难点知识总结(二)https://blog.csdn.net/shuhaojie/article/details/74205393


Spark2.x和Spark1.x版本的区别
1、Spark2.x实现了对Spark SQL和Hive SQL操作API的统一
2、Spark2.x引入了SparkSession的概念,提供了一个统一的切入口来使用Spark的各项功能,统一了旧的SQLContext和HiveContext
3、统一了DataFrame和DataSets的API
4、Spark Streaming基于Spark SQL构建了high-level API,使得Spark Streaming更好的受益于Spark SQL的易用性和性能提升


Q:DataFrame与RDD的主要区别在于?
A:DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得SparkSQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。
反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
Q:DataFrame 特性
1、支持从KB到PB级的数据量
2、支持多种数据格式和多种存储系统
3、通过Catalyst优化器进行先进的优化生成代码
4、通过Spark无缝集成主流大数据工具与基础设施
5、API支持Python、Java、Scala和R语言
Q:RDD,全称为?
A:Resilient Distributed Datasets,意为容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组丰富的操作来操作这些数据。
Q:RDD有什么特点?
A:它是在集群节点上的不可变的、已分区的集合对象。
通过并行转换的方式来创建如(map, filter, join, etc)。
失败自动重建。
可以控制存储级别(内存、磁盘等)来进行重用。
必须是可序列化的。是静态类型的。


Spark是根据shuffle类算子来进行stage的划分。
如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个Task可能都会从上一个stage的Task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

spark中会导致shuffle操作的有以下几种算子、
1、repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等
2、byKey类的操作:比如reduceByKey、groupByKey、sortByKey等
3、join类的操作:比如join、cogroup等

重分区: 一般会shuffle,因为需要在整个集群中,对之前所有的分区的数据进行随机,均匀的打乱,然后把数据放入下游新的指定数量的分区内
byKey类的操作:因为你要对一个key,进行聚合操作,那么肯定要保证集群中,所有节点上的,相同的key,一定是到同一个节点上进行处理
join类的操作:两个rdd进行join,就必须将相同join
key的数据,shuffle到同一个节点上,然后进行相同key的两个rdd数据的笛卡尔乘积


https://blog.csdn.net/qq_39394264/article/details/90404851
Spark知识点总结导航


spark知识点总结
绪论
1、Spark简介
2、RDD(弹性分布式数据集)(重点)
3、Spark在集群中大概运行流程
4、提交Application的方式
5、搭建及测试集群
6、Spark的任务调度
7、Spark的资源调度
8、Spark的(任务调度+资源调度)整合
9、Spark Shuffer
10、Spark SQL
11、Spark案例一(Scala)
12、Spark案例二(Java)
绪论
  Spark 则是加州大学伯克利分校AMP实验室所开源的类Hadoop MapReduce的通用并行框架, 专门用于大数据量下的迭代式计算。是为了跟 Hadoop 配合而开发出来的,不是为了取代 Hadoop, Spark 运算比 Hadoop 的 MapReduce 框架快的原因是因为 Hadoop 在一次 MapReduce 运算之后,会将数据的运算结果从内存写入到磁盘中,第二次 Mapredue 运算时在从磁盘中读取数据,所以其瓶颈在2次运算间的多余 IO 消耗. Spark 则是将数据一直缓存在内存中,直到计算得到最后的结果,再将结果写入到磁盘,所以多次运算的情况下, Spark 是比较快的. 其优化了迭代式工作负载。

1、Spark简介
 1)、Spark的历史:2012年发布初始版本0.6版本,已经有6年的历史了。
 2)、Spark的创始人:美国加州大学的伯克利分校的AMP实验室。
 3)、Spark比MR快的原因:
   ①Spark是粗粒度的资源调度,资源复用。
   ②Spark支持基于内存迭代,MR不支持。
   ③Spark支持DAG有向无环图 task pipleline。
   ④Spark可以根据不同场景选择不同shuffle,spark shuffle 比MR性能高(sortShuffle)

 4)、AMP数据分析栈:

5)、spark的运行模式:local、standalone、yarn、mesos。
6)、开发Spark的语言:scala、java、python、R。(Scala和Java兼容性和效率都是一样的)

2、RDD(弹性分布式数据集)(重点)
1)、RDD五大特性:(重点)
1. RDD是由一系列的Paratition组成的。(partition个数=split切片数 约等于 block数;Spark没有读文件的方法,依赖MR读文件的方法)
2. RDD提供的每一个算子实际上是作用在每一个Paratition上的。
3. RDD实际上是有一系列的依赖关系的,依赖于其他的RDD。(计算的容错性;体现了RDD的弹性;父RDD不一定知道子RDD是谁,子RDD一定知道父RDD是谁)
4. 可选:分区器作用在内部计算逻辑的返回值是kv格式的RDD上。
5. 可选:RDD会提供一系列的最佳计算位置。(计算找数据)

2)、算子
     1. taransformation类算子
         map(一对一)、flatMap(一对多)、filter(一对N(0、1))、join、leftouterJoin、rightouterJoin、fullouterJoin、sortBy、sortByKey、gorupBy、groupByKey、reduceBy、reduceByKey、sample、union、mappatition、mappatitionwithindex、zip、zipWithIndex。

     2. action类算子
        count、collect(将task的计算结果拉回到Driver端)、foreach(不会回收所有task计算结果,原理:将用户传入的参数推送到各个节点上去执行,只能去计算节点找结果)、saveAsTextFile(path)、reduce、foreachPatition、take、first。

(查看计算结果的方式:WEBUI、去各个节点的Worker工作目录查看)

     3. 控制类算子
        cache(相当于MEMOORY_ONLY)、
        persist(MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK)
        控制类算子注意点:
            1)控制类算子后不能紧跟action类算子
            2)缓存单元是partition
            3)懒执行、需要action类算子触发执行。(如果application中只有一个job,没必要使用控制类算子)

3、Spark在集群中大概运行流程
1. Driver分发task到节点运行(计算找数据)。
2. task执行结果拉回到Driver(有可能发生OOM)。
Driver的作用:
1)、分发任务到计算节点运行。
2)、监控task(thread)的运行情况。
3)、如果task失败,会重新发送(有限制)。
4)、可以拉回结果到Driver进程。
结论:Driver进程会和集群频繁通信。

4、提交Application的方式
1、Client
提交方式:spark-submit --deploy-mode client --class jarPath args
特点:Driver进程在客户端节点启动
适用场景:测试环境
大概运行流程:
1)、在Client本地启动Driver进程。
2)、Driver会向Master为当前Application申请资源。
3)、Master接收到请求后,会在资源充足的节点上启动Executor进程。
4)、Driver分发task到Executor执行。
2、Cluster
提交方式:spark-submit --deploy-mode cluster --class jarPath args
特点:每次启动application,Driver进程在随机一台节点启动
适用场景:生产环境
大概运行流程:
1)、客户端执行spark-submit --deploy-mode cluster --class jarPath args命令,启动一个sparksubmit进程。
2)、为Driver向Master申请资源。Driver进程默认需要1G内存,1core。
3)、master会随机找一台Worker节点启动Driver进程。
4)、Driver进程启动成功后,spark-submit进程关闭,然后Driver会向Master为当前Application申请资源。
5)、Master接收到请求后,会在资源充足的节点上启动Executor进程。
6)、Driver分发task到Executor执行。

5、搭建及测试集群
  由于Spark的执行效率要比MapReduce快的多,所以我们有必要搭建Spark集群。搭建详细过程请参考我的另一篇博客:Spark集群的搭建及测试。

6、Spark的任务调度
  在Spark中一切的任务都是由Driver申请资源,然后在申请到资源的worker节点启动Executor进程。最后Driver将任务分发到这些Executor进程中执行。详情请参考我的另一篇博客:Spark的任务调度。

7、Spark的资源调度
  有任务调度自然也会有资源的调度,任务的运行离不开资源。一个Application想要运行,首先要向Master为Driver申请资源,然后Driver向Master为Application申请资源。详情请参考我的另一篇博客:Spark的资源调度。

8、Spark的(任务调度+资源调度)整合
  上面虽然分开介绍了任务调度与资源调度。但是资源调度和任务调度在计算机中是密不可分的,详情请参考我的另一篇博客:Spark的调度流程(任务调度+资源调度)。

9、Spark Shuffer
  与MapReduce一样,在Spark中也有shuffle。Sparkshuffle包含hashShuffle和sortShuffle。详情请参考我的另一篇博客:Spark Shuffer。
在Spark中Shuffle的目的是为了保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。
https://blog.csdn.net/qq_33247435/article/details/84140659
Driver 跟 Executor 进行网络传输,另一方面是Task要从 Driver 抓取其他上游的 Task 的数据结果,所以有这个过程中就不断的产生网络结果。其中,下一个 Stage 向上一个 Stage 要数据这个过程,我们就称之为 Shuffle。

10、Spark SQL
  详情请参考我的另一篇博客:Spark SQL。

11、Spark案例一(Scala)
  学完了Spark的理论知识,下面介绍一下在代码中如何应用Spark。由于Scala语言编写Spark程序比较简单,这里首先用Scala语言写一个Scala小案例。详情请参考我的另一篇博客:Spark日志分析案例。

12、Spark案例二(Java)
  虽然Scala语言写Spark比较简单,但是我们平时大部分还是使用的Java比较多,这里也使用Java语言来写了一个小案例。详情请参考我的另一篇博客:Java操作Spark简单案例《好友推荐》。


MapReduce Shuffle 和 Spark Shuffle 区别看这篇就够了https://blog.csdn.net/b6ecl1k7BS8O/article/details/90167129
Shuffle简介
Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。其在MapReduce中所处的工作阶段是map输出后到reduce接收前,具体可以分为map端和reduce端前后两个部分。
在shuffle之前,也就是在map阶段,MapReduce会对要处理的数据进行分片(split)操作,为每一个分片分配一个MapTask任务。接下来map会对每一个分片中的每一行数据进行处理得到键值对(key,value)此时得到的键值对又叫做“中间结果”。此后便进入reduce阶段,由此可以看出Shuffle阶段的作用是处理“中间结果”。
由于Shuffle涉及到了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响到了整个程序的运行效率。

Spark与MapReduce Shuffle的异同
1.从整体功能上看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce(Spark 里可能是后续的一系列操作)。
2.从流程的上看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine和 reduce的 records 必须先 sort。这样的好处在于 combine/reduce可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。以前 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行合并,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey的操作。在Spark 1.2之后,sort-based变为默认的Shuffle实现。
3.从流程实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map, spill, merge, shuffle, sort, reduce等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation,所以 spill, merge, aggregate 等操作需要蕴含在 transformation中。

一、MapReduce Shuffle
Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。
shuffle的主要工作是从Map结束到Reduce开始之间的过程。shuffle阶段又可以分为Map端的shuffle和Reduce端的shuffle。

1.Map端的shuffle
2.Reduce端的shuffle
当mapreduce任务提交后,reduce task就不断通过RPC从JobTracker那里获取map task是否完成的信息,如果获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程就开始启动。Reduce端的shuffle主要包括三个阶段,copy、merge和reduce。

二、Spark的Shuffle
是在MapReduce Shuffle基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在Spark中Shuffle write相当于MapReduce 的map,Shuffle read相当于MapReduce 的reduce。

Spark Shuffle笔记整理
一、Spark Shuffle介绍?
1、Shuffle—>“洗牌”,
在Spark中Shuffle的目的是为了保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。
2、Spark中,什么情况下会发生shuffle?
reduceByKey、groupByKey、sortByKey、countByKey、join等操作。
3、Spark中的Shuffle包括两种:
HashShuffle
SortShuffle
二、HashShuffle运行原理
hashShuffle的运行机制分成两种:
普通运行机制:
Shuffle Write阶段:
由于Stage后面紧跟了另一个Stage,所以数据落地会发生在Shuffle Write阶段。为了将分区中数据相同的key写入一个分区文件中,需要将task计算结果的key的hashcode值与Reduce task个数取模,从而确定将结果写入哪个分区文件中,这样即可保证相同的key在一个分区文件中。为了加快向磁盘写文件的速度,需要事先设置一个buffer作为缓存,每个buffer的大小是32K。
Shuffle Red阶段:
Reduce task从上个Stage的task节点中拉取属于自己的分区文件,这样即可保证每一个Key所对应的Value都会在同一个节点上。拉取的过程属于现拉现用。

合并机制:
倘若Executor只有一个core,所以每次只能有一个task执行,当第一个task执行完成后,第二个task会复用第一个task所创建的buffer和磁盘文件,从而减少磁盘文件的个数。

三、SortShuffle运行原理
SortShuffle的运行机制也分成两种:

1.普通运行机制:

由于Spark只是一个计算框架,没有办法严格控制Executor内存,只能采取监控的方式去监控内存的情况,内存数据初始值的大小大约默认为5M,当超出5M的时候,例如5.02M,监控内存数据的对象就会再去申请5.02*2-5=5.04M内存,如果申请到内存就不必进行溢写,否则再进行溢写。

Shuffle Write阶段:
溢写到磁盘的过程与MapReduce的ShuffleWrite阶段一样。
区别:
内存数据初始值是5M大小,它可以申请扩大,而MapReduce中的Buffer是固定的100M。
溢写除了生成磁盘文件,还有索引文件,索引文件是对磁盘文件的描述,还有记录每个分区的起始位置以及终止位置。

Shuffle Red阶段:
Reduce Task首先会解读索引文件,然后拉取相应分区的数据,然后再处理。

bypass运行机制

bypass运行机制与普通运行机制相比,缺少了忘内存数据存放、排序。除此之外,其余流程一样。
bypass运行机制的触发条件:
shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。

四、Shuffle过程中,磁盘小文件寻址问题
1、Map task执行完毕后,将自己的执行结果信息(比如:磁盘小文件的位置、最终执行状态信息)封装到mapstatus中,然后调用本进程中的MapOutputTrackerWorker,将mapstatus对象发送给Driver中的MapOutputTrackerWorker。
2、Reduce task会调用自己进程中的MapOutputTrackerWorker对象去向MapOutputTrackerWorker获取磁盘小文件的位置信息,然后将位置信息给BlockManagerSlave去拉取磁盘小文件。(默认会启动5个子线程去拉取数据,但5个子线程总共拉取的数据量不能超过64M)。拉来的数据会存在shuffle聚合内存中(20%x80%),以供Reduce task计算。
注:Reduce task中的计算模式也是pipeline管道计算模式。

五、Shuffle可能面临的问题
由HashShuffle的流程图可知:磁盘小文件(分区文件)的个数=m(map task num)*r(reduce task num)
当磁盘小文件过多时,带来的问题有:

write阶段会创建大量写文件的对象
read阶段拉取数据需要进行多次网络传输
read阶段会创建大量读文件的对象
读写对象过多造成JVM内存不足,从而导致内存溢出

六、Executor的内存管理
Excutor内存管理方式有两种:

1.静态的内存管理:
unroll:用来进行反序列化。
两块防止OOM的内存,可以供JVM使用

2.统一的内存管理:
统一内存管理中,75%中的两块内存可以互相借用,但是有最大限度。

七、Shuffle调优
1、spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

2、spark.reducer.maxSizeInFlight
默认值:48M
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

3、spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
shuffle file not find taskScheduler不负责重试task,由DAGScheduler负责重试stage

4、spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

5、spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

6、spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

7、spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

问题思考
Application在执行的过程中,出现了类似reduce OOM的错误原因以及解决办法?
三种解决方案:
1、提高Executor的内存
2、提高shuffle聚合的内存比例
3、减少每次拉取的数据量
原文链接:https://blog.csdn.net/qq_40262690/article/details/84202599


Spark-Core常用算子总结(干货)
前段时间又重新回顾了一下Spark的知识,真的是 蓦然回首,那人却在,灯火阑珊处,第一遍迷迷糊糊的走马观花了一遍,一点感觉都没有,第二遍,第三遍,就会发现不一样的东西,真的是印证了孔子的那句话 温故而知新

Spark分为
1、Spark-Core:Spark最核心的部分,所有的都基于Core,比如RDD的转换

2、SparkSql:Spark对结构化数据的SQl语法支持、可以结合Hive、Hbase等

3、Spark-Streaming:Spark对实时计算、流式计算的支持,Spark-Streaming仅仅是微批处理,本质还是批处理,只是将处理时间缩小到秒级
还有最近很火爆的Flink,和SparkStreaming相反,它是将流处理看成批处理,将无界的数据当成有界的来处理,Flink可以批处理,也可以实时处理。但是SparkStreaming相反,将有界数据当成无界来实时处理,要说两者的区别,这一点就是天差地别了。

1、Action算子
spark是懒加载的,只有action算子才可以触发一系列的rdd操作,如果其中一个rdd要重新计算,那么rdd链前面的rdd全部要重新计算,如果设置了检查点或者暂时落地到磁盘,就可以加快速度

collect:打印rdd所有元素,一般数据较大不会用这个,一般用于调试

count:rdd内数据个数还有countBykey 道理一样

first:返回第一个元素

take(n):返回前n个元素,不排序

top(n):排序,返回最大的前n个

reduce:归约
reduceByKey:用于元祖类型归约,根据键,将键值对的值进行归约

val r1 = sc.makeRDD(Array(1,43,21,4,5,6,89))
r1.reduce(*) //数组元素全部相乘 14321......
r1.reduce(+) //同理 相加
注意:scala函数入参可以用 _ 代替 前提是返回值只有一个,
第一个下划线代表第一个参数以此类推

foreach:不说了

储存的action算子
saveAsTextFile:储存为文本文件,spark会对每个元素执行toString操作
saveAsSequenceFile:储存为SequenceFile格式
saveAsObjectFile:以对象形式储存

2、Transformations算子
从已知rdd转换成新的rdd

一元转换算子

map:一进一出,如果对 [“a b c” , “d e f g”]进行按空格拆分,则会变成
【[a,b,c],[d,e,f,g]】,这样一个二维数组

floatMap:一进多出 上面的例子就会变成【a,b,c,d,e,f,g】一维数组

mapValues:用于键值对的值操作
flatMapValues:同理

mapPartitions:不同于map对每个元素操作,这个是对每个分区
mapPartitionsWithIndex:看代码

def MyFunc(index:Int,iter:Iterator[Int]): Iterator[Any] = //第一个参数代表分区位置
{
println("Index:"+index)
val res = for(e<-iter) yield (e,e*10)
res
}

val c = r1.mapPartitionsWithIndex(MyFunc).collect

groupByKey:对key进行分组
sortBKey:根据key排序

filter:过滤元素,传入一个函数,返回布尔值

val g = sc.makeRDD(List(('a',1),('a',2),('b',5),('c',23) ))

g.filter(.2%2==0).foreach(print)
//返回值为偶数的键值对 .2 第一个_代表传入的键值对
_2代表键值对的值

二元转换

union:合并
intersection:交集
subtract:看代码
join:链接操作 类型sql的join

object T4 extends App
{
val conf = new SparkConf().setMaster("local").setAppName("word")
val sc = new SparkContext(conf)

val r1 = sc.makeRDD(1 to 3)
val r2 = sc.makeRDD(2 to 5)
r1.union(r2).foreach(print) //合在一起 不会去重
println()
r1.intersection(r2).foreach(print) //交集
println()
r1.subtract(r2).foreach(print) //在r1 同时不在r2

println()
val r3 = sc.makeRDD(List( ('a',1),('d',2),('b',2) ))
val r4 = sc.makeRDD(List( ('d',1),('e',2),('b',2) ))
r3.join(r4).foreach(print)
r3.leftOuterJoin(r4).foreach(print) //以r1中的key为依据 结果包含r1的key
println()
r3.rightOuterJoin(r4).foreach(print) //相反

println()
r3.cogroup(r4).foreach(print)//所有键都包括

println(r3.collectAsMap())

}

运行结果
1232345
32
1

(d,(2,1))(b,(2,2))
(d,(2,Some(1)))(a,(1,None))(b,(2,Some(2)))
(d,(Some(2),1))(e,(None,2))(b,(Some(2),2))
(d,(CompactBuffer(2),CompactBuffer(1)))(e,(CompactBuffer(),CompactBuffer(2)))(a,(CompactBuffer(1),CompactBuffer()))(b,(CompactBuffer(2),CompactBuffer(2)))Map(b -> 2, d -> 2, a -> 1)

6
Scala 使用 Option[String] 来告诉你:「我会想办法回传一个 String,但也可能没有 String 给你」。
Option 有两个子类别,一个是 Some,一个是 None,当他回传 Some 的时候,代表这个函式成功地给了你一个 String,而你可以透过 get() 这个函式拿到那个 String,如果他返回的是 None,则代表没有字符串可以给你。

3、持久化
默认情况,rdd经过一系列转换后得到rdd,多次提交会从新计算rdd转换链
,为了提高效率,可对rdd进行持久化操作

cache=persist(MEMORY_ONLY) //持久化入内存
DISK_ONLY:磁盘

其他还有很多 自行查看

4、共享变量
广播变量、累加器

//广播变量和累加器
object T5 extends App
{
val conf = new SparkConf().setMaster("local").setAppName("word")
val sc = new SparkContext(conf)
val broad = sc.broadcast(Array(1,2,3))
println(broad.value)

val count = sc.longAccumulator("pb")
count.add(1)
count.add(2)
println(count.value)

}

11
广播变量注意事项:

不能讲rdd广播,只能讲结果广播
只能在Driver中定义,不能在Executor定义
Driver可修改广播变量,Executor不可
如果在Executor中用到了Driver的变量,如果不使用广播,则有多少Task就有多少数据的副本

总结:
RDD是Spark的核心,称为弹性分布式数据集,是整个Spark架构的基础。


Spark Core: RDD
https://blog.csdn.net/qq_39394264/article/details/90580557