6.3 MapReduce案例集锦
6.3.1 数据去重
复习SQL:distinct去掉重复的数据,作用于后面所有的列,只要组合起来的数据不一样就可以
一个列:
select job from emp;
select distinct job from emp;
多个列:
select distinct deptno, job from emp;
案例:使用 MapReduce 实现 distinct 对一个列的去重
DistinctMapper.java
/**
 * Mapper for the single-column "select distinct job from emp" example.
 *
 * <p>Each input line is split on commas and the field at {@link #DISTINCT_COLUMN} is emitted as
 * the output key with a {@link NullWritable} value, so that identical values are grouped into a
 * single reduce call.
 *
 * <p>NOTE(review): index 2 is presumably the {@code job} column of the emp CSV — confirm against
 * the input data.
 */
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /** Zero-based index of the comma-separated field to de-duplicate. */
    private static final int DISTINCT_COLUMN = 2;

    /**
     * Emits the de-duplication column of one input line.
     *
     * @param key1    input key supplied by the framework (unused)
     * @param value1  one line of comma-separated input
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        String[] words = value1.toString().split(",");

        // Blank or truncated lines have no field at DISTINCT_COLUMN; skip them instead of
        // failing the whole task with ArrayIndexOutOfBoundsException.
        if (words.length <= DISTINCT_COLUMN) {
            context.getCounter("DistinctMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        context.write(new Text(words[DISTINCT_COLUMN]), NullWritable.get());
    }
}
DistinctReducer.java
/**
 * Reducer for the distinct example.
 *
 * <p>Writes each incoming key exactly once per reduce call, paired with a {@link NullWritable}
 * value; the grouped values themselves are never read.
 */
public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Emits the key once, ignoring the grouped values.
     *
     * @param key3    the grouped key
     * @param values3 the values grouped under {@code key3} (not iterated)
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key3, Iterable<NullWritable> values3, Context context)
            throws IOException, InterruptedException {
        final NullWritable placeholder = NullWritable.get();
        context.write(key3, placeholder);
    }
}
DistinctMain.java
/**
 * Driver for the distinct job: wires {@link DistinctMapper} and {@link DistinctReducer} together
 * and runs the job on the given input and output paths.
 */
public class DistinctMain {

    /**
     * Configures and runs the job, then exits with a status reflecting the job result.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be created, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: DistinctMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(DistinctMain.class);

        job.setMapperClass(DistinctMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job success/failure to the caller through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
6.3.2 多表查询:等值连接
复习多表查询:关系型数据库中的多表查询(子查询:在 Oracle 中,绝大部分的子查询都是转换成多表查询来执行)
- 笛卡尔积:列数相加,行数相乘;如果不设置连接条件,查询出来的结果就是笛卡尔积全集(行数相乘)
- 根据连接条件的不同,多表查询可以分为:等值连接、不等值连接、外连接、自连接
案例:等值连接实现下面的SQL语句
select ename, dname from emp, dept where emp.deptno = dept.deptno;
EqualJoinMapper.java
/**
 * Mapper for the equi-join example
 * ({@code select ename, dname from emp, dept where emp.deptno = dept.deptno}).
 *
 * <p>Both tables are read by the same mapper and told apart by their field count:
 * <ul>
 *   <li>a line with exactly 3 fields is treated as a dept row; it is emitted keyed by field 0
 *       with value {@code "*" + field 1}, the {@code *} prefix marking the department side;</li>
 *   <li>a line with at least 8 fields is treated as an emp row; it is emitted keyed by field 7
 *       with value field 1.</li>
 * </ul>
 *
 * <p>NOTE(review): the column positions (dept: deptno,dname,loc; emp: deptno at index 7) are
 * presumed from the SQL above — confirm against the input files.
 */
public class EqualJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    /** Number of comma-separated fields that identifies a dept row. */
    private static final int DEPT_FIELD_COUNT = 3;

