Sorting
In a MapReduce program the data is always sorted, whether or not the application logic needs it; sorting is one of the most important operations in the MapReduce framework.
A MapTask sorts twice:
1. When the circular in-memory buffer reaches its threshold, the data in the buffer is quick-sorted and then spilled to disk.
2. After the map has processed all of its input, the spill files on disk are combined with a merge sort.
A ReduceTask sorts once more. It copies the relevant data files from the MapTasks into memory; when a copied file exceeds a size threshold it is spilled to disk, and when the number of files on disk reaches a threshold they are merge-sorted; if the size or number of in-memory files reaches a threshold, they are merged and spilled to disk as well. Once all the data has been copied, the ReduceTask performs one final merge sort over everything held in memory and on disk.
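How aggressively the framework buffers, spills, and merges during these sorts is configurable. A minimal sketch of the map-side knobs, assuming the standard Hadoop 2.x/3.x property names (the values shown are simply the defaults; in practice they normally live in mapred-site.xml rather than in code):
// Map-side sort/spill configuration, set programmatically for illustration
Configuration conf = new Configuration();
conf.setInt("mapreduce.task.io.sort.mb", 100);            // size of the circular buffer, in MB
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spill once the buffer is 80% full
conf.setInt("mapreduce.task.io.sort.factor", 10);         // number of spill files merged in one pass
Job job = Job.getInstance(conf);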
Types of sorting
(1) Partial sort
Each reducer's output file is internally sorted.
(2) Total sort
A total (global) sort means there can be only one reducer, so that there is a single output file that is sorted throughout; this is very inefficient and gives up the advantage of distributed processing.
(3) Auxiliary (grouping) sort
On the reduce side the keys have to be grouped. By default, grouping simply follows the key emitted by the map; but for special requirements, for example redefining the grouping key before records enter the reduce() method so that records sharing one or more fields land in the same reduce() call, a grouping sort is needed (see the grouping comparator sketch after this list).
(4) Secondary sort
If compareTo in a custom sort compares on two conditions, that is a secondary sort.
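For the auxiliary sort, the reduce-side grouping is usually changed by registering a grouping comparator. A minimal sketch, assuming a hypothetical OrderBean key with a long getOrderId() field (this example is separate from the flow data used below):
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Groups records by orderId only, so one reduce() call sees every record of an order,
// while the sort comparator can still order records by a second field inside the group
public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        super(OrderBean.class, true);   // true: let the comparator instantiate key objects
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return Long.compare(oa.getOrderId(), ob.getOrderId());
    }
}
It is registered in the driver with job.setGroupingComparatorClass(OrderGroupingComparator.class).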
The WritableComparable interface
This interface provides both serialization and ordering; any custom data type used as a key must implement it.
Total sort: a worked example
(1) Input data
1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
(2) Requirement: sort by total traffic in descending order. Expected output (phone number, upstream flow, downstream flow, total flow):
13509468723 7335 110349 117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
(3) Implementation steps
1. The total traffic of each phone number has to be aggregated first, so the first step is a job that uses the phone number as the key and a FlowBean as the value.
2. The output of this first MapReduce job is: key = phone number, value = FlowBean.
3. The second MapReduce job takes the FlowBean as the key and the phone number as the value for its mapper input.
A sketch of the first job's reducer is shown below.
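The first job itself is not listed in these notes; a minimal sketch of its reducer, with the class name SumFlowReducer chosen here purely for illustration (its mapper and driver are omitted):
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer of the first job: sums up/down flow per phone number
public class SumFlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private final FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        int totalUp = 0;
        int totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow(totalUp + totalDown);
        // Written out as: phone \t upFlow \t downFlow \t sumFlow
        context.write(key, outV);
    }
}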
The first job's output (one line per phone number with its up, down, and total flow) is the input of the second job. Every file under the input directory takes part in the computation, so the unrelated _SUCCESS marker file is deleted from that directory first.
(1) The Bean implements the WritableComparable interface
package com.fantasy.mapreduce.writableComparable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {

    // phone is kept on the bean for convenience but is not serialized;
    // in this job the phone number travels separately as the Text value
    private long phone;
    private int upFlow;
    private int downFlow;
    private int sumFlow;

    public FlowBean() {
    }

    public long getPhone() {
        return phone;
    }

    public void setPhone(long phone) {
        this.phone = phone;
    }

    public int getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
    }

    public int getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(int downFlow) {
        this.downFlow = downFlow;
    }

    public int getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(int sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Sort descending by total flow
    @Override
    public int compareTo(FlowBean o) {
        return o.getSumFlow() - this.getSumFlow();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(sumFlow);
    }

    // Fields must be read back in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readInt();
        downFlow = in.readInt();
        sumFlow = in.readInt();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
Note: with this interface, keys for which compareTo returns 0 are treated as equal, so their records end up in the same group (the same reduce() call).
(2) Mapper
package com.fantasy.mapreduce.writableComparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ComparableMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private final FlowBean outK = new FlowBean();
    private final Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // One input line per phone, produced by the first job:
        // phone \t upFlow \t downFlow \t sumFlow
        String line = value.toString();
        String[] split = line.split("\t");
        String phone = split[0];
        int upFlow = Integer.parseInt(split[1]);
        int downFlow = Integer.parseInt(split[2]);
        int sumFlow = Integer.parseInt(split[3]);

        // The FlowBean becomes the map output key so that the framework sorts on it
        outK.setUpFlow(upFlow);
        outK.setDownFlow(downFlow);
        outK.setSumFlow(sumFlow);
        outV.set(phone);
        context.write(outK, outV);
    }
}
(3) Reducer
package com.fantasy.mapreduce.writableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ComparableReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Phones whose FlowBean keys compare as equal (same total flow) arrive in one
        // group; write each of them out, swapping key and value back to phone -> FlowBean
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
(4) Driver
package com.fantasy.mapreduce.writableComparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ComparableDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ComparableDriver.class);
        job.setMapperClass(ComparableMapper.class);
        job.setReducerClass(ComparableReducer.class);

        // Map output: FlowBean -> phone
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // Final output: phone -> FlowBean
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // args[0]: output directory of the first job; args[1]: output directory of this job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Output:
13509468723 110349 7335 117684
13975057813 48243 11058 59301
13568436656 25635 3597 29232
13736230513 24681 2481 27162
18390173782 2412 9531 11943
13630577991 690 6960 7650
15043685818 3538 3659 7197
13992314666 3720 3008 6728
15910133277 2936 3156 6092
13560439638 4938 918 5856
84188413 1432 4116 5548
13682846555 2910 1938 4848
18271575951 2106 1527 3633
15959002129 180 1938 2118
13590439668 954 1116 2070
13956435636 1512 132 1644
13470253144 180 180 360
13846544121 0 264 264
13729199489 0 240 240
13768778790 120 120 240
13966251146 0 240 240
Secondary sort
Requirement: order by sumFlow desc, upFlow asc
Just override compareTo:
@Override
public int compareTo(FlowBean o) {
    // Primary: total flow, descending
    int result = o.getSumFlow() - this.getSumFlow();
    if (result == 0) {
        // Secondary: upstream flow, ascending
        return this.getUpFlow() - o.getUpFlow();
    }
    return result;
}
Output:
13509468723 110349 7335 117684
13975057813 48243 11058 59301
13568436656 25635 3597 29232
13736230513 24681 2481 27162
18390173782 2412 9531 11943
13630577991 690 6960 7650
15043685818 3538 3659 7197
13992314666 3720 3008 6728
15910133277 2936 3156 6092
13560439638 4938 918 5856
84188413 1432 4116 5548
13682846555 2910 1938 4848
18271575951 2106 1527 3633
15959002129 180 1938 2118
13590439668 954 1116 2070
13956435636 1512 132 1644
13470253144 180 180 360
13846544121 0 264 264
13729199489 0 240 240
13966251146 0 240 240
13768778790 120 120 240
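A side note on the comparison itself: subtracting ints works for the small flow values in this data set, but it can overflow for large values. An overflow-safe variant of the same secondary sort (a sketch, not taken from the original notes) would be:
@Override
public int compareTo(FlowBean o) {
    // Descending by total flow, then ascending by upstream flow
    int result = Integer.compare(o.getSumFlow(), this.getSumFlow());
    if (result == 0) {
        result = Integer.compare(this.getUpFlow(), o.getUpFlow());
    }
    return result;
}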
Sorting within partitions
To sort within each partition, just add a partitioner to the total-sort example (shown here with the imports it needs); the key and value it receives are the mapper's output key and value, i.e. the FlowBean and the phone number.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Place in the same package as FlowBean, or import it
public class partitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // Route by the first three digits of the phone number (the map output value)
        String phone = text.toString();
        String preNum = phone.substring(0, 3);

        int partition = 4;   // everything else goes to partition 4
        if ("136".equals(preNum)) {
            partition = 0;
        } else if ("137".equals(preNum)) {
            partition = 1;
        } else if ("138".equals(preNum)) {
            partition = 2;
        } else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}
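The partitioner only takes effect once it is registered in the driver together with a matching number of reduce tasks; both calls below are standard Hadoop Job API, added to ComparableDriver before the job is submitted:
// Register the custom partitioner and run one reducer per partition (0-4)
job.setPartitionerClass(partitioner.class);
job.setNumReduceTasks(5);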