1.Mapper阶段
 每一个<k,v>调用一次,也即每一行调用一次
2.Reducer阶段

3.Driver阶段

WordCount案例实操
1.需求
给定数据,统计每个单词出现的次数
2.输入
word.txt文件 
3.输出
单词 数量 单数 数量 …
分析:按照Mapreduce编程规范
Mapper 1.1.将MapTask传给我们的文本内容转换成String 1.2.根据空格将每一行都切分为单词 1.3将单词输出为<单词,1>Reducer 2.1.汇总各个key的个数 2.2输出该key的总次数Driver 3.1.获取配置信息,获取job对象实例 3.2.指定本程序的jar包所在的本地路径 3.3.关联Mapper/Reducer业务类 5.4.指定Mapper输出数据的key类型 5.5.指定最终输出的数据的key类型 5.6.指定job的输入原数文件所在目录 5.7.指定job的输出结果所在目录 5.8.提交作业
1.查看Mapper源码,重写map()
可以看到run方法,有一个while循环,每次读取一个数据之后执行map方法:  所以map每次处理一行数据,map的参数就是每一个数据的key,value和context
重写map方法:
public static class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
StringTokenizer tonken = new StringTokenizer(line);
while(tonken.hasMoreElements()) {
word.set(tonken.nextToken());
context.write(word, one);
}
}
}
}
2.查看Reducer源码,重写reduce()
同样看run方法,也是每次处理一批数据,但不是一行,是key值相同的一批数据,因为map的时候,会执行shuffle操作,key值相同的数据会挨在一起  重写reduce方法:
public static class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value: values) {
sum+=value.get();
}
v.set(sum);
context.write(key, v);
}
}
}
3.定义Driver
直接在main方法里定义即可
public static void main(String []args ) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://hadoop1:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("/user/pp/input/word.txt"));
FileOutputFormat.setOutputPath(job, new Path("/user/pp/output/wordCount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

|