Hadoop MapReduce Left Outer Join (Generic)
Source
Book
Data Algorithms: Recipes for Scaling Up with Hadoop and Spark (数据算法——Hadoop/Spark大数据处理技巧)
Author
Mahmoud Parsian; Chinese translation by 苏金国 and 杨健康
Reference
The left outer join discussion on page 96 of the book
Principle
Given two tables T1 and T2 joined on a key: for rows that match on the key, the result carries the full attributes of both tables; for rows that do not match, the result keeps all attributes of T1 while every attribute of T2 is Null.
Here the left table is the user table.
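For example (hypothetical rows, not taken from the sample data below): if user u1 in CA bought product p1 and user u2 in UT bought nothing, the left outer join of the user table with the transaction table on userId gives:

| userId | location | productId |
| --- | --- | --- |
| u1 | CA | p1 |
| u2 | UT | Null |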
Data
User table (userId, location)
u9242,CA
u4671,UT
u4195,UT
u3836,GA
u7507,CA
u3177,UT
u6598,CA
u2283,CA
u4815,CA
u7403,GA
u5967,GA
u3028,CA
u6277,CA
u6791,CA
Transaction table (transactionId, productId, userId, quantity, unit price)
t8414,p5885,u5830,4,147
t5213,p9339,u5670,3,274
t7187,p8204,u8341,8,202
t1039,p1637,u8589,7,52
t861,p3175,u2609,0,342
t5492,p8767,u8417,2,330
t4467,p3445,u1200,0,253
t89,p7055,u8402,2,61
t6323,p9525,u2155,8,33
t3655,p9556,u3555,8,280
t6377,p9949,u7538,1,209
t766,p2543,u7881,9,14
t5667,p5268,u8852,7,286
Key implementation points
The MapReduce framework sorts map output by the key's compareTo method and merges records into one reduce call **(when compareTo returns 0)**.
This implementation therefore defines JoinKeyWritable to replace the book's PairOfString **(mainly because I could not find that class)**.
The join key is userId, so JoinKeyWritable.compareTo should return 0 whenever the userIds are equal; all records for the same user are then delivered to a single reducer call through its value iterator (Iterable&lt;JoinKeyWritable&gt; values).
Left outer join goal
User table LEFT OUTER JOIN transaction table
For every user, produce the products the user bought together with the user's location
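In the join job's output, every matched (user, product) pair becomes one `location<TAB>productId` line and every user without transactions becomes one `location<TAB>null` line, for example (illustrative values only, since the sample rows above do not share any userId):

```
CA	p5885
UT	p9339
GA	null
```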
Implementation classes

| Class | Description |
| --- | --- |
| UserMapper | Maps user records |
| TransactionMapper | Maps transaction records |
| ProductLocationReducer | Reducer that joins user and transaction records |
| LeftOuterJoin | Aggregates the mappers, reducers and job drivers |
| LeftOuterJoinTest | Test launcher |
| JoinKeyWritable | Custom map-phase output key/value class |
Source code
Data file generator
Generates two mock input files, user_info and transaction_info, to make local testing easier.
```java
import lombok.Data;
import org.junit.Test;

import java.io.*;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LeftOuterJoinGenerate {

    @Data
    public static class UserInfo {
        private String userId;
        private String locationId;

        public static List<String> getLocationList() {
            return Stream.of("UT", "GA", "CA").collect(Collectors.toList());
        }

        @Override
        public String toString() {
            return String.join(",", userId, locationId);
        }
    }

    @Data
    public static class TransactionInfo {
        private String transactionId;
        private String productId;
        private String userId;
        private int quantity;
        private int amount;

        @Override
        public String toString() {
            return String.format("%s,%s,%s,%s,%s", transactionId, productId, userId, quantity, amount);
        }
    }

    // Writes 10,000 random "userId,location" lines to the user_info file.
    @Test
    public void generateUserInfoList() throws IOException {
        File userInfoFile = new File("D:\\Destop\\hadooplearn\\src\\data\\user_info");
        FileOutputStream fileOutputStream = new FileOutputStream(userInfoFile);
        System.out.println(userInfoFile.getAbsolutePath());
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
        int userNum = 10000;
        for (int i = 0; i < userNum; i++) {
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
            List<String> locationList = UserInfo.getLocationList();
            userInfo.setLocationId(locationList.get((int) (Math.random() * locationList.size())));
            bufferedOutputStream.write(String.format("%s\n", userInfo.toString()).getBytes());
        }
        bufferedOutputStream.close();
        fileOutputStream.close();
    }

    // Writes 10,000 random "transactionId,productId,userId,quantity,price" lines to the transaction_info file.
    @Test
    public void generateTransaction() throws IOException {
        File userInfoFile = new File("D:\\Destop\\hadooplearn\\src\\data\\transaction_info");
        FileOutputStream fileOutputStream = new FileOutputStream(userInfoFile);
        System.out.println(userInfoFile.getAbsolutePath());
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
        int transactionNum = 10000;
        int userNum = 10000;
        int productNum = 10000;
        for (int i = 0; i < transactionNum; i++) {
            TransactionInfo transactionInfo = new TransactionInfo();
            transactionInfo.setTransactionId(String.format("t%s", (int) (Math.random() * transactionNum)));
            transactionInfo.setProductId(String.format("p%s", (int) (Math.random() * productNum)));
            transactionInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
            transactionInfo.setQuantity((int) (Math.random() * 10));
            transactionInfo.setAmount((int) (Math.random() * 400));
            bufferedOutputStream.write(String.format("%s\n", transactionInfo.toString()).getBytes());
        }
        bufferedOutputStream.close();
        fileOutputStream.close();
    }
}
```
User info mapper (inner class)
```java
public static class UserMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: "userId,location"
        String inputString = value.toString();
        String[] split = inputString.split(",");
        String userId = split[0];
        String location = split[1];
        // The key carries the join key (userId); the "1" tag marks a user-table record.
        JoinKeyWritable outputKey = new JoinKeyWritable();
        outputKey.setName(userId);
        outputKey.setValue("1");
        JoinKeyWritable outputValue = new JoinKeyWritable();
        outputValue.setName("location");
        outputValue.setValue(location);
        context.write(outputKey, outputValue);
    }
}
```
Transaction info mapper (inner class)
```java
public static class TransactionMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: "transactionId,productId,userId,quantity,price"
        String inputString = value.toString();
        String[] split = inputString.split(",");
        String product = split[1];
        String userId = split[2];
        // The key carries the join key (userId); the "2" tag marks a transaction record.
        JoinKeyWritable outputKey = new JoinKeyWritable();
        outputKey.setName(userId);
        outputKey.setValue("2");
        JoinKeyWritable outputValue = new JoinKeyWritable();
        outputValue.setName("product");
        outputValue.setValue(product);
        context.write(outputKey, outputValue);
    }
}
```
Reducer that joins user and transaction records (inner class)
```java
public static class ProductLocationReducer extends Reducer<JoinKeyWritable, JoinKeyWritable, Text, Text> {
    @Override
    protected void reduce(JoinKeyWritable key, Iterable<JoinKeyWritable> values, Context context) throws IOException, InterruptedException {
        // Collect the user's locations first; this assumes location records are iterated
        // before product records for the same user (see the note after JoinKeyWritable below).
        ArrayList<String> locationList = new ArrayList<>();
        boolean hasProduct = false;
        for (JoinKeyWritable keyWritable : values) {
            String name = keyWritable.getName();
            if ("location".equals(name)) {
                locationList.add(keyWritable.getValue());
            } else {
                hasProduct = true;
                // Matched rows: emit (location, productId) for every known location.
                for (String aLocationList : locationList) {
                    context.write(new Text(aLocationList), new Text(keyWritable.getValue()));
                }
            }
        }
        // Left outer join part: users with no transactions still produce a row,
        // with "null" on the transaction side.
        if (!hasProduct) {
            for (String aLocationList : locationList) {
                context.write(new Text(aLocationList), new Text("null"));
            }
        }
    }
}
```
Driver method for the left join MapReduce job
```java
public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
    Job job = Job.getInstance(conf, "LeftJoinDriver");
    job.setJarByClass(LeftOuterJoin.class);   // lets Hadoop locate the job jar when run on a cluster
    Path userFilePath = new Path(userPath);
    Path transactionFilePath = new Path(transactionPath);
    // Each input file gets its own mapper class.
    MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
    MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
    job.setMapOutputKeyClass(JoinKeyWritable.class);
    job.setMapOutputValueClass(JoinKeyWritable.class);
    job.setReducerClass(ProductLocationReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
}
```
Map-output (intermediate) key class
```java
import lombok.Data;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class JoinKeyWritable implements WritableComparable<JoinKeyWritable> {
    private String name;
    private String value;

    @Override
    public int compareTo(JoinKeyWritable o) {
        // Compare only on name (the userId), so all records for the same user
        // sort together and end up in the same reduce() call.
        return this.name.compareTo(o.name);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.value = in.readUTF();
    }
}
```
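One caveat worth noting: compareTo above looks only at name, so the relative order of user records (tagged "1") and transaction records (tagged "2") inside one reduce call is not guaranteed, and the lombok-generated hashCode covers both fields, so with more than one reducer the two record types for the same userId could even be sent to different partitions. The book's version solves this with a secondary sort. Below is a minimal sketch of how that could be wired up around this key class; the three classes are hypothetical additions (they could live as static nested classes of LeftOuterJoin), not part of the code above:

```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Sort comparator: order by userId first, then by the "1"/"2" tag so that
// user records always precede transaction records for the same user.
public static class JoinSortComparator extends WritableComparator {
    protected JoinSortComparator() {
        super(JoinKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JoinKeyWritable k1 = (JoinKeyWritable) a;
        JoinKeyWritable k2 = (JoinKeyWritable) b;
        int cmp = k1.getName().compareTo(k2.getName());
        return cmp != 0 ? cmp : k1.getValue().compareTo(k2.getValue());
    }
}

// Grouping comparator: group by userId only, so both record types still
// arrive in the same reduce() call.
public static class JoinGroupingComparator extends WritableComparator {
    protected JoinGroupingComparator() {
        super(JoinKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((JoinKeyWritable) a).getName().compareTo(((JoinKeyWritable) b).getName());
    }
}

// Partitioner: partition by userId only, so both record types go to the same reducer.
public static class JoinKeyPartitioner extends Partitioner<JoinKeyWritable, JoinKeyWritable> {
    @Override
    public int getPartition(JoinKeyWritable key, JoinKeyWritable value, int numPartitions) {
        return (key.getName().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

These would be registered in LeftJoinDriver with job.setSortComparatorClass(JoinSortComparator.class), job.setGroupingComparatorClass(JoinGroupingComparator.class) and job.setPartitionerClass(JoinKeyPartitioner.class). With the default single reducer the partitioning concern does not arise, but the relative order of the two record types is still not guaranteed, so the secondary sort is the safer route.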
Complete job class (it also contains a second job, LocationCountDriver, which counts the number of distinct locations per product from the join output)
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import writable.JoinKeyWritable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;

public class LeftOuterJoin {

    // Maps "userId,location" lines; the "1" tag marks a user-table record.
    public static class UserMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String inputString = value.toString();
            String[] split = inputString.split(",");
            String userId = split[0];
            String location = split[1];
            JoinKeyWritable outputKey = new JoinKeyWritable();
            outputKey.setName(userId);
            outputKey.setValue("1");
            JoinKeyWritable outputValue = new JoinKeyWritable();
            outputValue.setName("location");
            outputValue.setValue(location);
            context.write(outputKey, outputValue);
        }
    }

    // Maps "transactionId,productId,userId,quantity,price" lines; the "2" tag marks a transaction record.
    public static class TransactionMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String inputString = value.toString();
            String[] split = inputString.split(",");
            String product = split[1];
            String userId = split[2];
            JoinKeyWritable outputKey = new JoinKeyWritable();
            outputKey.setName(userId);
            outputKey.setValue("2");
            JoinKeyWritable outputValue = new JoinKeyWritable();
            outputValue.setName("product");
            outputValue.setValue(product);
            context.write(outputKey, outputValue);
        }
    }

    // Joins both record types per userId; emits (location, productId), or (location, "null")
    // for users with no transactions.
    public static class ProductLocationReducer extends Reducer<JoinKeyWritable, JoinKeyWritable, Text, Text> {
        @Override
        protected void reduce(JoinKeyWritable key, Iterable<JoinKeyWritable> values, Context context) throws IOException, InterruptedException {
            ArrayList<String> locationList = new ArrayList<>();
            boolean hasProduct = false;
            for (JoinKeyWritable keyWritable : values) {
                String name = keyWritable.getName();
                if ("location".equals(name)) {
                    locationList.add(keyWritable.getValue());
                } else {
                    hasProduct = true;
                    for (String aLocationList : locationList) {
                        context.write(new Text(aLocationList), new Text(keyWritable.getValue()));
                    }
                }
            }
            if (!hasProduct) {
                for (String aLocationList : locationList) {
                    context.write(new Text(aLocationList), new Text("null"));
                }
            }
        }
    }

    // Second job: re-maps the join output ("location<TAB>product") to (product, location).
    public static class LocationCountMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String inputString = value.toString();
            String[] split = inputString.split("\t");
            String location = split[0];
            String product = split[1];
            context.write(new Text(product), new Text(location));
        }
    }

    // Counts the number of distinct locations per product.
    public static class LocationCountReducer extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            HashSet<String> locationSet = new HashSet<>();
            for (Text location : values) {
                // Hadoop reuses the Text instance across iterations, so copy the value out
                // instead of storing the (mutable) Text object itself.
                locationSet.add(location.toString());
            }
            context.write(key, new IntWritable(locationSet.size()));
        }
    }

    public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "LeftJoinDriver");
        job.setJarByClass(LeftOuterJoin.class);
        Path userFilePath = new Path(userPath);
        Path transactionFilePath = new Path(transactionPath);
        MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
        MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
        job.setMapOutputKeyClass(JoinKeyWritable.class);
        job.setMapOutputValueClass(JoinKeyWritable.class);
        job.setReducerClass(ProductLocationReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public void LocationCountDriver(String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "LocationCountDriver");
        job.setJarByClass(LeftOuterJoin.class);
        job.setMapperClass(LocationCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(LocationCountReducer.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }
}
```
Test launcher (note: the inputs are local Windows paths while the output goes to HDFS; the HDFS namespace spark01:9000 and other environment details will differ in your setup)
```java
import org.junit.Test;

import java.io.IOException;

public class LeftOuterJoinTest {

    // Step 1: join the local input files and write the result to HDFS.
    @Test
    public void testLeftOuterJoin() throws InterruptedException, IOException, ClassNotFoundException {
        LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
        leftOuterJoin.LeftJoinDriver("D:\\Destop\\hadooplearn\\src\\data\\user_info",
                "D:\\Destop\\hadooplearn\\src\\data\\transaction_info",
                "hdfs://spark01:9000/test/leftOuterJoinResult");
    }

    // Step 2: count distinct locations per product from the join output.
    @Test
    public void testLocationCountDriver() throws InterruptedException, IOException, ClassNotFoundException {
        LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
        leftOuterJoin.LocationCountDriver("hdfs://spark01:9000/test/leftOuterJoinResult",
                "hdfs://spark01:9000/test/locationCountResult");
    }
}
```
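Both drivers build their own Configuration, so the file system comes from whatever core-site.xml happens to be on the classpath. To pin it explicitly, or to keep everything on the local disk while experimenting, the configuration could be set up along these lines (a sketch only; it assumes the driver is changed to accept a Configuration, and spark01:9000 is just the namenode used in this test):

```java
Configuration conf = new Configuration();
// Point the job at a specific default file system...
conf.set("fs.defaultFS", "hdfs://spark01:9000");
// ...or keep both input and output on the local disk while experimenting:
// conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf, "LeftJoinDriver");
```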
Run screenshots
(Screenshot: user info data)
(Screenshot: transaction info data)
(Screenshot: successful run output)
Summary
Overall, a left outer join in plain MapReduce takes a fair amount of code. This implementation also corresponds to the most expensive of Spark's join strategies, a nested-loop join (the per-key double for-loop in the reducer).
Readers who want a challenge can try MapReduce versions of **SMJ (sort-merge join, two pointers over sorted inputs)** and **HJ (hash join)**; a rough starting point for the hash-join direction is sketched below.
Thanks for reading; my writing only goes so far, so please point out anything I got wrong.
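As that starting point, here is a rough sketch of a map-side (replicated) hash join: the smaller user table is shipped to every mapper through the distributed cache and loaded into a HashMap, so the join happens entirely in the map phase with no shuffle of the large transaction table. The class name HashJoinMapper and the cache path are hypothetical, not part of the code above, and this sketch only produces the matched rows; emitting the `location, null` rows of the left outer join would need an extra pass over the user table.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Map-side hash join: transactions are streamed through map(), the (small)
// user table is loaded once per mapper in setup().
public class HashJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final Map<String, String> userLocation = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // The user file must be registered in the driver, e.g.
        // job.addCacheFile(new URI("hdfs://spark01:9000/test/user_info"));
        URI[] cacheFiles = context.getCacheFiles();
        // The cached file is localized under its base name in the task's working directory.
        String localName = new Path(cacheFiles[0].getPath()).getName();
        try (BufferedReader reader = new BufferedReader(new FileReader(localName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                userLocation.put(fields[0], fields[1]);   // userId -> location
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String productId = fields[1];
        String userId = fields[2];
        // Probe the in-memory hash table built from the user table.
        String location = userLocation.get(userId);
        if (location != null) {
            context.write(new Text(location), new Text(productId));
        }
    }
}
```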