package com.cn.demo_groupTopN;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 继承WritableComparator类,重写compare 方法 相同的订单ID认为相同
*/
/**
 * Grouping comparator: two map-output keys are considered equal whenever their
 * order ids match, so every record of one order lands in a single reduce() call.
 */
public class MyGroupCompactor extends WritableComparator {
    /**
     * Registers {@link OrderBean} as the key type and asks the parent class to
     * create key instances so deserialized keys can be compared.
     */
    public MyGroupCompactor() {
        super(OrderBean.class, true);
    }

    /**
     * Groups keys by order id only; the price part of the key is ignored here.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean left = (OrderBean) a;
        OrderBean right = (OrderBean) b;
        return left.getOrder_id().compareTo(right.getOrder_id());
    }
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 自定义我们的订单类 实现序列化和排序方法
*/
/**
 * Order key bean: carries the order id and one item price, with Hadoop
 * serialization and the sort order used for the group-TopN job.
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private String order_id;
    private Double price;

    /**
     * Sort order for the shuffle: primary by order id (ascending) so records
     * of the same order are adjacent, secondary by price DESCENDING so the
     * reducer's first N values of each group are the N highest prices.
     *
     * Note: the previous version returned 0 for keys with different order ids,
     * which violates the compareTo total-order contract and leaves the sort
     * result undefined; keys must always be fully ordered.
     */
    @Override
    public int compareTo(OrderBean orderBean) {
        int byId = this.order_id.compareTo(orderBean.getOrder_id());
        if (byId != 0) {
            return byId;
        }
        // Same order: larger price first (descending) for TopN.
        return orderBean.getPrice().compareTo(this.price);
    }

    /** Serializes the bean for the shuffle; field order must match readFields. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order_id);
        dataOutput.writeDouble(price);
    }

    /** Deserializes the bean; field order must match write. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order_id = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    /** Tab-separated "orderId\tprice" so the reduce output is easy to read. */
    @Override
    public String toString() {
        return this.order_id + "\t" + this.price;
    }
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* MAP端获取数据,并以订单类的形式存储,输出orderBean类,和订单价格
* 当输出时,K1合并会以MyGroupCompactor类的比较规则进行合并成K2
* V1会以类内的排序方法,组合进入集合V2
*/
/**
 * Map side: parses each tab-separated input line into an OrderBean key
 * (order id + price) and emits the price again as the value. During the
 * shuffle, keys are grouped by MyGroupCompactor (same order id = same group)
 * and sorted by OrderBean.compareTo, so each reduce group receives the
 * prices of one order.
 *
 * Expected line layout (tab-separated): column 0 = order id, column 2 = price.
 */
public class MyGroupMap extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, DoubleWritable>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // Parse the price once and reuse it for both the key and the value
        // (the original parsed the same column twice).
        double price = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean();
        orderBean.setOrder_id(fields[0]);
        orderBean.setPrice(price);
        context.write(orderBean, new DoubleWritable(price));
    }
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reduce端如果 相同K1合并,K2以数组连接,此时数组内为相同订单的价格
*/
/**
 * Reduce side: each group contains the prices of a single order (grouped by
 * MyGroupCompactor). Because the values arrive in the key's sort order,
 * emitting the first TOP_N values yields the top-N records per order.
 */
public class MyGroupReduce extends Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable> {

    /** Number of records to emit per order group (the "N" of TopN). */
    private static final int TOP_N = 2;

    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable>.Context context) throws IOException, InterruptedException {
        int emitted = 0;
        for (DoubleWritable value : values) {
            // NOTE(review): Hadoop reuses the key object while iterating, so
            // `key` reflects the current record of the group on each write.
            context.write(key, value);
            if (++emitted >= TOP_N) {
                break;
            }
        }
    }
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 自定义分区:如果订单号相同分到同一个reduce里面
*/
/**
 * Custom partitioner: records with the same order id are always routed to the
 * same reduce task, so a whole order is processed by one reducer.
 */
public class MyPartion extends Partitioner<OrderBean, DoubleWritable> {
    /**
     * Masking the hash with Integer.MAX_VALUE clears the sign bit, guaranteeing
     * a non-negative value before taking the remainder by the partition count.
     */
    @Override
    public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int numPartitions) {
        int nonNegativeHash = orderBean.getOrder_id().hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Job driver for the group-TopN example: wires the mapper, custom partitioner,
 * grouping comparator, and reducer together and runs the job.
 *
 * Input/output paths can now be passed as the first and second command-line
 * arguments; when absent, the original hard-coded local paths are used so
 * existing invocations keep working.
 */
public class MyGroupMain extends Configured implements Tool {

    /** Fallback paths, kept from the original hard-coded configuration. */
    private static final String DEFAULT_INPUT =
            "file:///D:\\dsj\\baishi课件\\hadoop\\5、大数据离线第五天\\5、大数据离线第五天\\自定义groupingComparator\\input";
    private static final String DEFAULT_OUTPUT =
            "file:///D:\\dsj\\baishi课件\\hadoop\\5、大数据离线第五天\\5、大数据离线第五天\\自定义groupingComparator\\output_TOPN";

    @Override
    public int run(String[] strings) throws Exception {
        // Prefer CLI-provided paths; fall back to the historical defaults.
        String inputPath = strings.length > 0 ? strings[0] : DEFAULT_INPUT;
        String outputPath = strings.length > 1 ? strings[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(super.getConf(), "group_demo");
        job.setJarByClass(MyGroupMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(inputPath));

        job.setMapperClass(MyGroupMap.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // Partition by order id so one order goes to one reducer.
        job.setPartitionerClass(MyPartion.class);
        job.setNumReduceTasks(2);
        // Group values by order id so one reduce() call sees a whole order.
        job.setGroupingComparatorClass(MyGroupCompactor.class);

        job.setReducerClass(MyGroupReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new MyGroupMain(), args);
        System.exit(exitCode);
    }
}