在
Shufflle阶段 输入到
reduce阶段 之前,会进行分组
默认分组规则 就是同一个
key 就会进入同一个
reduce方法 中,并且这些
同一个key 的所有的值将会存储在一个
迭代器values 之中,也就是
reduce方法 的
第二个参数
既然同一个
key 会进入到同一个比较器之中,那么判断同一个
key 就会涉及到
比较 ,也就是
分组比较 。也就是通过比较判断这个
key 是否同一个然后将所对应值整合到一个迭代器
values 中,然后被同一个
reduce 方法处理
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
1 默认分组比较器
我们一般运行实例(没有配置任何分组相关的配置)都会看到数据已经进行分组,hadoop 的分组比较事实上是通过分组比较器实现的,存在默认的分组比较器。从ReduceTask.class 可以看到该比较器
通过getOutputValueGroupingComparator方法 可以拿到该默认比较器 在getOutputValueGroupingComparator方法 中可以看到,首先是获取配置类中设置的类
这里获取的是mapreduce.job.output.group.comparator.class 对应的值,可以在mapred-site.xml 文件中进行配置该分组比较器类,而默认配置文件(mapred-default.xml )中并没有配置该类 如果在配置文件中配置了该分组比较器,那么直接反射方法创建该分组比较器并返回
如果没有配置该类那么就调用getOutputKeyComparator方法 获取比较器类
关于更多getOutputKeyComparator方法 获取比较器可以参考 :MapReduce学习4-1:排序
2 分组案例
默认分组是通过默认的分组比较器实现的,也可以通过自定义分组比较器,自定义进入同一个组的数据的规则,而不限于比较整个key 相同才进入同一个分组
1、需求:一个订单中有会有不同的商品,不同商品会产生一定的成交额,求出一堆订单中每个订单中最高的成交额,并且按订单id 进行升序排序
2、分析:将整体数据按订单id 升序排序,并且在在同一个订单内按金额降序排序。也就是整体升序排序,局部降序排序
3、输入数据
订单id 商品 id 成交金额
10000001 pdt_01 222.8
10000002 Pdt_03 522.8
10000002 pdt_04 122.4
10000003 pdt_06 232.8
10000003 pdt_02 33.8
10000001 pdt_02 33.8
10000002 pdt_05 722.4
4、期望输出:每个订单中成交额最大的记录
10000001 222.8
10000002 722.4
10000003 232.8
5、GroupCompareDriver.class
package com.groupCompare.maven;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class GroupCompareDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(GroupCompareDriver.class);
job.setMapperClass(GroupCompareMapper.class);
job.setReducerClass(GroupCompareReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
job.setGroupingComparatorClass(OrderGroupCpmparator.class);
FileInputFormat.setInputPaths(job, new Path("E:\\bigdata\\study\\test_files\\groupinput"));
FileOutputFormat.setOutputPath(job, new Path("E:\\bigdata\\study\\test_files\\groupoutput"));
job.waitForCompletion(true);
}
}
6、GroupCompareMapper.class
package com.groupCompare.maven;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupCompareMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] infos = line.split("\\W+");
OrderBean outK = new OrderBean();
outK.setOrderId(infos[0]);
outK.setPrice(Double.parseDouble(infos[2]));
context.write(outK, NullWritable.get());
}
}
7、OrderBean.class
package com.groupCompare.maven;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private Double price;
public OrderBean(){
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.orderId);
out.writeDouble(this.price);
}
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.price = in.readDouble();
}
@Override
public int compareTo(OrderBean o) {
int order_compare_result = this.orderId.compareTo(o.getOrderId());
return order_compare_result == 0? -this.price.compareTo(o.getPrice()):order_compare_result;
}
@Override
public String toString() {
return this.getOrderId()+"\t"+this.getPrice();
}
}
8、GroupCompareReducer.class
package com.groupCompare.maven;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupCompareReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next());
}
}
这里compareTo 是实现排序的一种方式,这里实现同一个分区内的数据的排序比较,而不会分组比较,这里注意区分,例如该排序会发生在Map阶段输出数据到环形缓冲区,在数据将要输出到磁盘之前,会对每个分区的数据进行快速排序,这里的快速排序就会调用上述比较。相关Shfflle原理 可以参考:MapReduce学习4:框架原理详解
这里compareTo是实现排序 的一种方式,更多可以参考:MapReduce学习4-1:排序
9、OrderGroupCpmparator.class :实现分组比较
package com.groupCompare.maven;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderGroupCpmparator extends WritableComparator {
public OrderGroupCpmparator(){
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean)a;
OrderBean bBean = (OrderBean)b;
return aBean.getOrderId().compareTo(bBean.getOrderId());
}
}
对应传入的两个数据进行比较,如果返回0 ,那么就会进入同一个分组,其他值不会进入同一个分组
分组是在有序基础上实现的,对于上述测试数据,对于订单id还是成交金额都是无序的
订单id 商品 id 成交金额
10000001 pdt_01 222.8
10000002 Pdt_03 522.8
10000002 pdt_04 122.4
10000003 pdt_06 232.8
10000003 pdt_02 33.8
10000001 pdt_02 33.8
10000002 pdt_05 722.4
经过reduce之前的归并排序,就会整理成如下,按orderId聚集,并且按orderId升序排序,聚集的部分按成交金额降序排序
10000001 pdt_01 222.8
10000001 pdt_02 33.8
10000002 pdt_05 722.4
10000002 Pdt_03 522.8
10000002 pdt_04 122.4
10000003 pdt_06 232.8
10000003 pdt_02 33.8
那么分组的一句就是不是实现分好的,而是调用reduce方法 之前首先是进行比较的而比较的规则就是我们的设定,本次案例就是比较ordreId ,orderId 相同就会进入同一个分组
如上述已经有序的数据,他会首先获取第1行 数据,然后用第1行 数据进行对比,使用跟我们的规则,发现第2行 的orderId 跟自己相同,但是第3行 不相同,那么前两行分为一个组,然后被redcue方法 处理。下一次如法炮制,从第3行 开始
假设经过reduce 之前的归并排序后变成了以下
10000001 pdt_01 222.8
10000001 pdt_02 33.8
10000002 pdt_05 722.4
10000002 Pdt_03 522.8
10000003 pdt_06 232.8
10000003 pdt_02 33.8
10000002 pdt_04 122.4
那么第3、4行 的数据会进入一个分组并被reduce方法 处理,最后一行 的数据会被单独当成一个分组,即使orderId 是相同的
在reduce方法 中,key 事实上指向一个栈中的地址 ,指向同一块内存,而内存在栈中,也就是说reduce方法 中,key是会被重复利用 的,而改变的是堆内存的内容 ,因而更可以获取“不同的key”
|