超详细的 MapReduce WordCount 示例：统计微博评论最多的用户
使用 fastjson 解析每一行 JSON（代码假设每一行都是一个由评论对象组成的 JSON 数组）
// Parse one input line with fastjson. The unchecked cast assumes the line is a JSON array of
// objects; other shapes (e.g. a single JSON object or a blank line) are not handled here.
List<Map<String,Object>> parses = (List<Map<String,Object>>) JSON.parse(value.toString());
提取每条记录中的 userId，并统一以常量 1 作为 key 输出（这样所有 userId 都会进入同一次 reduce 调用，便于在 reduce 中做全局排序）
// Emit (1, userId) for every parsed record. Because the key is always 1, every userId
// ends up in the same reduce call.
for (Map<String, Object> pars : parses) {
// The cast assumes userId is stored as a JSON string; a numeric userId would throw
// ClassCastException. A missing userId yields null here — presumably rejected by new Text(...); verify.
String new_value = (String) pars.get("userId");
context.write(new IntWritable(1),new Text(new_value));
}
Mapper完整代码
package anu.mapereduce;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * Extracts the {@code userId} of every comment record in the input and emits it under a
 * single constant key, so that one reduce call sees every id and can rank users globally.
 */
public class MainMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    /** Counter group under which skipped input is reported (visible in the job's counters). */
    private static final String COUNTER_GROUP = "MainMapper";

    /**
     * Constant map-output key shared by every record. It is never mutated, so a single
     * instance can be shared (same idiom as Hadoop's WordCount example).
     */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Reused output value. Hadoop serializes map output on write, so the instance can be
     * refilled for the next record instead of allocating a new Text each time.
     */
    private final Text userId = new Text();

    /**
     * Parses one input line as JSON and emits {@code (1, userId)} for every record it holds.
     *
     * <p>A line may be a JSON array of objects or a single JSON object. Blank lines, lines of
     * any other JSON shape, and records without a {@code userId} are skipped and counted in the
     * {@value #COUNTER_GROUP} counter group instead of crashing the task. Non-string ids (for
     * example numbers) are converted with {@link String#valueOf(Object)}. Syntactically invalid
     * JSON still fails the task, as before.
     *
     * @param key     byte offset of the line in the input file (unused)
     * @param value   the raw input line
     * @param context used to emit output pairs and update counters
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // JSON.parse("") returns null, which previously caused an NPE in the loop.
            context.getCounter(COUNTER_GROUP, "BLANK_LINES").increment(1);
            return;
        }

        Object parsed = JSON.parse(line);
        if (parsed instanceof List) {
            for (Object record : (List<?>) parsed) {
                emitUserId(record, context);
            }
        } else if (parsed instanceof Map) {
            // A line holding a single JSON object is treated as a one-record array.
            emitUserId(parsed, context);
        } else {
            context.getCounter(COUNTER_GROUP, "UNSUPPORTED_LINES").increment(1);
        }
    }

    /** Emits the record's userId under the constant key, or counts the record as skipped. */
    private void emitUserId(Object record, Context context) throws IOException, InterruptedException {
        Object id = record instanceof Map ? ((Map<?, ?>) record).get("userId") : null;
        if (id == null) {
            context.getCounter(COUNTER_GROUP, "RECORDS_WITHOUT_USER_ID").increment(1);
            return;
        }
        userId.set(String.valueOf(id));
        context.write(ONE, userId);
    }
}
Reduce：统计每个用户出现的次数（即该用户的评论数）
// Count how many times each userId appears among the values of this reduce call
// (userId -> number of occurrences).
Map<String,Integer> navs = new HashMap<>();
for (Text value : values) {
Integer integer = navs.get(value.toString());
if (integer == null){
// First occurrence of this userId.
navs.put(value.toString(),1);
}else {
navs.put(value.toString(),integer+1);
}
}
按评论数量从多到少对所有用户排序
// Build llas: userIds ordered by count, highest first. Each pass copies the entries not yet
// selected into new_navs and appends the one with the largest count.
// NOTE(review): with n distinct users this is roughly O(n^3) (n passes x n-entry copy x
// List.contains). Sorting navs' entries by value once would be O(n log n).
List<String> llas = new ArrayList<>();
// keys_l is unused; the loop only serves to run navs.size() selection passes.
for (String keys_l : navs.keySet()) {
Integer is_v = 0;
String nname = "null";
// Entries that have not been placed into llas yet.
Map<String,Integer> new_navs=new HashMap<>();
for (String keyaa : navs.keySet()) {
if (! llas.contains(keyaa)){
new_navs.put(keyaa,navs.get(keyaa));
}
}
// Find the maximum remaining count. Every count is at least 1 and is_v starts at 0, so each
// pass selects some key. On ties, the strict '>' keeps whichever key new_navs' HashMap
// iteration yields first.
for (String keys : new_navs.keySet()) {
if(new_navs.get(keys)>is_v){
is_v = new_navs.get(keys);
nname = keys;
}
}
llas.add(nname);
}
输出数据
// Write "userId <tab> count" in ranked order (TextOutputFormat is configured in the runner).
for (String lla : llas) {
context.write(new Text(lla),new IntWritable(navs.get(lla)));
}
Reduce完整代码
package anu.mapereduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;
/**
 * Ranks users by how many records (comments) they have.
 *
 * <p>MainMapper emits every userId under the same key, so a single {@code reduce} call
 * receives all ids. This reducer counts them and writes {@code userId <TAB> count} lines ordered
 * from the most to the least frequent user. Ties are broken by userId (ascending) so the
 * output is deterministic.
 */
