Spark参数优化
- spark conf 中加入一些参数,会加速shuffle过程,这里根据数据量实际大小可以调节下面一些参数,实践中会对于大数据量JOIN能加速20%以上
spark.default.parallelism 是控制每个stage的默认stage数量,一般是对RDD有效,而参数spark.sql.shuffle.partitions是控制spark sql的shuffle分区数量
spark.default.parallelism=1000
spark.sql.shuffle.partitions=1000
- shuffle数据量大或者硬盘空间不足,造成节点异常,可以①增加shuffle等待时间②划分更多的partitions③增加executor的内存
但是由shuffle导致的异常的根本解决办法还是想办法减少shuffle的数据量
spark.rpc.askTimeout=30000
- 数据倾斜是同一个stage中极少数task处理的数据量远远比其他task要大或者慢,网上各种解决方法很多,就不啰嗦了。有一个参数对于少数task运行时间长比较有效,就是开启推测执行,如果一个task比较慢,会开启几个相同的task任务去执行,减少task因为网络传输等各种原因失败的情况,加快执行,防止因为一个task卡住导致运行极慢。
spark.speculation=true
适当调整推测执行的参数,可以使任务跑的更快
spark.speculation.interval=500
spark.speculation.quantile=0.85
spark.speculation.multiplie=1.6
- 数据量特别大(比方说超过100G)的时候,
① 不要使用persist和unperisist函数,容易失败,可能spark对于超大数据的persist不太支持,而且大量数据persist存在磁盘IO,本身比较耗时 ② 尽量不要使用count函数 ③ 大数据尽快落地 - 如果读取的数据源有很多小文件,读取会产生大量的task来处理,可以设置参数使得每个task处理的数据量变大,这样能减少task数量.spark.sql.files.openCostInBytes默认是4M,表示小于4M的会合并成一个文件,减少读取的task。
spark.sql.files.openCostInBytes=33554432
spark.sql.files.maxPartitionBytes=268435456
- 集群如果不稳定,经常有机器挂掉,需要设置黑名单机制,某台机器上面的任务失败数量超过一定值后,调度器就会记住这台机器,后面的task就不会再往这个机器上面下发
spark.blacklist.enabled=true
- 如果driver有一些像collect这样的action操作,而拉取到driver上面的数据量又比较大,需要调整driver的maxResultSize值,不然任务会失败
spark.driver.maxResultSize=10g
- Spark任务有时候会失败自动重试,如果希望失败了不重试,可以制定尝试的次数
spark.yarn.maxAppAttempts=1
- 小文件太多,处理的时候也可以使用制定文件分块大小的方式来减少读取的task数量
spark.hadoop.mapreduce.input.fileinputformat.split.minsize=10240000
spark.hadoop.mapreduce.input.combinefileinputformat.split.minsize=10240000
spark.mapreduce.input.fileinputformat.split.maxsize=51200000
spark.mapreduce.input.fileinputformat.split.minsize=51200000
- 当程序jar包里面的依赖和集群里面相同依赖的版本不同时,会出现版本不兼容的情况,需要指定使用程序里面的jar包
spark.driver.userClassPathFirst=false
spark.executor.userClassPathFirst=false
spark.driver.extraClassPath=__app__.jar
spark.executor.extraClassPath=__app__.jar
- 程序如果对于资源稳定性要求较高,可以指定不进行动态分配
spark.dynamicAllocation.enabled=false
|