目录
1.排序概述
2.WritableConparable排序案例实操
2.1需求
2.2 需求分析
2.3 数据准备
2.3代码实现
3.结果展示
1.排序概述
排序是Mapreduce中最重要的操作之一。无论是MapTask还是ReduceTask均会对数据按照key进行排序。该操作数据hadoop的默认行为。任何逻辑上的数据均会被排序,而不管业务逻辑上是否需要。那么如何根据业务需求,对数据进行排序呢?本文将基于下面这篇博文的基础上进行改进。
Hadoop案例:Partition类控制文件输出个数https://mp.csdn.net/mp_blog/creation/editor/121602844
2.WritableConparable排序案例实操
2.1需求
情景展现:客户经理拿着需求跟你说,这份文件除了将电话按不同省份文件输出以外,每个文件内部还要按照总流量进行排序。也就是说输出的文件首先按照总流量排序,再按照上行流量的顺序排序。
2.2 需求分析
(1)确定map函数的输入输出<key,value>的类型。这里输入是<LongWritable Text> 因为要对总流量排序,而mapreduce只能对key进行排序,因此map输出的类型为<Flowbean,Text>
(2)确定reduce函数的输入输出<key,value>的类型。map的输出为reduce的输入,因此reduce的输入类型<Flowbean,Text> 输出<Text,Flowbean>
(3)因为要分区,因此需要自定义分区函数继承Partition类实现getPatition()方法。分区是在map开始之后做的,因此这里的输入数据类型为map的输出数据类型<Flowbean,Text>
(4)因为要实现自定义排序,而排序的内容是Flowbean对象中的总流量,因此定义Flowbean类需要继承WritableComparable重写compareTo()方法、重写toString()方法、做序列化。
(5)在Driver类中的main方法中,把自定义的相关类关联起来
2.3 数据准备
2.3代码实现
自定义Flowbean包装上行流量、下行流量以及总流量
package com.yangmin.mapreduce.partitionandwritableComparable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量
//提供无参构造
public FlowBean() {
}
//生成三个get/set方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//实现序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.upFlow);
out.writeLong(this.downFlow);
out.writeLong(this.sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public int compareTo(FlowBean o) {
// 按总流量倒序排列
if (this.sumFlow > o.sumFlow){
return -1;
}else if (this.sumFlow < o.sumFlow){
return 1;
}else {
//按照上行流量的正序排
if (this.upFlow > o.upFlow){
return -1;
}else if (this.upFlow < o.upFlow){
return 1;
}else {
return 0;
}
}
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
自定义ProvincePartion类继承WritableComparable,注意这里的key,value输入类型
package com.yangmin.mapreduce.partitionandwritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartition2 extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
int partition;
String phone = text.toString();
String sub = phone.substring(0, 3);
//分支判断
if ("136".equals(sub)) {
partition = 0;
}else if ("137".equals(sub)) {
partition = 1;
}else if ("138".equals(sub)) {
partition = 2;
}else if ("139".equals(sub)) {
partition = 3;
}else {
partition = 4;
}
return partition;
}
}
Mapp类,这里数据的输入输出类型。
package com.yangmin.mapreduce.partitionandwritableComparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean,Text> {
private FlowBean outk = new FlowBean();
private Text outv = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据
String s = value.toString();
//切割数据
String[] split = s.split("\t");
//封装outk outv
outk.setUpFlow(Long.parseLong(split[1]));
outk.setDownFlow(Long.parseLong(split[2]));
outk.setSumFlow();
outv.set(split[0]);
//写出outk outv
context.write(outk,outv);
}
}
reducer类,输入是map的输出,输出的时候需要将key 与value调转过来。输出到文件的格式才是: 电话? 上行总流量 下行总流量 全部总流量。
package com.yangmin.mapreduce.partitionandwritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean,Text,Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//遍历values集合,循环写出,避免总流量相同的情况
for (Text value : values) {
//调整k,v位置,反向写出
context.write(value, key);
}
}
}
Driver类:关联分区、设置reduceTask的个数
package com.yangmin.mapreduce.partitionandwritableComparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//关联map和reduce
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//设置map段输出kv
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
//设置程序最终输出kv值
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//关联分区
job.setPartitionerClass(ProvincePartition2.class);
//设置reducetask的个数
job.setNumReduceTasks(5);
//设置程序的输入输出路径
FileInputFormat.setInputPaths(job,new Path("C:\\ZProject\\bigdata\\output\\output_writable"));
FileOutputFormat.setOutputPath(job,new Path("C:\\ZProject\\bigdata\\output\\output_writable_comparable_partition"));
//提交job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
3.结果展示
|