Spark压缩文件性能分析
HDFS上分布式文件存储,成为大数据平台首选存储平台。而Spark往往以HDFS文件为输入,为保持兼容性,Spark支持多种格式文件读取,大数据场景下,性能瓶颈往往是IO,而不是CPU算力,所以对文件的压缩处理成为了很必要的手段。Spark为提供兼容性,同时支持多种压缩包直接读取,方便于用户使用,不用提前对压缩格式处理,但各种压缩格式各有优缺点,若不注意将导致Spark的能力无法发挥出来。故对Spark计算压缩文件做一个分析。
支持的压缩格式
首先来看一下Spark读取HDFS文件常用的压缩格式:
执行对比分析
实验数据:同一个文件包,json格式文件数据
处理逻辑:增加列,然后发送到kafka中。
DAG逻辑划分:两个job(read动作一个job,foreach动作一个job),每个job下面各一个stage,每个stage下面task若干
程序执行参数:–master yarn --deploy-mode client --executor-cores 4 --executor-memory 4G --num-executors 4
非压缩文件
文件大小:33.7GB
运行时间:9min
read阶段:
可以看到所有节点都在读取,分布式读取,速度很快。
Stage里面共计分成了252个task,每个读取128MB数据。
foreach阶段
依然并行全力计算
每个执行节点上4个core都在并发运行。
GZIP
文件大小:10.6GB
运行时间:2.2h
read阶段
只有单节点读取
同时该节点上也只有一个核心在运行
foreach阶段
也是只有单节点、单core运行
BZIP2
文件大小:7.7GB
运行时间:12min
read阶段
与非压缩一致,并行进行
foreach阶段
同样并发执行
SNAPPY
文件大小:16.5GB
运行时间:2.1h
这里直接采用的整文件压缩,所以文件不可分割。
read阶段
单节点单核心读取,非并行。
foreach阶段
同上类似,单节点单核心运行
结果
gzip和snappy无法采用并行计算,也就是说在spark平台上,这两种格式只能采用串行单进程执行,于本文开头表格对应,无法分割(splittable)的压缩格式只能顺序一个进程读取,而读取后多文件又在一个executor上,其他executor无文件导致无法并行的foreach。
bz2和非压缩格式支持分割,也就是说可以并行读取以及计算。
不可分割的压缩格式文件不可并行读取,完全无法发挥spark的并行计算优势,并且若压缩包过大,对单节点的物理性能要求较高。
建议
snappy采用分块压缩方式使其可以并行读取计算。
gzip格式最好提前进行分割成小文件或者换格式,因多个文件可以并行读取。另一个办法是read文件后调用repartition操作强制将读取多数据重新均匀分配到不同的executor上,但这个操作会导致大量单节点性能占用,因此该格式建议不在spark上使用。
bz2表现相同于非压缩,但解压操作需要耗费时间。
非压缩性能表现最佳,但会占用过大HDFS存储。
spark输出压缩文件
实际生产环境需要spark输出文件到HDFS,并且为了节省空间会使用压缩格式,以下介绍几种常用的压缩格式
文本文件压缩
bzip2
压缩率最高,压缩解压速度较慢,支持split。
import org.apache.hadoop.io.compress.BZip2Codec // rdd.saveAsTextFile("codec/bzip2",classOf[BZip2Codec])
snappy
json文本压缩率 38.2%,压缩和解压缩时间短。
import org.apache.hadoop.io.compress.SnappyCodec // rdd.saveAsTextFile("codec/snappy",classOf[SnappyCodec])
gzip
压缩率高,压缩和解压速度较快,不支持split,如果不对文件大小进行控制,下次分析可能可能会造成效率低下的问题。
json文本压缩率23.5%,适合使用率低,长期存储的文件。
import org.apache.hadoop.io.compress.GzipCodec // rdd.saveAsTextFile("codec/gzip",classOf[GzipCodec])
parquet文件压缩
列式存储布局(比如 Parquet)可以加速查询,因为它只检查所有需要的列并对它们的值执行计算,因此只读取一个数据文件或表的小部分数据。Parquet 还支持灵活的压缩选项,因此可以显著减少磁盘上的存储。
如果您在 HDFS 上拥有基于文本的数据文件或表,而且正在使用 Spark SQL 对它们执行查询,那么强烈推荐将文本数据文件转换为 Parquet 数据文件,以实现性能和存储收益。当然,转换需要时间,但查询性能的提升在某些情况下可能达到 30 倍或更高,存储的节省可高达 75%!
转换 1 TB 数据将花费多长时间?
50 分钟,在一个 6 数据节点的 Spark v1.5.1 集群上可达到约 20 GB/分的吞吐量。使用的总内存约为 500GB。HDFS 上最终的 Parquet 文件的格式为:
... /user/spark/data/parquet/catalog_page/part-r-00000-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet /user/spark/data/parquet/catalog_page/part-r-00001-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet ...
存储节省
以下 Linux 输出显示了 TEXT 和 PARQUET 在 HDFS 上的大小比较:
% hadoop fs -du -h -s /user/spark/hadoopds1000g 897.9 G /user/spark/hadoopds1000g % hadoop fs -du -h -s /user/spark/data/parquet 231.4 G /user/spark/data/parquet
1 TB 数据的存储节省了将近 75%!
parquet为文件提供了列式存储,查询时只会取出需要的字段和分区,对IO性能的提升非常大,同时占用空间较小,即使是parquet的uncompressed存储方式也比普通的文本要小的多。
spark中通过对parquet文件进行存储,spark2.0后默认使用snappy压缩,1.6.3及以前版本默认使用的gzip压缩方式。
dataset.write().parquet("path");
可以通过
spark.sql.parquet.compression.codec
参数或是在代码中进行修改配置压缩方式。
sparkConf.set("spark.sql.parquet.compression.codec","gzip")
parquet存储提供了
lzo gzip snappy uncompressed
参考文章
https://zturn.cc/?p=24
https://blog.csdn.net/bajinsheng/article/details/100031359
https://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html