前言
Spark作业的优化其实是泛的话题,因为往往有时候表现出来都是慢,但是解法却不一样,我想把优化的方方面盘点出来,以便系统性地去制定整体的优化方案。
优化思路梳理
到底怎样去看待所谓慢的问题呢,我做了一个整理:
主题 |
---|
资源优化 | 并行度优化 | 代码优化 | Shuffle优化 | 内存优化 | 堆外内存优化 | 数据倾斜处理 | 读写介质优化 |
资源优化
绝大部分作业变慢其实就是资源吃紧导致的,这就是为什么啥都没变怎么就慢了呢,去查问题的时候又查不出个所以然来。换句话来说绝大部分作业其实加资源可以解决问题,甚至有些其他问题的时候加了资源也可以抗过去。资源优化涉及两块,一块是集群的资源优化,一个是作业内部的资源分配。
集群资源优化
1.资源模式选择 Standalone or Yarn,大部分情况在实际看到的都是Yarn的模式,这个是因为Yarn在整个企业承担着统一分配资源的任务,历史上大部分Spark作业是Hive切换过来的,Yarn的调度方式是比较合理的,但是Yarn其实分配需要增加调度的开销,在生产上,涉及到高频的调度,需要去掉Yarn的申请资源延迟,往往是单独搭建一个Standalone环境的。 资源使用的上限是在配置中决定的,在分配资源初期需要做一次压测,保证分配的资源没有超配; Standalone中的资源配置:
SPARK_WORKER_CORES
SPARK_WORKER_MEMORY
Yarn中的资源配置:
yarn.nodemanager.resource.cpu-vcores
yarn.nodemanager.resource.memory-mb
2.提交任务时候的资源分配,这个在提交的时候指定参数
./spark-submit
--master spark://xxx
--executor-cores xxx
--executor-memory xx
–executor-cores : 启动一个Executor使用多少core –executor-memory : 启动一个Executor使用多少内存 –total-executor-cores : 启动一个Application一共使用多少core -standalone集群下 –num-executor : 指定启动一个Application 启动多少executor – yarn集群 建议以上参数设置在提交任务命令中
另外,Spark作业其实是支持动态参数调整的,在做作业测试的时候建议暂时关闭,真正上线之后开启即可 spark.dynamicAllocation.enabled
并行度优化
并行度优化其实是有两个点,一个是并行度过低,带来大量的慢task以及shuffle溢出,一个是并行度过高,带来大量小任务,资源消耗巨大,调度成本高导致还是慢。主要通过编码时候参赛控制即可,常见的控制点在控制rdd的并行参数上:
sc.textFile(xx,minnumpartitions)
sc.parallelize(xx,num)
sc.markRDD(xx,num)
sc.parallelizePairs(List[Tuple2<String,String>],num)
reduceByKey(xx,num),distinct(xx,num),join(xx,num),groupByKey(xx,num),repartition(num)
spark.default.parallelism 调整默认的并行度
spark.sql.shuffle.partitions = 200
自定义分区器
SparkStreaming中 Direct模式 : 与读取的topic的分区数保持一致
代码优化
这个就是Spark程序员要的基本功了,也是有规律可循:
RDD复用
我们都知道rdd是一连串血缘操作计算出来的,所谓的复用其实就是不要重复去做计算,自然会减少消耗,一般是下面两招: 1.尽量复用同一个RDD,避免创建重复的RDD 2.对常用的RDD进行持久化,这样子下次重复计算直接从已经计算好的结果拉取数据了,常用的代码如下:
cache() = persist() = persist(StorageLevel.MEMORY_ONLY)
persist策略枚举:
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
尽量避免使用shuffle算子
Shuffle本身带来消耗巨大,我们其实尽量不进行shuffle,常见的手段主要是使用map 类的算子 + 广播变量代替 join
使用map端有预聚合的操作
使用map端预聚合有以下好处 1.减少map端的shuffle数据量 2.减少reduce端读取的数据量 3.减少 reduce端 聚合次数 但是也需要保障计算结果一致性的前提,常见算子:
reduceByKey
aggreagateByKey
combineByKey
使用高性能的算子
在保存数据或者插入数据库中数据时,可以有以下操作: 1.使用mapPartition代替map,这样子其实可以按照 partition为单位批量写入,减少jdbc链接次数 2.foreachPartition代替foreach,也是批量操作 3.对于大量小文件处理,可以先使用coalesce来减少分区 4.对与大量的数据过滤之后可以考虑使用coalesce减少分区 5.对与数据量多,分区少情况下的数据可以使用repartition来增多分区 6.使用reduceByKey 代替groupByKey
使用广播变量
当Exeutor端使用到Driver端的变量时,可以使用广播变量来减少Executor端内存的占用 如果不使用广播变量,那每个Executor中有多少task就有多少Driver端的变量副本 注意 1.Executor端的内存需要够用 2.广播变量不能在Executor中修改
使用Kryo序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{_KryoBean.class});
1.RDD[自定义类型] 2.数据持久化 : StorageLevel.MEMORY_ADK_DISK_SER 3.task 节点之间发送 注意问题: 如果一个类使用了java序列化就不能使用kryo序列化了
优化数据结构
数据结构的优化其实就是为了减少存储,举例说明int 类型0 和1 和字符串的"0","1"空间其实差异很大的,在做序列化传输的时候成本同时上去了,所以数据结构的定义也有以下建议: 1.尽量使用原生数据类型代替字符串 2.尽量使用字符串代替对象 3.尽量使用数组代替Map集合
总之,代码优化的层面其实就是性能的直接体现,目标就是高效,,减少内存使用、减少shuffle,减少节点之间数据传输,最优的方式达到需要的结果。
Shuffle优化
这里提到的Shuffle是说在shuffle过程中,我们还可以做一些调整,需要跟踪shuffe的元数据来进行调整:
spark.reducer.maxSizeInFlight 48M shuffle一次拉取数据的缓存
spark.shuffle.io.maxRetries 3 : shuffle 拉取数据task 失败重试次数
spark.shuffle.io.retryWait 5s : shuffle拉取数据task 重试等待间隔
spark.shuffle.sort.bypassMergeThreshold : 200 ,bypass机制开启的条件之一,另一个使用bypass条件是map端不能有预聚合
内存优化
Spark一直号称内存计算,但是实际上来说数据体量大的时候内存也成为一个负担,内存的合理使用才能达到最优,相反的话会开始使用磁盘进行数据交换,整体性能下降,内存分配有以下原则: 1.task运行内存多一些,减少磁盘溢出 2.合理调整Spark内存分布 静态内存分布–过去的使用方式,导致内存不能合理使用,造成很大程度的浪费 统一内存分布–2.x之后的方式,目的是为了统一内存的分配 3. 堆外内存调整 spark.executor.memoryOverhead=2048M 举例说明如下:
总共300M预留,会按照如下配置进行分配
(总-300M) * 0.6 -- spark.memory.fraction
0.5 : RDD缓存和广播变量 --spark.memory.storageFraction
0.5 : shuffle聚合内存
(总-300M) * 0.4 task运行内存
数据倾斜处理
数据倾斜出现有以下的场景: MapReduce :reduce task处理的数据相对于其他 task来说处理的数据量多 – shuffle导致 Hive :Hive中某列下对应的相同key非常多,这张Hive表有数据倾斜 Spark : Spark rdd中某个分区的数据相对于其他分区来说数据量多–shuffle导致
解决方法
1.使用Hive ETL预处理
2.过滤少数倾斜的key
3.增加并行度
场景:不同的key多,分区少,可以直接增加并行度
4.双重聚合
场景:相同的key多,分区少
解决思路:将key 打散(随机加前缀),聚合,再去前缀,再聚合
5.使用map join 代替reduce join
场景:两个RDD要join, 一个RDD大,有数据倾斜,一个RDD小
解决方式:可以考虑直接将小的RDD回收广播,对有数据倾斜的RDD直接使用map类的算子操作
6.找出倾斜key ,分拆join
场景:两个RDD要join,一个RDD大,少量的key有数据倾斜,另一个RDD小,但是无法采用第五种方案
解决方式:找出倾斜的key,分成倾斜的RDD与不倾斜的RDD,倾斜RDD中,一个随机加前缀一个膨胀处理,正常RDD正常join,最后结果union一起
7.使用随机前缀和扩容RDD进行join
场景:两个RDD要join,一个RDD大,大量的key有数据倾斜,另一个RDD小,无法采用第五种方案解决
解决:直接对RDD进行随机加前缀与膨胀处理
存储介质优化
对于spark内存计算来说,存储的rpc请求往往是瓶颈,在spark的作业中其实目标就是优化读取速度,我们会按照不同的读写频率存储,我们生产实践中把数据直接cache内存,用的就是alluxio,存储的优化归纳如下:
StoragePolicies
存储类型
ARCHIVE
DISK
SSD
RAM_DISK
存储策略
hot
Cold
Warm
资源隔离
这点在生产上体会比较多,那就是我们需要发现热点节点,经常出的问题就是在某一台的节点上一直task很慢,这种时候需要去怀疑这台机器了,热点的数据,长任务,高io都会影响,这种时候可以考虑做一些资源隔离的方案,yarn的node-lables进行打标处理,做到在大数据层面的灰度机制。
yarn.scheduler.capacity.root.accessible-node-labels=*
yarn.scheduler.capacity.root.xxxx.accessible-node-labels=label_xx
yarn.scheduler.capacity.root.yyyy.accessible-node-labels=label_xx,label_yy
yarn.scheduler.capacity.root.default.default-node-label-expression=
yarn.scheduler.capacity.root.xxxx.default-node-label-expression=label_xx
yarn.scheduler.capacity.root.yyyy.default-node-label-expression=label_yy
后记
原始的RDD级别的优化其实大家接触不会太多,就其原因是这个难度其实是比较大的,引擎级别也在不断改进,到了SparkSQL的时代,多了很多智能化的调整机制,例如AQE等,但是spark内核级别的调整有更加广泛的适应性,期待能带来一些有效的帮助。
|