分区的概念:
????????Map阶段处理的数据,在向环形缓冲区写的时候 是以分区的方式写的。 一般情况下,MR程序分区数有多少 reduceTask数量就应该有多少 ,一个分区的数据一个reduceTask去处理,reduceTask处理完成之后都会生成一个结果文件
举个例子:
mapper阶段输出的分区是5个 但是reduceTask数量是1,能否将MR运行成功呢?
可以运行成功 只有一个文件 五个分区 但是一个reduceTask可以去处理五个分区数据
????????那么我们衍生出这么几个问题:
若分区5个 但是reduceTask是2个, 则不能运行 运行报错 不患寡而患不均
若分区5个 reduceTask5个 ,百分百可以正常运行 ----最理想的状态
若分区5个 reduceTask大于6个 ,百分百也可以运行成功,但是会多出一个空白结果文件
【注意】:以后在工作中写的 我们的reduceTask数量最好和分区数保持一致 这样的话处理才是MR程序认位的最佳状态
一、MR的默认分区(不需要自定义)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
????????默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。默认类为HashPartitioner。
使用默认分区需要在Driver类中调用语句:
job.setNumReduceTasks(3);
二、自定义分区
??????? 默认分区机制HashPartitioner有个缺点,那就是不可控(不好控制把结果文件输出到哪里),所以我们需要自定义分区。
接下来我们做一个案例:
需求:将统计结果按照手机归属地不同省份输出到不同文件中(Partitioner)
文本数据:

?
分析
(1)Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask的数值来分发
(2)如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
自定义一个CustomPartitioner继承抽象类:Partitioner
(3)在job驱动中,设置自定义
partitioner: job.setPartitionerClass(CustomPartitioner.class)
* 默认分区机制----5个区---设置5个reduceTask, 同时默认的分区机制是按照key的hashcode值
public class Driver {
public static void main(String[] args) throws Exception {
//定义配置项以及用来获取Job任务对象
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.100.3:9000");
Job job = Job.getInstance(conf);
//job关联Mapper reducer阶段
job.setJarByClass(Driver.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
//定义我们的输入的数据路径以及输出的数据路径 以及输入的实现类
FileInputFormat.setInputPaths(job, new Path("/phone_data.txt"));
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.3:9000"), conf, "root");
Path outPath = new Path("/output3");
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
//定义不去使用默认的HashPartitioner分区 而是使用我们的自定义分区
job.setPartitionerClass(MyPartitioner.class);
//你的reduceTask数量必须是5
/**
* 你的reduceTask数量必须是5 原因就是在MR程序中一般【默认情况下】是一个分区要有一个reduceTask专门去处理
*但是在有些情况下 reduceTask我们可能少些或者多些 这样的话会出一下奇怪的问题
* mapper阶段输出的分区是5个 但是reduceTask数量是1 可以运行成功 只有一个文件 五个分区 但是一个reduceTask可以去处理五个分区数据
* 分区5个 但是reduceTask是2个 不能运行 运行报错 不患寡而患不均
* 分区5个 reduceTask5个 百分百可以正常运行 ----最理想的状态
* 分区5个 reduceTask大于6个 百分百也可以运行成功,但是会多出一个空白结果文件
*【注意】:以后在工作中写的 我们的reduceTask数量最好和分区数保持一致 这样的话处理才是MR程序认位的最佳状态
*/
job.setNumReduceTasks(3);
//提交运行
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
/**
* 把手机号的前三位当key 整条数据当作value输出到reducetask阶段
*/
class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
//13666666666
String phone = fields[1];
//拿到手机号的前三位
String phoneThree = phone.substring(0, 3);
context.write(new Text(phoneThree), value);
}
}
class MyReducer extends Reducer<Text, Text, NullWritable, Text> {
/**
* 136 【1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
* ,1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200】
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//第一个Text代表手机号
Iterator<Text> iterator = values.iterator();
while (iterator.hasNext()) {
context.write(NullWritable.get(), iterator.next());
}
}
}
将文件中的手机号按照归属地不同放到不同的文件中
* 136----分区1
* 137----分区2
* 138----分区3
* 139----分区4
* 其他----分区5
必须使用自定义分区:
自定义分区机制 1、继承Partitioner这个类 2、重写里面的getPartition方法 返回值是一个int类型 返回值就是我的分区 ? ??????? 继承Partitioner之后 需要区传递一个key-value键值对的泛型 代表的是我们的数据 ????????那么需要传递的是map阶段输出的key-value类型 因为分区是在map阶段执行结束输出数据的时候执行的
/**
* 自定义分区机制
* 1、继承我们的Partitioner这个类
* 2、重写里面的getPartition方法 返回值是一个int类型 返回值就是我的分区
*
*继承Partitioner之后 需要区传递一个key-value键值对的泛型 代表的是我们的数据
* 那么需要传递的是map阶段输出的key-value类型 因为分区是在map阶段执行结束输出数据的时候执行的
*/
public class MyPartitioner extends Partitioner<Text,Text> {
/**
*
* @param key map阶段输出的key值
* @param value map阶段输出的value值
* @param numReduceTasks 定义的reduceTask的任务数据 默认是1
* @return 数字 代表的是我要将当前的这条key-value数据输送到哪个分区?
*/
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
String s = key.toString();
switch(s){
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
}
}
接下来我们在上面的Driver类里需要添加如下代码:
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(5);
注意:
????????如果reduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
????????如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
????????如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;
|