Storage模块详解
Storage模块负责管理Spark计算过程中产生的数据,包括基于Disk的和基于Memory的。
用户在实际编程中,面对的是RDD,可以将RDD的数据通过cache持久化,持久化的动作都是由Storage模块完成的,包括Shuffle过程中的数据,也都是由Storage模块管理的。
可以说RDD实现用户的逻辑,而Storage管理用户的数据。在Driver端和Executor端,都会有Storage模块。
8.1 模块整体架构
8.1.1 整体架构
- Storage模块采用的是Master/Slave的架构。Master负责整个Application的Block的元数据信息的管理和维护;而Slave需要将Block的更新等状态上报到Master,同时接收Master 的命令,比如删除一个RDD、Shuffle相关的数据或者广播变量。而Master与Slave之间通过AKKA消息传递机制进行通信。
- Master和Slave之间并没有专门的心跳,而是通过Driver和Executor之间的心跳来间接完成的。
- Master持有整个Application的Block的元数据信息,包括Block所在的位置,Block所占的存储空间的大小(包含三种类型:内存、Disk和Tachyon)。
8.1.2 源码组织结构
- org.apache.spark.storage.BlockManager是Storage模块与其他模块交互最主要的类,他提供了读和写Block的接口。
这里的Block,实际上就对应了RDD中提到的Partition,每一个Partition都会对应一个Block。每个Block由唯一的Block ID标识,格式是”rdd_“ + rddId + “_” + partitionId。 - BlockManager会运行在Driver和每个Executor上。而运行在Driver上的BlockManager负责整个Application的Block的管理工作;运行在Executor上的BlockManager负责管理该Executor上的Block,并且向Driver的BlockManager汇报Block的信息和接收来自他的命令。
8.1.3 Master和Slave的消息传递详情
-
这里的Master指的是org.apache.spark.store.BlockManagerMasterActor,它运行在Driver端,通过保存Slave的Actor Reference向Slave发送消息;Slave指的是 org.apache.spark.store.BlockManagerSlaveActor,它运行在Driver端,每个Executor都有一个,主要的功能是接收来自Master 的命令,做一些清理工作并且响应Master获取Block状态的请求,同时Executor都会保存有org.apache.spark.storage.BlockManagerMasterActor的Actor Reference,Executor通过这个Reference和Master进行通信。
-
1.Master到Slave消息详解
- Slave有任何的Block状态更新,都会主动通过Master Actor Reference上报到Master。而Master到Slave的消息主要是由Master向Slave发送的控制信息或者获取状态的请求。控制信息主要指删除Block、RDD相关的Block、广播变量相关的数据和Shuffle相关的数据。获取状态的请求主要是获取Block的状态信息和匹配的Block ID等信息。
-
2.Slave到Master消息详解
- Slave Actor不会主动向Master Actor发送消息。这里的Slave到Master的消息,主要还是由Slave节点上的BlockManager发出的。
8.2 存储实现详情
8.2.1 存储级别
-
存储级别,对于用户来说是RDD相关的持久化和缓存。这实际上也是Spark最终要的特征之一。每个节点都将RDD的Partition的数据保存在内存中,后续的计算将会变得非常快。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。
-
前面也介绍过,只有触发了一个Action后,计算才会提交到集群中开始真正的运算。因此,RDD只有经过一次Acttion之后,才能将RDD缓存到内存中以供以后的计算使用。这个缓存也有容错机制,如果某个缓存丢失了,那么会通过原来的计算过程进行重算。
-
1.存储级别的定义
- RDD的Partition和Storage模块的Block是一一对应的关系。
- NONE:
不会保存任何数据 - DISK_ONLY:
直接将RDD的Partition保存在该节点的Disk上。 - MEMORY_ONLY:
将RDD的Partition对应的原生Java Object保存在JVM中。如果RDD太大导致它的部分Partition不能存储在内存中,那么这些Partition将不会被缓存,并且在需要的时候被重新计算。这还是默认的级别。 - MEMORY_ONlY_SER:
将RDD的Partition序列化后的对象(每一个Partition占用一个字节数组)存储在JVM中。通常来说,这将比直接保存院士对象的空间利用率更高,尤其当使用fast seralizer(快速序列化)时。但在读取时由于需要反序列化会比较占用CPU。 - DISK_ONLY_2
MEMORY_ONLY_2
MEMORY_ONLY_SER_2
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER_2:
同上书的存储级别。不同就是在其他节点上会保存一个相同的备份。从集群的角度看,一共有两个备份。 - OFF_HEAP:
将RDD的Partition序列化后存储到Tachyon中。相比MEMORY_ONLY_SER,OFF_HEAP有几个优势:
1)减少了GC带来的性能损耗
2)使得Executor内存使用更加轻量级
3)在集群的角度共享一个内存池,非常有利于对于内存有超大需求的Application;而且使得在每个Executor中间共享内存数据成为可能
-
2.选择合适的存储级别
- Spark不同的存储级别是内存使用和CPU效率的折中。Spark官网建议按照以下步骤来选择合适的存储级别:
- 1)如果你的RDD可以和默认的存储级别有很好的的契合,那么就无需任何特殊的设定了。默认的存储级别是CPU最高效的选项,也是运算能够最快完成的选项。
- 2)如果不行,那么需要减少内存的使用,可以使用MEMORY_ONLY_SER。这个时候需要选择一个合适的序列化方案。需要在空间效率和反序列化是所需要的CPU中做一个合适的选择。
- 3)尽量不要落在硬盘上,除非是计算逻辑非常复杂,或者是需要从一个超大规模的数据集过滤出一小部分数据。否则重新计算一个Partition的速度可能和从硬盘读差不多(考虑到出错的概率和写硬盘的开销,因此采用失败重算要比读硬盘持久化的数据要好)。
- 4)如果你需要故障的快速恢复能力(比如使用Spark来处理Web的请求),那么可以考虑使用存储级别的多副本机制。实际上所有的存储级别都提供了Partition数据丢失时的重算机制,只不过有备份的话可以让Application直接使用副本而无需等待重新计算丢失的Partition数据。
- 5)如果集群有大量的内存或者有很多的运行任务,则选择OFF_HEAP。现在处于试验阶段的OFF_HEAP有以下的优势:
a) 它使得多个Executor可以共享一个内存池。
b) 它显著地减少了GC的开销。
c) 缓存在内存中的数据即使是产生它的Executor异常退出了也不会丢失。
8.2.2 模块类图
- org.apache.spark.storage.BlockStore:存储Block的抽象类。
他的实现有:
1)org.apache.spark.storage.DiskStore
2)org.apache.spark.storage.MemoryStore
3)org.apache.spark.storage.Tachyon
8.2.3 org.apache.spark.storage.DiskStrore实现详解
- DiskStore通过org.apache.spark.DiskBlockManager来管理文件。前面介绍过,DiskBlockManager管理和维护了逻辑上的Block存储和存储在Disk上物理的Block的映射。一般来说,一个逻辑的Block会根据他的blockId生成的名字映射到一个物理的文件。
8.2.4 org.apache.spark.storage.MemoryStore实现详解
- MemoryStore实际上是使用了一个HashMap来保存Block的数据
8.2.5 org.apache.spark.storage.TachyonStore实现详解
- Tachyon实现了Spark将缓存的数据放到Tachyon中。这个实现可以看作是实现了一个Tachyon的客户端,通过这个客户端Spark可以读写Tachyon的数据。
8.2.6 Block存储的实现
- org.apache.spark.storage.BlockManager对外屏蔽了存储实现的方式,而是通过用户传入的存储级别来确定最终的持久化方案。
8.3 性能调优
8.3.1 spark.local.dir
- 这个目录用于写中间数据,如RDD Cache、Shuffle时存储数据的位置,在8.2.3节已经详细介绍了它的使用方式。
- 首先,最基本的是我们可以配置多个路径(用逗号分割)到多个磁盘上增加整体IO带宽。
其次,在8.2.3也提过,一个逻辑的Block会根据他的blockId生成的名字映射到一个物理上的文件。这些物理文件会被hash到由spark.local.dir(或者SPARK_LOCAL_DIRS)设置的目录中。因此,如果存储设备的读写速度不一样,那么可以在较快的存储设备上配置更多的目录来增加它被使用的比例,从而更好地利用快速存储设备。在Spark能够感知具体的存储设备类型前,这个变通方法可以取得一个不错的效果。 - 还需要注意的是,在Spark1.0以后,SPARK_LOCAL_DIRS(Standalone,Mesos)或者LOCAL_DIRS(YARN)参数会覆盖这个配置。比如高YARN模式的时候,Spark Executor的本地路径依赖于YARN的配置,而不取决于这个参数。
8.3.2 spark.executor.memory
- 与6.3.1一致。
8.3.3 spark.storage.memoryFraction
- spark.executor.memory 决定了每个Executor可用内存的大小,那么spark.storage.memoryFraction则决定了这部分内存中有多少可以用于Memory Store管理RDD cache数据,多少内存用来满足任务运行时各种其他内存空间的需要。
默认值0.6。调整这个比值,取决于你的应用对数据的使用模式和数据的规模,粗略的来说如果频繁发生全量的垃圾回收,可以考虑降低这个值,这样RDD Cache可用的内存空间减少(剩下的部分Cache数据就需要通过Disk Store写到磁盘上了),虽然会带来一定的性能损失,但是腾出更多的内存空间用于执行任务,减少全量的垃圾回收发生的次数,反而可能改善程序运行的整体性能。
8.3.4 spark.streaming.blockInterval
- 这个参数用来设置Spark Streaming里Stream Receiver生成Block的时间间隔,默认为200ms。具体表现为Receiver所接收的数据,以相同的时间间隔,同期性地从Buffer中生成一个StreamBlock放进队列,等待进一步被存储到BlockManager中供后续计算过程使用。
- 从理论上来说为了保证每个StreamingBatch间隔里的数据是均匀的,这个时间间隔应该能被Batch的时间间隔长度整除。
总体来说,如果内存大小够用,Streaming 的数据来得及处理,这个时间间隔的影响不大,当然,如果数据存储的级别是Memory+Ser,即做了序列化处理,那么时间间隔的大小将会影响序列化后数据块的大小,对于Java的垃圾回收有一定的影响