Spark数据倾斜
一、定义
不同的Key对应的数据量不一样,少量的Task被分配到了绝大多数的数据可能会发生数据倾斜
数据倾斜的表现:
- Spark的大部分Task作业正常,只有几个比较慢
- Spark的某个Task在运行过程中出现了OOM,作业无法正常运行
定位数据倾斜问题:
- 检查Shuffle算子的逻辑有没有问题,会不会出现数据倾斜的问题
- 检查Spark的Log文件,会定位到代码的某一行,根据报错去检查对应的Stage的shuffle算子
二、解决方案一:聚合原始数据
- 避免Shuffle过程
- 作业的来源是Hive表,数据经过ETL的昨天的数据:在Hive中进行聚合操作,例如按照key进行分组,将同一个key对应的value写成一个拼接字符串的形式
- 此时的数据就是一条数据了,之后对一个key的value只需要进行map操作就可以了
- 增大key的力度(减少数据倾斜的可能性,但是每个Task的数量会增加)
三、解决方案二:过滤导致倾斜的Key
? 在作业中允许丢弃某些数据的话,就可以直接丢去那些可能导致数据倾斜的Key
四、解决方案三:提高Shuffle中reduce的并行度
? 方案一和方案二不行的时候,可以考虑提高reduce的并行度,reduce的并行度提高就增加了Reduce端的Task 的数量,每个Task分到的数量就相对较少了,缓解了数据倾斜的问题。
? 不是根本上解决数据倾斜的问题,上面的几个方法是从根本上解决了。
五、解决方案四:使用随机Key实现双重聚合
六、解决方案五:将reduce join转换成map join
**原先过程:**join操作都是会执行reduce join:将所有的key对应的value汇聚到一个reduce task中
**优化过程:**广播RDD全量数据,+map操作实现join
? 注:RDD不能进行广播,只有将RDD内部的数据通过collect拉取到Driver内存然后再进行广播。
**核心思想:**避免Shuffle过程,将较小的数据保存到broadcast中,将大数据进行map操作,在算子函数内,获取Broadcast变量,按照自己想要的操作进行join操作
**使用场景:**一个较大的RDD和一个较小的RDD,两个都是大的数据,由于广播变量在每一个Excutor中都要保存一个备份,可能出现OOM的问题
七、解决方案六:sample采样,对倾斜的key进行单独的join
**处理流程:**某个RDD中某一个Key的数据量特别的大,将改key单独提取出来作为一个RDD,再和其他的key进行join操作。
? 注:可以使用sample采样10%的数据,观察需要单独写成RDD的key
**不适用场景:**引发数据倾斜的数据是多个Key导致
八、解决方案七:使用随机数扩容进行join
核心思想:对一个RDD进行扩容,再对另外一个RDD进行稀释
不适用场景:两个RDD都大
进阶方案:方案七+方案六
- 对包含几个少数数据量过大的Key的RDD,进行sample采样,确定出最大的数据量的那几个key
- 将这几个Key从原来的RDD中分离出来,形成单独的RDD,并给每个key打上随机的前缀、不会发生数据倾斜的数据形成另外一个RDD。
- 将需要join的另外一个RDD,过滤出来那几个倾斜的key对应的数据形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按照顺序附加一个0-n的前缀,不会导致数据倾斜的大部分的key形成一个RDD。
- 再将附加了随机前缀的独立RDD与另外一个膨胀了n倍的独立RDD进行join,此时就可以将原先相同的key打撒成n份了。
- 另外两个普通的RDD直接进行join就行了
- 最后将两次join的数据使用union合并起来就行,就是会最终的join的结果
|