    /** Minimum number of comma-separated fields required to read the emp join key. */
    private static final int EMP_MIN_FIELD_COUNT = 8;

    /** Counter group used to report skipped input lines. */
    private static final String COUNTER_GROUP = "EqualJoinMapper";

    /**
     * Emits one (join key, tagged name) pair for a dept or emp line.
     *
     * @param key1    input key supplied by the framework (unused)
     * @param value1  one line of comma-separated input from either table
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        String[] words = value1.toString().split(",");

        final String joinKeyField;
        final String taggedName;
        if (words.length == DEPT_FIELD_COUNT) {
            joinKeyField = words[0];
            taggedName = "*" + words[1];
        } else if (words.length >= EMP_MIN_FIELD_COUNT) {
            joinKeyField = words[7];
            taggedName = words[1];
        } else {
            // Blank or truncated line: it matches neither layout, so there is no join key to read.
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }

        final int joinKey;
        try {
            joinKey = Integer.parseInt(joinKeyField);
        } catch (NumberFormatException e) {
            // A non-numeric key (e.g. a header row) cannot take part in the join; skip the line
            // rather than failing the whole task.
            context.getCounter(COUNTER_GROUP, "INVALID_JOIN_KEYS").increment(1);
            return;
        }

        context.write(new IntWritable(joinKey), new Text(taggedName));
    }
}
EqualReducer.java
/**
 * Reducer for the equi-join example.
 *
 * <p>For one join key it receives a mix of values: a value starting with {@code *} carries the
 * department name (the text after the marker), every other value is an employee name. It emits
 * the department name as key and the employee names, each followed by {@code ;}, as value.
 *
 * <p>Nothing is emitted unless both a department name and at least one employee name were seen,
 * which matches inner-join semantics and the guard used by {@code SelfJoinReduce}.
 */
public class EqualReducer extends Reducer<IntWritable, Text, Text, Text> {

    /** Prefix that marks a value as coming from the department side of the join. */
    private static final String DEPT_MARKER = "*";

    /**
     * Joins the department name with the employee names grouped under one key.
     *
     * @param key3    the join key shared by all values
     * @param values3 tagged department name and untagged employee names for this key
     * @param context task context used to emit the joined pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(IntWritable key3, Iterable<Text> values3, Context context) throws IOException, InterruptedException {
        String dname = "";
        StringBuilder empNameList = new StringBuilder();

        for (Text value : values3) {
            String str = value.toString();
            // Only a leading marker identifies the department side; a '*' elsewhere in a name
            // must not cause the value to be misread as a department.
            if (str.startsWith(DEPT_MARKER)) {
                dname = str.substring(DEPT_MARKER.length());
            } else {
                // Prepend, so the resulting order is the same as "str;previous...".
                empNameList.insert(0, ';').insert(0, str);
            }
        }

        // Inner join: skip keys where either side is missing.
        if (dname.length() > 0 && empNameList.length() > 0) {
            context.write(new Text(dname), new Text(empNameList.toString()));
        }
    }
}
EqualJoinMain.java
/**
 * Driver for the equi-join job: wires {@link EqualJoinMapper} and {@link EqualReducer} together
 * and runs the job on the given input and output paths.
 */
public class EqualJoinMain {

    /**
     * Configures and runs the job, then exits with a status reflecting the job result.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be created, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: EqualJoinMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EqualJoinMain.class);

        job.setMapperClass(EqualJoinMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(EqualReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job success/failure to the caller through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
6.3.3 多表查询:自连接
自连接就是一张表的连接操作
举例:查询员工信息,要求显示:员工老板的名字 员工的名字
select b.ename, e.ename
from emp b, emp e
where b.empno = e.mgr;
在Oracle中,当查询的数据满足是一棵树的时候,可以使用层次查询来取代自连接
SelfJoinMapper.java
/**
 * Mapper for the self-join example
 * ({@code select b.ename, e.ename from emp b, emp e where b.empno = e.mgr}).
 *
 * <p>Every emp line is emitted twice so that one table can play both roles of the join:
 * <ul>
 *   <li>as a potential boss: keyed by field 0, value {@code "*" + field 1};</li>
 *   <li>as an employee: keyed by field 3, value field 1.</li>
 * </ul>
 *
 * <p>NOTE(review): field 0 is presumably empno, field 1 ename and field 3 mgr — confirm against
 * the input file.
 */
public class SelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    /** Minimum number of comma-separated fields needed to read fields 0, 1 and 3. */
    private static final int MIN_FIELD_COUNT = 4;

