Learning objectives:
Understand how MapReduce word counting works.
Learning content:
Description

The input data (four identical lines):
hello,word,kafka,mapreduce,hadoop
hello,word,kafka,mapreduce,hadoop
hello,word,kafka,mapreduce,hadoop
hello,word,kafka,mapreduce,hadoop
1. First, TextInputFormat, a subclass of the abstract class InputFormat, reads the data from the file.
- TextInputFormat reads the data line by line.
- After reading, the data takes the form of key-value pairs:
- <k1,v1>
- <byte offset of line 1, line 1>
- <0,"hello,word,kafka,mapreduce,hadoop">
- <byte offset of line 2, line 2>; each line is 33 characters, so with the newline the second line starts at offset 34 (verified in the quick check below)
- <34,"hello,word,kafka,mapreduce,hadoop">
- ...
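A quick way to confirm those offsets outside of Hadoop (plain Java, not part of the job; OffsetCheck is just a throwaway name):

public class OffsetCheck {
    public static void main(String[] args) {
        String line = "hello,word,kafka,mapreduce,hadoop";
        // TextInputFormat keys each record by the byte offset where the line starts.
        System.out.println(line.length());      // 33 characters in line 1
        System.out.println(line.length() + 1);  // line 2 starts at 34 (33 + 1 newline byte)
    }
}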
2. The map stage processes each <k1,v1> and turns it into <k2,v2> pairs: each line is split into words and a <word,1> pair is emitted per word. The map logic has to be written as a Java program, shown below.
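Code logic (this is the same MyMapper class used in the complete program at the end of this section):

public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        // Split the line on commas, then emit <k2,v2> = <word,1> for each word
        String[] splits = value.toString().split(",");
        for (String split : splits) {
            context.write(new Text(split), new LongWritable(1));
        }
    }
}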
3. The shuffle stage turns the map output <k2,v2> into new <k2,v2> pairs.

The shuffle stage consists of four phases: partition, sort, combine, and group. No custom shuffle logic is written here; the default logic is used (the defaults require no code). After the default processing, the <k2,v2> pairs are:
<k2,v2>
<hello,<1,1,1,1>>
<word,<1,1,1,1>>
...
All values with the same key are gathered into one collection, e.g. <1,1,1,1>.
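Not used in this example, but worth knowing: the combine phase could be enabled with one extra line in main, reusing MyReducer as the combiner (summing is associative, so pre-summing on the map side gives the same final result):

job.setCombinerClass(MyReducer.class); // optional: pre-sum <word,1> pairs on each map task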
4. The reduce stage converts <k2,v2> into <k3,v3>. The reduce logic also has to be written as code: for each key, the values collection <1,1,1,1> is summed to 4 while the key stays unchanged.
<k3,v3>
<hello,4>
<word,4>
...
Code logic:
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        // Sum the 1s collected for this key, e.g. <1,1,1,1> -> 4
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Emit <k3,v3>, e.g. <hello,4>
        context.write(key, new LongWritable(count));
    }
}
5. The output is written as a text file, which requires TextOutputFormat, a subclass of OutputFormat.
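With the four input lines above, the job's output file (by default part-r-00000 in the output directory; one <k3,v3> per line, key and value separated by a tab, keys in sorted order) looks like:

hadoop	4
hello	4
kafka	4
mapreduce	4
word	4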
Complete code (with comments):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.net.URISyntaxException;
public class WordCount {

    // Map stage: <k1,v1> = <line offset, line text> -> <k2,v2> = <word, 1>
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            // Split the line on commas and emit <word,1> for each word
            String[] splits = value.toString().split(",");
            for (String split : splits) {
                context.write(new Text(split), new LongWritable(1));
            }
        }
    }

    // Reduce stage: <k2,v2> = <word, <1,1,1,1>> -> <k3,v3> = <word, 4>
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            // Sum the 1s collected for this key
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "WordCount");
        job.setJarByClass(WordCount.class);

        // Step 1: read the input file line by line with TextInputFormat
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("/wordcount"));

        // Step 2: map stage
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Step 3: shuffle stage uses the default partition/sort/combine/group logic, no code needed

        // Step 4: reduce stage
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Step 5: write the result as a text file with TextOutputFormat
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete the output directory if it already exists, otherwise the job fails
        FileSystem fileSystem = FileSystem.get(configuration);
        Path path = new Path("/wordcount_out");
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
        TextOutputFormat.setOutputPath(job, path);

        // Exit with 0 on success, 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
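Assuming the class is packaged into a jar (wordcount.jar is a placeholder name) and the input files already exist in HDFS under /wordcount, the job can be submitted with:

hadoop jar wordcount.jar WordCount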