七、MapReduce经典案例
(一)好友推荐案例
1、需求
推荐好友的好友,比如给hadoop推荐cat、hello、mr。
(需求实际就是获取非好友的两个人有多少共同好友)
2、数据准备
双向好友关系
tom:hello hadoop cat
world:hadoop hello hive
cat:tom hive
mr:hive hello
hive:cat hadoop world hello mr
Hadoop:tom hive world
Hello:tom world hive mr
?3、思路
推荐者与被推荐者一定有一个或多个相同的好友,转变为找共同好友,但是,两人不能是直接好友,例如,针对第一行,可以给hello推荐hadoop,也可以给hadoop推荐hello,但是,两者不能为直接好友才可以,例如,hadoop跟world有共同好友hive,此时,是不能给他们互相推荐的,因为hadoop跟word已经是直接好友了。
?? 全局去寻找好友列表中两两关系,这种两两关系体现出来的他们是间接好友,并且只要他们组建了,就证明他们是有公共好友的,若共同好友是tom,可以给它们互相推荐,例如,第一行中的hello:hadoop、hello:cat、hadoop:cat,但是如果他们是直接好友的话,就不能推荐了,例如第5行中hadoop与world体现出了是间接好友,他们有共同好友hive,可以给他们互相推荐,但是在第二行里面,world与hadoop体现出的是直接好友,因此就不能给他们互相推荐了。所以要从这里面剔除直接好友。那么,直接好友去哪里找呢?
?? 全局去寻找好友依次与好友的两两关系,这种关系体现出来的就是直接好友。就是每行第一个与剩余的每个好友依次组建的两两关系。例如,tom:hello、 tom:hadoop、tom:cat
??? 因此所有这些两两关系中,既有直接好友关系,也有间接好友关系;要从间接好友中,去除直接好友;
??? 统计两两关系出现次数,即他们共同好友的个数。
代码思路:
??? map:按好友列表输出两俩关系
??? reduce:sum两两关系
结果:
cat:hadoop 2
cat:hello 2
cat:mr 1
cat:world 1
hadoop:hello 3
hadoop:mr 1
hive:tom 3
mr:tom 1
mr:world 2
tom:world 2
4、代码实现
(1)自定义RecommendMapper
package com.bigdata.recommendfriends;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class RecommendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 读取每行数据 tom:hello hadoop cat
String line = value.toString();
// 2 挑出人,好友列表
// tom
String person = line.split(":")[0];
// [hello,hadoop,cat]
String[] friends = line.split(":")[1].split(" ");
// 3 遍历好友列表 组装间接好友 <友:友,1> 组装直接好友<友:人,0>
for(int i =0 ;i <= friends.length-1;i++){
// hello
String friend = friends[i];
// 组装直接好友 <友:人,0> 直接好友用valueout的0表示
context.write(new Text(getFd(person,friend)),new IntWritable(0));
for(int j = i+1;j<=friends.length-1;j++){
// 组装间接好友 <友:友,1> 间接好友用valueout的1表示
// 有时候 cat:hadoop 有时候 hadoop:cat 但是业务需要 cat:hadoop
context.write(new Text(getFd(friend,friends[j])),new IntWritable(1));
}
}
// 4 将kv写出
}
public static String getFd(String a,String b){
return a.compareTo(b) < 0? a+":"+b:b+":"+a;
}
}
(2)自定义RecommendReducer
package com.bigdata.recommendfriends;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// keyout 是要推荐的人
// valueout 是他们之间认识的共同好友的数量
public class RecommendReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 同一个分组的数据有两大种可能性:一种是value含有0的,这种数据证明他们两人本来认识了,这种情况不能推荐
// <hadoop:world,0>
// <hadoop:world,1>
// 另一种是value全是1的,这种数据证明他们两人不认识,需要做推荐
// <hadoop:hello,1>
// <hadoop:hello,1>
// <hadoop:hello,1>
// 循环每个分组的value,只要遇到value是0的情况,就停下来
//定义共同好友的数量
int sum = 0;
for (IntWritable value : values) {
int i = value.get();
if(i == 0){
return;
}
sum = sum+i;
}
context.write(key,new IntWritable(sum));
}
}
?(3)自定义RecommendDriver
package com.bigdata.recommendfriends;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RecommendDriver {
public static void main(String[] args) throws Exception {
// 1 创建一个配置对象
Configuration conf = new Configuration();
// 2 通过配置对象创建一个job
Job job = Job.getInstance(conf);
// 3 设置job的mr的路径(jar包的位置)
job.setJarByClass(RecommendDriver.class);
// 4 设置job的mapper类 reduce类
job.setMapperClass(RecommendMapper.class);
job.setReducerClass(RecommendReduce.class);
// 5 设置job的mapper类的keyout,valueout
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 6 设置job的最终输出的keyout,valueout
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 7 设置job的输入数据的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 8 设置job的输出数据的路径 得保证,输出目录不能事先存在,否则报错,
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
// 9 提交job到yarn集群
boolean b = job.waitForCompletion(true);
System.out.println("是否运行成功:"+b);
}
}
(二)数据清洗案例
????? 数据清洗:是指发现并纠正数据文件中可识别的错误的最后一道程序,包括检查数据一致性,处理无效值和缺失值等。与问卷审核不同,录入后的数据清理一般是由计算机而不是人工完成。
??? ETL:是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库。
1、需求
将日志按照空格分隔,去除每条日志中字段组成数组的长度小于等于11的日志
2、代码实现
(1)编写LogMapper
package com.bigdata.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 读取每行数据 194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
String line = value.toString();
// 2 按照空格切分,查看长度,解析日志
boolean flag = parseLog(line,context);
if(!flag){
return;
}
// 3 将合法的数据写出
context.write(value,NullWritable.get());
}
private boolean parseLog(String line,Context context) {
String[] split = line.split(" ");
if(split.length >11){
context.getCounter("logGroup","trueLogCounter").increment(1);
return true;
}else{
context.getCounter("logGroup","falseLogCounter").increment(1);
return false;
}
}
}
(2)编写LogDriver
package com.bigdata.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogDriver {
public static void main(String[] args) throws Exception {
// 1 创建一个配置对象
Configuration conf = new Configuration();
// 2 通过配置对象创建一个job
Job job = Job.getInstance(conf);
// 3 设置job的mr的路径(jar包的位置)
job.setJarByClass(LogDriver.class);
// 4 设置job的mapper类 reduce类
job.setMapperClass(LogMapper.class);
// 5 设置job的mapper类的keyout,valueout
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 6 设置job的最终输出的keyout,valueout
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 7 设置job的输入数据的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 8 设置job的输出数据的路径 得保证,输出目录不能事先存在,否则报错,
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
// 因为数据清洗,不涉及数据的累加,因此就不需要reduce,这里设置reduce的数量为0
job.setNumReduceTasks(0);
// 9 提交job到yarn集群
boolean b = job.waitForCompletion(true);
System.out.println("是否运行成功:"+b);
}
}
|