想提高MapReduce的执行效率,MapReduce是分为Map阶段和Reduce阶段,其实提高执行效率就是提高这两个阶段的执行效率
默认情况下Map阶段中Map任务的个数是和数据的InputSplit相关的,InputSplit的个数一般是和Block块是有关联的,所以可以认为Map任务的个数和数据的block块个数有关系,针对Map任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的, 那就剩下Reduce阶段了,咱们前面说过,默认情况下reduce的个数是1个,所以现在MapReduce任务的压力就集中在Reduce阶段了,如果说数据量比较大的时候,一个reduce任务处理起来肯定是比较慢的,所以我们可以考虑增加reduce任务的个数,这样就可以实现数据分流了,提高计算效率了。
那就剩下Reduce阶段了,咱们前面说过,默认情况下reduce的个数是1个,所以现在MapReduce任务的压力就集中在Reduce阶段了,如果说数据量比较大的时候,一个reduce任务处理起来肯定是比较慢的,所以我们可以考虑增加reduce任务的个数,这样就可以实现数据分流了,提高计算效率了。
如果增加Reduce的个数,那肯定是要对数据进行分区的,分区之后,每一个分区的数据会被一个reduce任务处理
我们可以通过job.setPartitionerClass来设置分区类,不过目前我们是没有设置的,那框架中是不是有默认值啊,是有的,我们可以通过job.getPartitionerClass方法看到默认情况下会使用HashPartitioner这个分区类
@Public
@Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
}
}
HashPartitioner继承了Partitioner,这里面其实就一个方法,getPartition,其实map里面每一条数据都会进入这个方法来获取他们所在的分区信息,这里的key就是k2,value就是v2
return (key.hashCode() & 2147483647) % numReduceTasks;
其实起决定性的因素就是numReduceTasks的值,这个值默认是1,通过job.getNumReduceTasks()可知。 所以最终任何值%1 都返回0,那也就意味着他们都在0号分区,也就只有这一个分区。
如果想要多个分区,很简单,只需要把numReduceTasks的数目调大即可,这个其实就是reduce任务的数量,那也就意味着,只要redcue任务数量变大了,对应的分区数也就变多了,有多少个分区就会有多少个reduce任务,那我们就不需要单独增加分区的数量了,只需要控制好Redcue任务的数量即可。
增加reduce任务个数在一定场景下是可以提高效率的,但是在一些特殊场景下单纯增加reduce任务个数是无法达到质的提升的。
假设我们有一个文件,有1000W条数据,这里面的值主要都是数字,1,2,3,4,5,6,7,8,9,10,我们希望统计出来每个数字出现的次数 其实在私底下我们是知道这份数据的大致情况的,这里面这1000w条数据,值为5的数据有910w条左右,剩下的9个数字一共只有90w条,那也就意味着,这份数据中,值为5的数据比较集中,或者说值为5的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。
那根据我们前面的分析,我们可以增加reduce任务的数量,看下面这张图,我们把reduce任务的数量调整到10个,这个时候就会把1000w条数据让这10 个reduce任务并行处理了,这个时候效率肯定会有一定的提升,但是最后我们会发现,性能提升是有限的,并没有达到质的提升,那这是为什么呢? 我们来分析一下,刚才这份数据中,值为5的数据有910w条,这就占了整份数据的90%了,那这90%的数据会被一个reduce任务处理,在这里假设是让reduce5处理了,reduce5这个任务执行的是比较慢的,其他reduce任务都执行结束很长时间了,它还没执行结束,因为reduce5中处理的数据量和其他reduce中处理的数据量规模相差太大了,所以最终reduce5拖了后腿。咱们mapreduce任务执行消耗的时间是一直统计到最后一个执行结束的reduce任务,所以就算其他reduce任务早都执行结束了也没有用,整个mapreduce任务是没有执行结束的。
那针对这种情况怎么办? 这个时候单纯的增加reduce任务的个数已经不起多大作用了,如果启动太多可能还会适得其反。 其实这个时候最好的办法是把这个值为5的数据尽量打散,把这个倾斜的数据分配到其他reduce任务中去计算,这样才能从根本上解决问题。 这就是我们要分析的一个数据倾斜的问题 MapReduce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理时间变得很长 具体表现为:Ruduce阶段一直卡着不动
根据刚才的分析,有两种方案
1.增加reduce任务个数,这个属于治标不治本,针对倾斜不是太严重的数据是可以解决问题的,针对倾斜严重的数据,这样是解决不了根本问题的
2.把倾斜的数据打散
这种可以根治倾斜严重的数据。
package com.helloworld.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* 数据倾斜-增加reduce任务个数
*
* Created by jinzhida
*/
public class WordCountJobSkew {
/**
* map阶段
*/
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接<k1,v1> 产生k2 v2
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString());
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量 v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(words[0]);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<>k2,{v2...}> 的数据进行累加求和,并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
//创建一个sum变量, 保存v2s的值
long sum = 0L;
//对v2s中的数据累加求和
for(LongWritable v2:v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"");
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
//模拟Reduced的复杂计算消耗的时间
if(sum % 200 == 0){
Thread.sleep(1);
}
}
//组装k3, v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+"");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//把结果写出去
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args){
try{
if(args.length!=3){
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordConutJob这个类的
job.setJarByClass(WordCountJobSkew.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
job.getPartitionerClass();
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);
//设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
对项目代码进行重新编译、打包、提交到集群去执行 第一次先使用一个reduce任务执行 然后我们再到yarn的web界面查看任务的执行情况 任务总的执行消耗时间为:Elapsed: 2mins, 46sec
具体分析Reduce任务的执行时间 Shuffle执行的时间为18秒,Reduce执行的时间为1分37秒
接下来增加reduce任务的数量,增加到10个 任务总的执行消耗时间为:Elapsed: 2mins, 43sec 仅提升了3秒,所以从这可以看出来,性能提升并不大
具体分析Reduce任务的执行时间 其中这里面有一个reduce任务消耗的时间比较长,其他reduce任务的执行时间都是4~5秒,这个reduce任务的执行时间是1分26秒,那就意味着值为5的那910w数据进入到这个reduce了,所以它执行的比较慢。
那我们再把reduce任务的个数提高一下,会不会提高性能呢?不会了,刚才从1个reduce任务提高到10个reduce任务时间也就减少了三四秒钟,所以再增加reduce任务的个数就没有多大意义了。 那接下来就需要使用我们的绝招了,把倾斜的数据打散,在这里就是把5这个数字打散, 怎么打散呢?其实就是给他加上一些有规律的随机数字就可以了 在这里我们这样处理,我把5这个数值的数据再分成10份,所以我就在这个数值5后面拼上一个0~9的随机数即可。
针对这个操作我们需要去修改代码,在这里我们再重新复制一个类,基于WordCountJobSkew复制,新的类名是WordCountJobSkewRandKey
package com.helloworld.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;
/**
* 数据倾斜-把倾斜的数据打散
*
* Created by jinzhida
*/
public class WordCountJobSkewRandkey {
/**
* map阶段
*/
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
Random random = new Random();
/**
* 需要实现map函数
* 这个map函数就是可以接<k1,v1> 产生k2 v2
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString());
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量 v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//把迭代出来的单词封装成<k2,v2>的形式
String key = words[0];
if("5".equals(key)){
//把倾斜的key打散,分成10份
key = "5" + "_" + random.nextInt(10);
}
Text k2 = new Text(key);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<>k2,{v2...}> 的数据进行累加求和,并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
//创建一个sum变量, 保存v2s的值
long sum = 0L;
//对v2s中的数据累加求和
for(LongWritable v2:v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"");
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
//模拟Reduced的复杂计算消耗的时间
if(sum % 200 == 0){
Thread.sleep(1);
}
}
//组装k3, v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+"");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//把结果写出去
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args){
try{
if(args.length!=3){
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordConutJob这个类的
job.setJarByClass(WordCountJobSkewRandkey.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
job.getPartitionerClass();
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);
//设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
只需要在map中把k2的值修改一下就可以了,这样就可以把值为5的数据打散了。 编译打包,提交到集群 执行成功之后查看结果 注意,这个时候获取到的并不是最终的结果,因为我们把值为5的数据随机分成多份了,最多分成10份 任务总的执行消耗时间为:Elapsed: 1mins, 39sec 这次任务执行时间节省了1分钟多的左右,在这就属于质的提升了,相当于节省了将近一半的时间了 但是这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工,前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个mapreduce任务再做一次全局聚合,其实也很简单,获取到上一个map任务的输出,在map端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列
|