作者:刘旭晖 Raymond 转载请注明出处
Email:
BLOG:
随着Spark的逐渐成熟完好, 越来越多的可配置參数被加入到Spark中来, 在Spark的官方文档http://spark.apache.org/docs/latest/configuration.html 中提供了这些可配置參数中相当大一部分的说明.
可是文档的更新总是落后于代码的开发的, 另一些配置參数没有来得及被加入到这个文档中, 最重要的是在这个文档中,对于很多的參数也仅仅能简单的介绍它所代表的内容的字面含义, 假设没有一定的实践基础或者对其背后原理的理解, 往往无法真正理解该怎样针对详细应用场合进行合理配置。
本文试图通过阐述这当中部分參数的工作原理和配置思路, 和大家一起探讨一下怎样依据实际场合对Spark进行配置优化。须要注意的是。理论上。没有绝对正确的配置(否则也就不须要相应的配置參数了。Spark框架内部直接写死就好了)。所以请结合自己的实际情况,辩证的看以下的内容。
因为本文主要针对和性能相关的一些配置參数进行阐述,所以基本不会覆盖其他和性能没有太多关系的配置參数。
因为篇幅较长,所以在这里分篇组织,假设要看最新完整的网页版内容,能够戳这里:。主要是便于更新内容
Shuffle 相关
Shuffle操作大概是对Spark性能影响最大的步骤之中的一个(因为可能涉及到排序,磁盘IO,网络IO等众多CPU或IO密集的操作),这也是为什么在Spark 1.1的代码中对整个Shuffle框架代码进行了重构,将Shuffle相关读写操作抽象封装到Pluggable的Shuffle Manager中,便于试验和实现不同的Shuffle功能模块。
比如为了解决Hash Based的Shuffle Manager在文件读写效率方面的问题而实现的Sort Base的Shuffle Manager。
spark.shuffle.manager
用来配置所使用的Shuffle Manager,眼下可选的Shuffle Manager包含默认的org.apache.spark.shuffle.sort.HashShuffleManager(配置參数值为hash)和新的org.apache.spark.shuffle.sort.SortShuffleManager(配置參数值为sort)。
这两个ShuffleManager怎样选择呢,首先须要了解他们在实现方式上的差别。
HashShuffleManager,故名思义也就是在Shuffle的过程中写数据时不做排序操作,仅仅是将数据依据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件里。带来的问题就是假设Reduce分区的数量比較大的话,将会产生大量的磁盘文件。
假设文件数量特别巨大,对文件读写的性能会带来比較大的影响,此外因为同一时候打开的文件句柄数量众多,序列化。以及压缩等操作须要分配的暂时内存空间也可能会迅速膨胀到无法接受的地步,对内存的使用和GC带来非常大的压力,在Executor内存比較小的情况下尤为突出。比如Spark on Yarn模式。
SortShuffleManager。是1.1版本号之后实现的一个试验性(也就是一些功能和接口还在开发演变中)的ShuffleManager,它在写入分区数据的时候,首先会依据实际情况对数据採用不同的方式进行排序操作。底线是至少依照Reduce分区Partition进行排序,这样来至于同一个Map任务Shuffle到不同的Reduce分区中去的所有数据都能够写入到同一个外部磁盘文件里去。用简单的Offset标志不同Reduce分区的数据在这个文件里的偏移量。
这样一个Map任务就仅仅须要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题
两者的性能比較,取决于内存,排序,文件操作等因素的综合影响。
对于不须要进行排序的Shuffle操作来说。如repartition等,假设文件数量不是特别巨大,HashShuffleManager面临的内存问题不大,而SortShuffleManager须要额外的依据Partition进行排序,显然HashShuffleManager的效率会更高。
而对于本来就须要在Map端进行排序的Shuffle操作来说,如ReduceByKey等,使用HashShuffleManager虽然在写数据时不排序,但在其他的步骤中仍然须要排序,而SortShuffleManager则能够将写数据和排序两个工作合并在一起运行。因此即使不考虑HashShuffleManager的内存使用问题,SortShuffleManager依然可能更快。
spark.shuffle.sort.bypassMergeThreshold
这个參数仅适用于SortShuffleManager,如前所述,SortShuffleManager在处理不须要排序的Shuffle操作时,因为排序带来性能的下降。这个參数决定了在这样的情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle相似。直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是须要并发打开多个文件,所以内存消耗量添加,本质上是相对HashShuffleMananger一个折衷方案。 这个參数的默认值是200个分区,假设内存GC问题严重,能够降低这个值。
spark.shuffle.consolidateFiles
这个配置參数仅适用于HashShuffleMananger的实现,相同是为了解决生成过多文件的问题,採用的方式是在不同批次运行的Map任务之间重用Shuffle输出文件。也就是说合并的是不同批次的Map任务的输出数据,可是每一个Map任务所须要的文件还是取决于Reduce分区的数量。因此,它并不降低同一时候打开的输出文件的数量,因此对内存使用量的降低并没有帮助。仅仅是HashShuffleManager里的一个折中的解决方式。
须要注意的是。这部分的代码实现虽然原理上说非常easy,可是涉及究竟层详细的文件系统的实现和限制等因素,比如在并发訪问等方面,须要处理的细节非常多,因此一直存在着这样那样的bug或者问题。导致在比如EXT3上使用时。特定情况下性能反而可能下降。因此从Spark 0.8的代码開始,一直到Spark 1.1的代码为止也还没有被标志为Stable,不是默认採用的方式。此外因为并不降低同一时候打开的输出文件的数量,因此对性能详细能带来多大的改善也取决于详细的文件数量的情况。所以即使你面临着Shuffle文件数量巨大的问题,这个配置參数是否使用,在什么版本号中能够使用,也不妨实际測试以后再决定。
spark.shuffle.spill
shuffle的过程中。假设涉及到排序,聚合等操作,势必会须要在内存中维护一些数据结构,进而占用额外的内存。假设内存不够用怎么办,那仅仅有两条路能够走。一就是out of memory 出错了。二就是将部分数据暂时写到外部存储设备中去。最后再合并到终于的Shuffle输出文件里去。
这里spark.shuffle.spill 决定是否Spill到外部存储设备(默认打开),假设你的内存足够使用。或者数据集足够小,当然也就不须要Spill。毕竟Spill带来了额外的磁盘操作。
spark.shuffle.memoryFraction/ spark.shuffle.safetyFraction
在启用Spill的情况下,spark.shuffle.memoryFraction(1.1后默觉得0.2)决定了当Shuffle过程中使用的内存达到总内存多少比例的时候開始Spill。
通过spark.shuffle.memoryFraction能够调整Spill的触发条件,即Shuffle占用内存的大小,进而调整Spill的频率和GC的行为。
总的来说,假设Spill太过频繁,能够适当添加spark.shuffle.memoryFraction的大小。添加用于Shuffle的内存,降低Spill的次数。当然这样一来为了避免内存溢出,相应的可能须要降低RDD cache占用的内存。即减小spark.storage.memoryFraction的值。这样RDD cache的容量降低,有可能带来性能影响,因此须要综合考虑。
因为Shuffle数据的大小是估算出来的,一来为了降低开销。并非每添加一个数据项都完整的估算一次。二来估算也会有误差。所以实际暂用的内存可能比估算值要大,这里spark.shuffle.safetyFraction(默觉得0.8)用来作为一个保险系数,降低实际Shuffle使用的内存阀值。添加一定的缓冲,降低实际内存占用超过用户配置值的概率。
spark.shuffle.spill.compress/ spark.shuffle.compress
这两个配置參数都是用来设置Shuffle过程中是否使用压缩算法对Shuffle数据进行压缩。前者针对Spill的中间数据,后者针对终于的shuffle输出文件,默认都是True
理论上说。spark.shuffle.compress设置为True通常都是合理的,因为假设使用千兆以下的网卡。网络带宽往往最easy成为瓶颈。此外,眼下的Spark任务调度实现中,以Shuffle划分Stage,下一个Stage的任务是要等待上一个Stage的任务所有完毕以后才干開始运行,所以shuffle数据的传输和CPU计算任务之间通常不会重叠。这样Shuffle传输数据量的大小和所需的时间就直接影响到了整个任务的完毕速度。可是压缩也是要消耗大量的CPU资源的,所以打开压缩选项会添加Map任务的运行时间,因此假设在CPU负载的影响远大于磁盘和网络带宽的影响的场合下,也可能将spark.shuffle.compress 设置为False才是最佳的方案
对于spark.shuffle.spill.compress而言,情况相似,可是spill数据不会被发送到网络中,仅仅是暂时写入本地磁盘。并且在一个任务中同一时候须要运行压缩和解压缩两个步骤。所以对CPU负载的影响会更大一些,而磁盘带宽(假设标配12HDD的话)可能往往不会成为Spark应用的主要问题,所以这个參数相对而言,也许更有机会须要设置为False。
总之,Shuffle过程中数据是否应该压缩,取决于CPU/DISK/NETWORK的实际能力和负载。应该综合考虑。