    /** Counter group used to report skipped input. */
    private static final String COUNTER_GROUP = "SelfJoinMapper";

    /**
     * Emits the boss-side and, when a manager id is present, the employee-side record of a line.
     *
     * @param key1    input key supplied by the framework (unused)
     * @param value1  one line of comma-separated emp data
     * @param context task context used to emit the output pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        String[] words = value1.toString().split(",");
        if (words.length < MIN_FIELD_COUNT) {
            // Blank or truncated line: the required fields are missing.
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }

        String name = words[1];

        final int empNo;
        try {
            empNo = Integer.parseInt(words[0]);
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, "INVALID_EMPNO").increment(1);
            return;
        }
        // Boss side: the '*' prefix lets the reducer tell this value from employee names.
        context.write(new IntWritable(empNo), new Text("*" + name));

        // Employee side: an empty manager field (an employee with no boss) has no join partner,
        // so it is simply not emitted instead of failing in Integer.parseInt("").
        String mgrField = words[3];
        if (mgrField.isEmpty()) {
            return;
        }
        final int mgrNo;
        try {
            mgrNo = Integer.parseInt(mgrField);
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, "INVALID_MGR").increment(1);
            return;
        }
        context.write(new IntWritable(mgrNo), new Text(name));
    }
}
SelfJoinReduce.java
/**
 * Reducer for the self-join example.
 *
 * <p>For one key it receives a mix of values: a value starting with {@code *} carries the boss
 * name (the text after the marker), every other value is the name of an employee reporting to
 * that key. It emits the boss name as key and the employee names, each followed by {@code ;},
 * as value. Keys that lack either a boss name or any employee name produce no output.
 */
public class SelfJoinReduce extends Reducer<IntWritable, Text, Text, Text> {

    /** Prefix that marks a value as the boss-side record. */
    private static final String BOSS_MARKER = "*";

    /**
     * Joins the boss name with the employee names grouped under one key.
     *
     * @param key3    the key shared by the boss record and its employees
     * @param values3 tagged boss name and untagged employee names for this key
     * @param context task context used to emit the joined pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(IntWritable key3, Iterable<Text> values3, Context context) throws IOException, InterruptedException {
        String bossName = "";
        StringBuilder empNameList = new StringBuilder();

        for (Text t : values3) {
            String str = t.toString();
            // Only a leading marker identifies the boss record; a '*' elsewhere in a name must
            // not cause an employee to be misread as the boss.
            if (str.startsWith(BOSS_MARKER)) {
                bossName = str.substring(BOSS_MARKER.length());
            } else {
                // Prepend, so the resulting order is the same as "str;previous...".
                empNameList.insert(0, ';').insert(0, str);
            }
        }

        // Inner join: emit only when both the boss and at least one employee were found.
        if (bossName.length() > 0 && empNameList.length() > 0) {
            context.write(new Text(bossName), new Text(empNameList.toString()));
        }
    }
}
SelfJoinMain.java
/**
 * Driver for the self-join job: wires {@link SelfJoinMapper} and {@link SelfJoinReduce} together
 * and runs the job on the given input and output paths.
 */
public class SelfJoinMain {

    /**
     * Configures and runs the job, then exits with a status reflecting the job result.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be created, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: SelfJoinMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SelfJoinMain.class);

        job.setMapperClass(SelfJoinMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(SelfJoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job success/failure to the caller through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
6.3.4 倒排索引
6.3.5使用单元测试
|