一、概述
分区其实就是分类,比如数据中有很多电话号码,我们想根据电话号码的前三位将数据写入不同的文件中,就需要用到分区。分区是Shuffle阶段中的,往后还会对各个分区中的数据进行排序、归并、压缩等操作。
默认采用的分区是HashPartitioner (继承了 Partitioner ),它会根据键的Hash值进行分区,具体放到那个分区中由分区数 numReduceTasks 决定。例如设置 numReduceTasks 为2,那么一个Hash值模2的结果只会是0和1,所以最终会产生两个分区。 其实这样说也不太对,因为默认的 ReduceTask 数 numReduceTasks 是1。通过看源码我们知道,如果 ReduceTask 数不是大于1的话,是不会走具体的 Partitioner 类的,而是直接new了一个内部类返回-1,所以它其实并没有走 HashPartitioner ,只有当你设置了 ReduceTask 数大于1并且没有指定分区类的话才会走 HashPartitioner。 所以我们如果想自定义分区的话,不仅要继承 Partitioner 类实现 getPartition 方法,还要指定 ReduceTask 数 numReduceTask。
二、自定义分区类
新建 Java 类文件 ProvincePartitioner,按手机号前三位进行分区。
package com.pineapple.mapreduce.partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String phone = text.toString();
String prePhone = phone.substring(0, 3);
switch (prePhone) {
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
}
}
因为 Partition 分区是属于 Shuffle 阶段,也就是在 Map 之后,所以 Partitioner 的泛型必须是 Map 的后两位泛型 ,即 Map 输出 KV 的类型。关于返回值,MR 是有严格的规定的,只能从0开始依次往后加,不能跳,例如:return 4 后是 return 10,这是不行的。
三、Driver 的设置
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
如果忘记设置 ReduceTask 数,前面说过了默认 ReduceTask 数是1,它不会用我们定义的Partitioner。 那如果分区数小于实际 Partitioner 返回值的个数会怎样?例如设置 ReduceTask 数为4,这时候会发现报了个 IOException 错误 因为这种情况 Shuffle 后产生了 5 个分区,却只开了4个MapTask,剩下的一个分区无法处理了,于是报错。
如果设置 ReduceTask 数大于分区数会怎么样?例如设置它为6,这时候会发现多了一个空文件
四、总结
- Partitioner 的泛型必须是 Map 的后两位泛型
- 返回值必须从0开始依次往后加,不可跳过。
- 如果不设置 ReduceTask ,其值默认为1,将不会使用我们的Partitioner
- 若 ReduceTask 个数小于 getPartition 返回值个数,将报 IOException
- 若 ReduceTask 个数大于 getPartition 返回值个数,将产生多余的空文件
Github 仓库地址:https://github.com/pineapple-cpp/MapReduceDemo
喜欢我的文章的话,欢迎关注 👇点赞 👇评论 👇收藏 👇 谢谢支持!!!
|