0. Project Structure
Training data
phone address name consum
13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987
13877779999 bj zs 2184
13766668888 sh ls 1524
13766668888 sh ls 9844
13877779999 bj zs 6554
13544445555 sz ww 10584
13877779999 sh zs 21454
13766668888 sh ls 99747
Goal: compute the total purchase amount per region, and partition the data by region during the shuffle phase.
1. FlowBean
package hadoop_test.partition_test_06.domain;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private String phone;
    private String addr;
    private String name;
    private int consum;

    // Serialization: fields must be written and read back in the same order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeUTF(addr);
        out.writeUTF(name);
        out.writeInt(consum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.addr = in.readUTF();
        this.name = in.readUTF();
        this.consum = in.readInt();
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Note: getFlow/setFlow are the accessors for the consum field.
    public int getFlow() {
        return consum;
    }

    public void setFlow(int flow) {
        this.consum = flow;
    }

    @Override
    public String toString() {
        return "FlowBean [phone=" + phone + ", addr=" + addr + ", name=" + name + ", consum=" + consum + "]";
    }
}
2. FlowDriver
package hadoop_test.partition_test_06.flow;

import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // One ReduceTask per partition: FlowPartitioner returns 0, 1 or 2.
        job.setNumReduceTasks(3);
        job.setPartitionerClass(FlowPartitioner.class);

        FileInputFormat.setInputPaths(job, new Path("/hadoop_test/avro/avro.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/avro/result"));

        job.waitForCompletion(true);
    }
}
3. FlowMapper
package hadoop_test.partition_test_06.flow;

import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line has the form: phone addr name consum
        String[] fields = value.toString().split(" ");

        FlowBean flowBean = new FlowBean();
        flowBean.setPhone(fields[0]);
        flowBean.setAddr(fields[1]);
        flowBean.setName(fields[2]);
        flowBean.setFlow(Integer.parseInt(fields[3]));

        // Key by name; the partitioner routes each record by the addr field of the value.
        context.write(new Text(flowBean.getName()), flowBean);
    }
}
4. FlowPartitioner
package hadoop_test.partition_test_06.flow;

import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartitioner extends Partitioner<Text, FlowBean> {

    // Route records by region (addr): sh -> 0, bj -> 1, everything else -> 2.
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        if (value.getAddr().equals("sh")) {
            return 0;
        } else if (value.getAddr().equals("bj")) {
            return 1;
        } else {
            return 2;
        }
    }
}
Partition
The main purpose of partitioning is to send records of the same partition to the same ReduceTask, so that keys belonging to different partitions are processed by different reducers. MapReduce provides an abstract class called Partitioner; the default implementation is HashPartitioner (partition number = key's hashCode % number of ReduceTasks; by default there is 1 ReduceTask).
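To see the default rule in action, here is a minimal standalone sketch (not part of the project; the class name and key strings are only illustrative) that applies the same hashCode-and-modulo formula HashPartitioner uses:

public class DefaultPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String key : new String[]{"zs", "ls", "ww"}) {
            // Same formula as HashPartitioner: mask the sign bit, then take the modulo.
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}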
(1) Configuring the partition
Set a matching number of ReduceTasks:
job.setNumReduceTasks(numReduceTasks);
- If the number of ReduceTasks is greater than the number of partitions returned by getPartition, several extra empty output files (part-r-000xx) are produced.
- If the number of ReduceTasks is less than the number of partitions, the records assigned to the missing partitions have nowhere to go and the job fails with an exception (Illegal partition).
- If the number of ReduceTasks is 1, then no matter how many partition files the MapTasks produce, everything is handed to the single ReduceTask and only one result file is generated.
- Partition numbers must start at 0 and increase consecutively, with no gaps.
Specify the custom Partitioner:
job.setPartitionerClass(YourPartitionerClass.class);
The custom class must be bound to the job; otherwise the job still runs the default partitioning code, i.e. HashPartitioner. The two settings work together, as sketched below.
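For illustration, a hedged sketch of how the two settings fit together, assuming the FlowPartitioner from section 4 (the PartitionConfigSketch class and its configure helper are hypothetical, not part of the project):

package hadoop_test.partition_test_06.flow;

import org.apache.hadoop.mapreduce.Job;

public class PartitionConfigSketch {
    // Hypothetical helper: applies the partition-related settings discussed above
    // to an already-created Job (e.g. the one built in FlowDriver).
    public static void configure(Job job) {
        job.setPartitionerClass(FlowPartitioner.class);
        job.setNumReduceTasks(3);   // matches the three partitions -> part-r-00000..00002
        // job.setNumReduceTasks(4); // one extra ReduceTask -> an empty part-r-00003
        // job.setNumReduceTasks(2); // fewer than the partitions -> "Illegal partition" at runtime
        // job.setNumReduceTasks(1); // everything goes to one reducer, one result file
    }
}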
(2) The Partitioner class
Every partitioner extends the abstract class Partitioner and implements the getPartition(KEY var1, VALUE var2, int var3) method:
package org.apache.hadoop.mapreduce;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public abstract class Partitioner<KEY, VALUE> {
    public Partitioner() {
    }

    public abstract int getPartition(KEY var1, VALUE var2, int var3);
}
Hadoop ships with a built-in Partitioner implementation: the HashPartitioner class.
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.Partitioner;

@Public
@Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
    }
}
Here 2147483647 is the largest 32-bit positive integer, 0111 1111 1111 1111 1111 1111 1111 1111 in binary (Integer.MAX_VALUE); the bitwise AND clears the sign bit, so a negative hashCode still yields a non-negative partition number.
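A minimal standalone sketch (illustrative only, not Hadoop source) of why the sign bit must be masked: a negative hashCode would otherwise produce a negative, and therefore invalid, partition number.

public class SignBitDemo {
    public static void main(String[] args) {
        int h = -7;             // String.hashCode() can legitimately be negative
        int numReduceTasks = 3;
        System.out.println(h % numReduceTasks);                       // -1: not a valid partition number
        System.out.println((h & Integer.MAX_VALUE) % numReduceTasks); // always in [0, numReduceTasks)
    }
}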
5. FlowReducer
package hadoop_test.partition_test_06.flow;

import hadoop_test.partition_test_06.domain.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text name, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        FlowBean tmp = new FlowBean();
        for (FlowBean flowbean : values) {
            // e.g. flowbean = FlowBean [phone=13766668888, addr=sh, name=ls, consum=9844]
            tmp.setAddr(flowbean.getAddr());
            tmp.setPhone(flowbean.getPhone());
            tmp.setName(flowbean.getName());
            // Accumulate consum: tmp starts at 0, then adds each record's value.
            // Round 1: tmp = 0 + 9844 = 9844; round 2: tmp = 9844 + the next record's consum; and so on.
            tmp.setFlow(tmp.getFlow() + flowbean.getFlow());
        }
        context.write(name, tmp);
    }
}
Output: the job writes three result files under /hadoop_test/avro/result, one per partition: part-r-00000 (sh), part-r-00001 (bj), and part-r-00002 (all other regions, here sz), each containing the aggregated FlowBean records produced by its ReduceTask.