一、Hive数据倾斜
一般都发生在Sql中group?by和join?on上
1.不同数据类型关联产生数据倾斜
例如:两个表进行关联,其中一个字段类型是int一个是String
解决:把数字类型转换成字符串类型
2.空值分布:经常有大量的空值数据进入到一个reduce中导致数据倾斜
解决:自定义分区,将为空的key转变为字符串+随机数,将因为空值造成的数据倾斜数据分不到不同的reducer中(如果一下特殊异常值不需要可以直接where过滤掉)
3.group?by代替distinct
4.开启mapjoin
5.开启负载均衡:先随机分发处理再按照key?group by来分发处理
6.设置多个reduce数量,默认为1
二、Spark数据倾斜
- Executor?lost,OOM,Shuffle过程出错
- Driver OOM
- 单个Executor执行时间特别久?整体任务卡住
- 正常运行的任务突然失败
1.快速定位到导致数据倾斜的代码
Spark数据倾斜只会发生在shuffle过程中
触发shuffle的算子有:distinct/groupByKey/reduceByKey/aggregateByKey/join/repatition/cogroup等
(出现数据倾斜可能就是使用了这些算子其中的某一个导致的)
1.1某个task执行的特别慢
首先看数据倾斜发生在第几个stage中:
1)yarn-cluster模式:通过Spark Web UI(4040)来查看当前运行到了第几个stage
2)yarn-client模式:通过在提交的机器本地是直接可以看到log,在log中查看运行情况
查看什么:
1)task运行时间,有的task几秒钟运行完有的task几分钟才运行完
2)task数据量,运行时间短的task处理处理几kb的数据,运行时间久的处理几千kb,数据量差值大
1.2查看导致数据倾斜的key分布情况:
可以pairs采样10%的样本数据使用countByKey算子统计出每个key的出现次数。
2.解决方案
1.频繁使用hive表,本身数据不均匀,某个key对应100万,某个key对应10条
可以通过hive进行预处理或预先和其他表进行join,避免了shuffle类算子(治标不治本)
2.导致数据倾斜的key就几个,99%的key对应10条数据,但是只有一个key对应100万
where子句过滤掉这些key或者使用fillter算子(使用场景不多)
3.提升shuffle并行度,默认200,缩短task执行时间(能够缓解不能根除)
4.聚合类的shuffle算子或在sparksql中使用group?by语句进行分组的(聚合类)
局部聚合+全局聚合,两次mr,每个key+随机值,进行局部聚合,去掉key前缀进行全局聚合
5.对rdd使用join类操作或spark?sql中使用join(rdd或表的数据比较小)
小表join大表?规避shuffle
不适用join算子进行连接操作,使用广播变量与map类算子实现join,规避掉shuffle类操作,小表进行广播(常用最优方案)
6.两个rdd或表join时数据量都比较大,其中一个因为rdd/表中的少数几个key数据量过大,而另一个分布都比较均匀
通过simple算子采样研究计算数据量最大的是哪几个key,将这几个key对应的数据从原rdd中拆分出来形成单独rdd并给每个key+随机数前缀打散成n份去进行join,此时这几个key对应的数据不会集中在少数几个task上而是分散到多个task进行join了
7.进行join操作时RDD中有大量的key导致数据倾斜进行拆分key也没意义
打散大表?扩容小表
使用随机前缀+扩容rdd进行join(缓解数据倾斜对内存要求高)
总结:如果只是处理简单的小场景使用上述方法基本可以解决,如果复杂问题建议多种组合一起使用!
|