public class MainReduce extends Reducer<IntWritable, Text, Text, IntWritable> {

    /** Orders (userId, count) entries by count descending, then by userId ascending. */
    private static final Comparator<Map.Entry<String, Integer>> BY_COUNT_DESC_THEN_USER =
            new Comparator<Map.Entry<String, Integer>>() {
                @Override
                public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
                    int byCount = Integer.compare(b.getValue(), a.getValue());
                    return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
                }
            };

    /** Reused output key; refilled before each write. */
    private final Text outKey = new Text();

    /** Reused output value; refilled before each write. */
    private final IntWritable outValue = new IntWritable();

    /**
     * Counts the occurrences of each userId and writes them in descending order of count.
     *
     * @param key     the constant key emitted by the mapper (unused)
     * @param values  every userId emitted by the mappers
     * @param context used to write the ranked output
     */
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Map<String, Integer> counts = new HashMap<>();
        for (Text value : values) {
            String user = value.toString();
            Integer seen = counts.get(user);
            counts.put(user, seen == null ? 1 : seen + 1);
        }

        // Sort once (O(n log n)) instead of repeatedly scanning the remaining users for the
        // maximum, which was roughly O(n^3) in the number of distinct users.
        List<Map.Entry<String, Integer>> ranked = new ArrayList<>(counts.entrySet());
        Collections.sort(ranked, BY_COUNT_DESC_THEN_USER);

        for (Map.Entry<String, Integer> entry : ranked) {
            outKey.set(entry.getKey());
            outValue.set(entry.getValue());
            context.write(outKey, outValue);
        }
    }
}
为了方便调试，这里不依赖集群，而是直接在本地运行（本地运行环境的配置方法可自行搜索）。
WordCountRunner 启动类的完整代码
package anu.mapereduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
 * Configures and runs the "MyWordCount" job, which ranks users by comment count.
 *
 * <p>Usage: {@code WordCountRunner [inputPath [outputPath]]}. Missing arguments fall back to
 * the local Windows paths this example was written against. The output directory must not
 * exist yet, because FileOutputFormat refuses to overwrite an existing output directory.
 */
public class WordCountRunner {

    /** Local Hadoop installation, used only when -Dhadoop.home.dir is not given. */
    private static final String DEFAULT_HADOOP_HOME = "D:\\LocalServer\\hadoop-2.9.2";

    /** Default input file. */
    private static final String DEFAULT_INPUT_PATH =
            "D:\\javaproject\\20210722_GOUP_11_GYC\\MapperReuceDemo01\\src\\main\\resources\\datas.json";

    /** Default output directory. Despite its name, "input" is where the results are written. */
    private static final String DEFAULT_OUTPUT_PATH =
            "D:\\javaproject\\20210722_GOUP_11_GYC\\MapperReuceDemo01\\src\\main\\resources\\input";

    /**
     * Builds the job, runs it to completion, and exits with 0 on success or 1 on failure.
     *
     * @param args optional {@code [inputPath [outputPath]]}
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        // Respect a hadoop.home.dir supplied on the command line; otherwise use the local install.
        if (System.getProperty("hadoop.home.dir") == null) {
            System.setProperty("hadoop.home.dir", DEFAULT_HADOOP_HOME);
        }
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        Configuration configuration = new Configuration();
        Job myWordCount = Job.getInstance(configuration, "MyWordCount");
        // Lets Hadoop locate the job jar when submitted to a cluster; harmless in local mode.
        myWordCount.setJarByClass(WordCountRunner.class);

        myWordCount.setInputFormatClass(TextInputFormat.class);
        myWordCount.setMapperClass(MainMapper.class);
        myWordCount.setMapOutputKeyClass(IntWritable.class);
        myWordCount.setMapOutputValueClass(Text.class);

        myWordCount.setReducerClass(MainReduce.class);
        // MainMapper emits a single constant key, so all ranking happens in one reduce call.
        // Pin the reducer count to 1 so the result is one part file with no empty siblings.
        myWordCount.setNumReduceTasks(1);
        myWordCount.setOutputKeyClass(Text.class);
        myWordCount.setOutputValueClass(IntWritable.class);
        myWordCount.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(myWordCount, new Path(inputPath));
        FileOutputFormat.setOutputPath(myWordCount, new Path(outputPath));

        boolean succeeded = myWordCount.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
输出结果:
新手第一次写博客勿喷!
|