1. Hadoop MapReduce partitioning with Partitioner
1.1 The default number of MR output files
By default, no matter how many map tasks run in parallel during the map phase, all of their output is handled by a single reduce task, and the final result is written to a single output file.
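This single-file behavior follows from the default setup: one reduce task combined with Hadoop's built-in HashPartitioner, which assigns keys to reducers by hashing. Below is a minimal sketch of the same logic found in org.apache.hadoop.mapreduce.lib.partition.HashPartitioner (the class name here is our own, for illustration):

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the logic in Hadoop's built-in HashPartitioner.
// With a single reduce task, numReduceTasks == 1, so every key
// lands in partition 0 and the job produces one output file.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Clear the sign bit so the modulo result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}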
1.2 Changing the number of reduce tasks
In the driver class of a MapReduce program, the number of reduce tasks can be changed through a method provided by the Job object; setting it to six yields six partitions, as shown in the sketch below.
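A minimal sketch of the relevant call; the full driver in section 2.4 shows it in context:

// Inside the driver, after the Job has been created:
// six reduce tasks produce six partitions and six output files.
job.setNumReduceTasks(6);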
1.3 Sample dataset
This is the dataset used below; you can download it for reference. Link: https://pan.baidu.com/s/1wByTj5jzKuQmQ9HpBWq00Q Extraction code: rdcv
2. Program code
2.1 The StatePartitioner class
package com.niit.covid.MeiTuanstater;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

// Routes each record to a reduce task based on the province name in the key.
public class StatePartitioner extends Partitioner<Text, Text> {

    // Static lookup table mapping each province to its partition number.
    public static HashMap<String, Integer> stateMap = new HashMap<String, Integer>();

    static {
        stateMap.put("广西壮族自治区", 0);
        stateMap.put("广东省", 1);
        stateMap.put("云南省", 2);
        stateMap.put("湖南省", 3);
        stateMap.put("贵州省", 4);
    }

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        Integer code = stateMap.get(key.toString());
        if (code != null) {
            return code;
        } else {
            // Any province not in the table falls into the last partition.
            return 5;
        }
    }
}
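One constraint to keep in mind: getPartition must return a value in the range [0, numReduceTasks), otherwise the job fails on the map side with an "Illegal partition" error. With six reduce tasks the legal values are 0 through 5, so every province missing from stateMap is collected into the last partition, 5.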
2.2 The Mapper class
package com.niit.covid.MeiTuanstater;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Emits (province, whole line) so the partitioner can route records by province.
public class StatePartitionMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input lines are comma-separated; the first field is the province name.
        String[] fields = value.toString().split(",");
        String state = fields[0];
        outKey.set(state);
        // The untouched input line is passed along as the value.
        context.write(outKey, value);
    }
}
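A design note: the province name is duplicated into the key purely so that StatePartitioner can route on it; the value still carries the complete original line, which is what the reducer ultimately writes out.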
2.3 The Reducer class
package com.niit.covid.MeiTuanstater;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Writes each original record back out unchanged; the grouping into files
// has already been decided by the partitioner.
public class StatePartitionReducer extends Reducer<Text, Text, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            // Emit the full input line as the key, with no value.
            context.write(value, NullWritable.get());
        }
    }
}
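Because the reducer emits each incoming line as the output key with a NullWritable value, the job performs no aggregation: each reduce task simply collects the raw records routed to it, so every output file holds the unmodified records of one province.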
2.4 The Driver class
package com.niit.covid.MeiTuanstater;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StatePartitionDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, StatePartitionDriver.class.getSimpleName());
        job.setJarByClass(StatePartitionDriver.class);

        // Wire up the mapper and reducer (both live in this package,
        // so no cross-package imports are needed).
        job.setMapperClass(StatePartitionMapper.class);
        job.setReducerClass(StatePartitionReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Use the custom partitioner with one reduce task per partition.
        job.setPartitionerClass(StatePartitioner.class);
        job.setNumReduceTasks(6);

        Path inputPath = new Path("input/covid/meituan");
        Path outputPath = new Path("output/covid/meituanpartitoner");
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Remove any previous output so the job does not fail on rerun.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        boolean resultFlag = job.waitForCompletion(true);
        System.exit(resultFlag ? 0 : 1);
    }
}
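One detail worth calling out: FileOutputFormat refuses to run a job whose output directory already exists (output validation fails with a FileAlreadyExistsException), which is why the driver deletes outputPath before calling waitForCompletion.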
MapReduce run results
When the job finishes, it produces six partitioned outputs, one file per reduce task (part-r-00000 through part-r-00005): records for each mapped province are grouped into their own file, and all remaining provinces land together in the sixth partition.
That wraps up MapReduce partitioning. If you have any questions, bring them to the comment section! 😚😚😚