
[Big Data] MapReduce Left Outer Join

Hadoop MapReduce Left Outer Join (General-Purpose Version)

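Source

Book title

Data Algorithms: Recipes for Scaling Up with Hadoop and Spark (Chinese edition: 数据算法-Hadoop/Spark大数据处理技巧)

Author

Mahmoud Parsian

Translated into Chinese by Su Jinguo (苏金国) and Yang Jiankang (杨健康)

Location in the book

The introduction to the left outer join on page 96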

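Principle

Take two tables, T1 and T2, joined on the key Key. For rows whose keys match, the result contains the full attributes of both tables. For T1 rows with no match, the result contains all of the T1 attributes, and every T2 attribute is Null.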
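The following minimal in-memory sketch makes the definition concrete with plain Java. It is not the MapReduce implementation. The class name `LeftOuterJoinSemantics`, the `String[]` row layout, and the small rows in `main` (u1, u2, t1, t2) are all hypothetical. The code compares every T1 row with every T2 row and pads each unmatched T1 row with nulls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of LEFT OUTER JOIN semantics on in-memory rows (not Hadoop code). */
final class LeftOuterJoinSemantics {

    /**
     * Joins each left row with every right row whose key column matches.
     * A left row with no match is kept once, followed by rightWidth nulls.
     */
    static List<String[]> leftOuterJoin(List<String[]> left, int leftKey,
                                        List<String[]> right, int rightKey, int rightWidth) {
        List<String[]> result = new ArrayList<>();
        for (String[] l : left) {
            boolean matched = false;
            for (String[] r : right) {
                if (l[leftKey].equals(r[rightKey])) {
                    matched = true;
                    result.add(concat(l, r));
                }
            }
            if (!matched) {
                // No match in T2: keep all T1 attributes, every T2 attribute is null
                result.add(concat(l, new String[rightWidth]));
            }
        }
        return result;
    }

    /** Appends b to a in a new array. */
    private static String[] concat(String[] a, String[] b) {
        String[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical rows: (userId, location) and (transactionId, productId, userId, quantity, price)
        List<String[]> users = Arrays.asList(
                new String[]{"u1", "CA"},
                new String[]{"u2", "UT"});
        List<String[]> transactions = Arrays.asList(
                new String[]{"t1", "p1", "u1", "4", "147"},
                new String[]{"t2", "p2", "u1", "3", "274"});
        for (String[] row : leftOuterJoin(users, 0, transactions, 2, 5)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

When `main` runs, it prints both transactions of u1 joined with u1's CA location. It then prints the row for u2, which has null in all five transaction columns.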

Here the left table is the user table.

Data source

User table (user ID, transaction location)

u9242,CA
u4671,UT
u4195,UT
u3836,GA
u7507,CA
u3177,UT
u6598,CA
u2283,CA
u4815,CA
u7403,GA
u5967,GA
u3028,CA
u6277,CA
u6791,CA

Transaction table (transaction ID, product ID, user ID, quantity, unit price)

t8414,p5885,u5830,4,147
t5213,p9339,u5670,3,274
t7187,p8204,u8341,8,202
t1039,p1637,u8589,7,52
t861,p3175,u2609,0,342
t5492,p8767,u8417,2,330
t4467,p3445,u1200,0,253
t89,p7055,u8402,2,61
t6323,p9525,u2155,8,33
t3655,p9556,u3555,8,280
t6377,p9949,u7538,1,209
t766,p2543,u7881,9,14
t5667,p5268,u8852,7,286

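Key implementation points

On the reduce side, MapReduce sorts map output keys with the key's compareTo method and merges them into one group **(when compareTo returns 0)**.

This implementation therefore defines JoinKeyWritable in place of the book's PairOfStrings **(mainly because I could not find that class)**.

The join key here is userId. JoinKeyWritable.compareTo must therefore return 0 whenever the userIds are equal, so that all records for one user arrive in the same values iterator (Iterable<JoinKeyWritable> values) of a single reduce() call. Two more requirements follow from how the shuffle works:

1. The partitioner chooses which reducer receives a key, and by default it uses key.hashCode(). hashCode must therefore also depend only on userId.
2. The order of the values within a group is not guaranteed. The reducer must not assume that the location record comes first.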
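The sketch below simulates the partition, sort, and group steps in plain Java, under some simplifying assumptions:

- The records are hypothetical `Tagged` objects rather than JoinKeyWritable.
- The partition index uses the same formula as Hadoop's default HashPartitioner.
- A `TreeMap` stands in for sorting and grouping by compareTo.

In the real job the partitioner calls JoinKeyWritable.hashCode(). A Lombok `@Data` class derives hashCode from all of its fields, which here means both the userId and the "1"/"2" tag. That method therefore has to depend on userId alone for the real job to behave like this simulation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java simulation of the shuffle for this join (no Hadoop involved).
 * Partition: pick a reducer from the userId hash, using the HashPartitioner formula.
 * Sort + group: records whose userIds compare equal end up in one group (one reduce() call).
 */
final class ShuffleSimulation {

    /** One map output record: the join key (userId) plus a tagged value such as location/CA. */
    static final class Tagged {
        final String userId;
        final String tag;
        final String value;

        Tagged(String userId, String tag, String value) {
            this.userId = userId;
            this.tag = tag;
            this.value = value;
        }
    }

    /** Non-negative hash modulo the number of reducers, as in HashPartitioner.getPartition. */
    static int partition(String userId, int numReducers) {
        return (userId.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    /** Returns one sorted map per reducer: userId -> values of that group, in arrival order. */
    static List<Map<String, List<String>>> shuffle(List<Tagged> mapOutput, int numReducers) {
        List<Map<String, List<String>>> reducers = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            // TreeMap orders keys with compareTo; keys that compare equal share one entry (one group)
            reducers.add(new TreeMap<>());
        }
        for (Tagged record : mapOutput) {
            reducers.get(partition(record.userId, numReducers))
                    .computeIfAbsent(record.userId, k -> new ArrayList<>())
                    .add(record.tag + "=" + record.value);
        }
        return reducers;
    }

    public static void main(String[] args) {
        List<Tagged> mapOutput = new ArrayList<>();
        mapOutput.add(new Tagged("u1", "product", "p1"));
        mapOutput.add(new Tagged("u2", "location", "UT"));
        mapOutput.add(new Tagged("u1", "location", "CA"));
        List<Map<String, List<String>>> reducers = shuffle(mapOutput, 3);
        for (int i = 0; i < reducers.size(); i++) {
            System.out.println("reducer " + i + ": " + reducers.get(i));
        }
    }
}
```

In `main`, u1's product is listed before its location simply because it was emitted first. The real shuffle also gives no ordering guarantee among keys that compare equal. That is why the reducer should not rely on seeing the location first.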

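Left outer join goal

User table LEFT OUTER JOIN transaction table

Get the products in each user's transactions together with that user's location (null when the user has no transactions).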

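Implementation classes

| Class | Description |
| --- | --- |
| UserMapper | Mapper for user records |
| TransactionMapper | Mapper for transaction records |
| ProductLocationReducer | Reducer that joins user and transaction records |
| LeftOuterJoin | Aggregate class (mappers, reducers, and drivers) |
| LeftOuterJoinTest | Test class |
| JoinKeyWritable | Custom class for the map-phase output |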

Source code

Data file generator

Generates two mock input files, user_info and transaction_info, for the simulation.

```java
import lombok.Data;
import org.junit.Test;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Generates the two tables for the left join
 * 1. user table
 * 2. transaction table
 */
public class LeftOuterJoinGenerate {
    @Data
    public static class UserInfo {
        /**
         * User ID
         */
        private String userId;
        /**
         * Location ID
         */
        private String locationId;

        public static List<String> getLocationList() {
            return Stream.of("UT", "GA", "CA").collect(Collectors.toList());
        }

        @Override
        public String toString() {
            return String.join(",", userId, locationId);
        }
    }

    @Data
    public static class TransactionInfo {
        /** Transaction ID */
        private String transactionId;
        /** Product ID */
        private String productId;
        /** User ID */
        private String userId;
        /** Quantity */
        private int quantity;
        /** Amount */
        private int amount;

        @Override
        public String toString() {
            return String.format("%s,%s,%s,%s,%s", transactionId, productId, userId, quantity, amount);
        }
    }

    @Test
    public void generateUserInfoList() throws IOException {
        File userInfoFile = new File("D:\\Destop\\hadooplearn\\src\\data\\user_info");
        System.out.println(userInfoFile.getAbsolutePath());
        int userNum = 10000;
        List<String> locationList = UserInfo.getLocationList();
        // try-with-resources closes the buffered stream, which also closes the file stream
        try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(userInfoFile))) {
            for (int i = 0; i < userNum; i++) {
                UserInfo userInfo = new UserInfo();
                // IDs are drawn at random from [0, userNum), so the same user ID can appear more than once
                userInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
                userInfo.setLocationId(locationList.get((int) (Math.random() * locationList.size())));
                out.write(String.format("%s\n", userInfo).getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    @Test
    public void generateTransaction() throws IOException {
        File transactionFile = new File("D:\\Destop\\hadooplearn\\src\\data\\transaction_info");
        System.out.println(transactionFile.getAbsolutePath());
        int transactionNum = 10000;
        int userNum = 10000;
        int productNum = 10000;
        try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(transactionFile))) {
            for (int i = 0; i < transactionNum; i++) {
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTransactionId(String.format("t%s", (int) (Math.random() * transactionNum)));
                transactionInfo.setProductId(String.format("p%s", (int) (Math.random() * productNum)));
                transactionInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
                transactionInfo.setQuantity((int) (Math.random() * 10));
                transactionInfo.setAmount((int) (Math.random() * 400));
                out.write(String.format("%s\n", transactionInfo).getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}
```

User mapper class (inner class)

```java
/**
 * User mapper: userId,location -> key (userId, "1"), value ("location", location)
 */
public static class UserMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        String userId = split[0];
        String location = split[1];
        // Key: the join column (userId) plus the tag "1" marking the user table
        JoinKeyWritable outputKey = new JoinKeyWritable();
        outputKey.setName(userId);
        outputKey.setValue("1");
        // Value: tagged as "location" so the reducer can tell the two tables apart
        JoinKeyWritable outputValue = new JoinKeyWritable();
        outputValue.setName("location");
        outputValue.setValue(location);
        context.write(outputKey, outputValue);
    }
}
```

Transaction mapper class (inner class)

```java
/**
 * Transaction mapper: transactionId,productId,userId,quantity,amount
 * -> key (userId, "2"), value ("product", productId)
 */
public static class TransactionMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        String product = split[1];
        String userId = split[2];
        // Key: the join column (userId) plus the tag "2" marking the transaction table
        JoinKeyWritable outputKey = new JoinKeyWritable();
        outputKey.setName(userId);
        outputKey.setValue("2");
        // Value: tagged as "product"
        JoinKeyWritable outputValue = new JoinKeyWritable();
        outputValue.setName("product");
        outputValue.setValue(product);
        context.write(outputKey, outputValue);
    }
}
```

Reducer that joins user and transaction records (inner class)

```java
/**
 * Product-location reducer: joins the locations and products of one userId
 */
public static class ProductLocationReducer extends Reducer<JoinKeyWritable, JoinKeyWritable, Text, Text> {
    @Override
    protected void reduce(JoinKeyWritable key, Iterable<JoinKeyWritable> values, Context context) throws IOException, InterruptedException {
        // The order of values within a group is not guaranteed, so buffer both sides first.
        // getValue() returns an immutable String, so keeping it is safe even though Hadoop reuses the value object.
        ArrayList<String> locationList = new ArrayList<>();
        ArrayList<String> productList = new ArrayList<>();
        for (JoinKeyWritable keyWritable : values) {
            if ("location".equals(keyWritable.getName())) {
                locationList.add(keyWritable.getValue());
            } else {
                productList.add(keyWritable.getValue());
            }
        }
        for (String location : locationList) {
            // No products for this user: the left outer join fills the right side with null
            if (productList.isEmpty()) {
                context.write(new Text(location), new Text("null"));
            }
            for (String product : productList) {
                context.write(new Text(location), new Text(product));
            }
        }
    }
}
```
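A plain-Java replay of one reduce group shows why the order of values within a group matters for this reducer. The sketch runs the same group through two hypothetical helpers:

- `streaming` writes each product as soon as it arrives, pairing it only with the locations seen so far.
- `buffered` collects both sides before joining.

Values are modeled as `{name, value}` string pairs instead of JoinKeyWritable. This is an assumption made to keep the example free of Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of one reduce group in plain Java; each value is {name, value}. */
final class ReduceGroupReplay {

    /** Pairs each product only with locations that arrived earlier in the group. */
    static List<String> streaming(List<String[]> values) {
        List<String> out = new ArrayList<>();
        List<String> locations = new ArrayList<>();
        boolean hasProduct = false;
        for (String[] v : values) {
            if ("location".equals(v[0])) {
                locations.add(v[1]);
            } else {
                hasProduct = true;
                for (String loc : locations) {
                    out.add(loc + "\t" + v[1]);
                }
            }
        }
        if (!hasProduct) {
            for (String loc : locations) {
                out.add(loc + "\tnull");
            }
        }
        return out;
    }

    /** Buffers locations and products first, so the result does not depend on value order. */
    static List<String> buffered(List<String[]> values) {
        List<String> locations = new ArrayList<>();
        List<String> products = new ArrayList<>();
        for (String[] v : values) {
            if ("location".equals(v[0])) {
                locations.add(v[1]);
            } else {
                products.add(v[1]);
            }
        }
        List<String> out = new ArrayList<>();
        for (String loc : locations) {
            if (products.isEmpty()) {
                out.add(loc + "\tnull");
            }
            for (String p : products) {
                out.add(loc + "\t" + p);
            }
        }
        return out;
    }
}
```

Take a group in which the product arrives before the location. `streaming` emits nothing for it: the product is dropped, and no null row is written either. `buffered` still emits `CA\tp1`.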

Left join MapReduce driver method

```java
/**
 * Left outer join driver
 *
 * @param userPath        path of the user table
 * @param transactionPath path of the transaction table
 * @param outputPath      output directory (must not exist yet)
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    // Job.getInstance replaces the deprecated Job constructor
    Job job = Job.getInstance(conf, "LeftJoinDriver");
    // Lets Hadoop find the jar containing these classes when the job runs on a cluster
    job.setJarByClass(LeftOuterJoin.class);
    Path userFilePath = new Path(userPath);
    Path transactionFilePath = new Path(transactionPath);
    // Each input path gets its own mapper
    MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
    MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
    job.setMapOutputKeyClass(JoinKeyWritable.class);
    job.setMapOutputValueClass(JoinKeyWritable.class);
    job.setReducerClass(ProductLocationReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
}
```

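Intermediate output key class

```java
package writable;

import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Intermediate record for the MapReduce join (used as both the map output key and value)
 *
 * @author ping
 */
@Getter
@Setter
public class JoinKeyWritable implements WritableComparable<JoinKeyWritable> {
    private String name;
    private String value;

    /**
     * Sorting and grouping compare only name (the userId for keys),
     * so all records of one user reach the same reduce() call.
     */
    @Override
    public int compareTo(JoinKeyWritable o) {
        return this.name.compareTo(o.name);
    }

    /**
     * The default HashPartitioner uses hashCode(), so it must also depend only on name.
     * Otherwise the user record (value "1") and the transaction records (value "2")
     * of one userId could be sent to different reducers when there is more than one reducer.
     */
    @Override
    public int hashCode() {
        return name.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof JoinKeyWritable && name.equals(((JoinKeyWritable) obj).name);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.value = in.readUTF();
    }
}
```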
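JoinKeyWritable writes its two strings in `write` and reads them back in `readFields`, in the same order. The plain-Java sketch below checks that a record survives this round trip. It uses a hypothetical `JoinKeyRoundTrip` class with the same field layout, and standard `java.io` streams stand in for Hadoop's buffers. Serializing to bytes and reading back into a fresh object roughly mirrors what happens to map output before it reaches the reducer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in with the same serialized layout as JoinKeyWritable (no Hadoop). */
final class JoinKeyRoundTrip {
    String name;
    String value;

    /** Same layout as JoinKeyWritable.write: two modified-UTF-8 strings. */
    void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(value);
    }

    /** Reads the fields back in exactly the order they were written. */
    void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        value = in.readUTF();
    }

    /** Serializes to a byte array, then deserializes into a fresh object. */
    static JoinKeyRoundTrip roundTrip(JoinKeyRoundTrip original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            JoinKeyRoundTrip copy = new JoinKeyRoundTrip();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both fields must be set before a record is written. `DataOutputStream.writeUTF` throws a NullPointerException for a null string, so a record whose name or value was never set fails at serialization time.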

Aggregate class

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import writable.JoinKeyWritable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;

/**
 * Classic MapReduce left join
 * userInfo table LEFT OUTER JOIN transactionInfo table
 * Two mappers load the two tables separately.
 * 1. Product-location stage: MultipleInputs lets each input path use its own mapper.
 * 2. Counting stage: counts the distinct locations of each product.
 *
 * @author ping
 */
public class LeftOuterJoin {
    /**
     * User mapper: userId,location -> key (userId, "1"), value ("location", location)
     */
    public static class UserMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            String userId = split[0];
            String location = split[1];
            JoinKeyWritable outputKey = new JoinKeyWritable();
            outputKey.setName(userId);
            outputKey.setValue("1");
            JoinKeyWritable outputValue = new JoinKeyWritable();
            outputValue.setName("location");
            outputValue.setValue(location);
            context.write(outputKey, outputValue);
        }
    }

    /**
     * Transaction mapper: transactionId,productId,userId,quantity,amount
     * -> key (userId, "2"), value ("product", productId)
     */
    public static class TransactionMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            String product = split[1];
            String userId = split[2];
            JoinKeyWritable outputKey = new JoinKeyWritable();
            outputKey.setName(userId);
            outputKey.setValue("2");
            JoinKeyWritable outputValue = new JoinKeyWritable();
            outputValue.setName("product");
            outputValue.setValue(product);
            context.write(outputKey, outputValue);
        }
    }

    /**
     * Product-location reducer: joins the locations and products of one userId
     */
    public static class ProductLocationReducer extends Reducer<JoinKeyWritable, JoinKeyWritable, Text, Text> {
        @Override
        protected void reduce(JoinKeyWritable key, Iterable<JoinKeyWritable> values, Context context) throws IOException, InterruptedException {
            // The order of values within a group is not guaranteed, so buffer both sides first.
            // getValue() returns an immutable String, so keeping it is safe even though Hadoop reuses the value object.
            ArrayList<String> locationList = new ArrayList<>();
            ArrayList<String> productList = new ArrayList<>();
            for (JoinKeyWritable keyWritable : values) {
                if ("location".equals(keyWritable.getName())) {
                    locationList.add(keyWritable.getValue());
                } else {
                    productList.add(keyWritable.getValue());
                }
            }
            for (String location : locationList) {
                // No products for this user: the left outer join fills the right side with null
                if (productList.isEmpty()) {
                    context.write(new Text(location), new Text("null"));
                }
                for (String product : productList) {
                    context.write(new Text(location), new Text(product));
                }
            }
        }
    }

    /**
     * Location-count mapper: reads the "location<TAB>product" lines written by the join job
     */
    public static class LocationCountMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            String location = split[0];
            String product = split[1];
            // Users without transactions appear with product "null" and are counted under that key
            context.write(new Text(product), new Text(location));
        }
    }

    /**
     * Location-count reducer: number of distinct locations per product
     */
    public static class LocationCountReducer extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Hadoop refills the same Text object for every value, so store immutable String copies
            HashSet<String> locationSet = new HashSet<>();
            for (Text location : values) {
                locationSet.add(location.toString());
            }
            context.write(key, new IntWritable(locationSet.size()));
        }
    }

    /**
     * Left outer join driver
     *
     * @param userPath        path of the user table
     * @param transactionPath path of the transaction table
     * @param outputPath      output directory (must not exist yet)
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job constructor
        Job job = Job.getInstance(conf, "LeftJoinDriver");
        // Lets Hadoop find the jar containing these classes when the job runs on a cluster
        job.setJarByClass(LeftOuterJoin.class);
        Path userFilePath = new Path(userPath);
        Path transactionFilePath = new Path(transactionPath);
        // Each input path gets its own mapper
        MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
        MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
        job.setMapOutputKeyClass(JoinKeyWritable.class);
        job.setMapOutputValueClass(JoinKeyWritable.class);
        job.setReducerClass(ProductLocationReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    /**
     * Location-count driver; its input is the output directory of LeftJoinDriver
     */
    public void LocationCountDriver(String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "LocationCountDriver");
        job.setJarByClass(LeftOuterJoin.class);
        job.setMapperClass(LocationCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(LocationCountReducer.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }
}
```
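LocationCountReducer collects location values into a HashSet. Hadoop's reduce-side iterator refills one Text instance for every value, so storing that object itself puts the same mutable object into the set again and again. The plain-Java sketch below reproduces the effect with a hypothetical `MutableText` stand-in whose equals and hashCode follow its current contents, as Text's do. It compares storing the reused object against storing String copies.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical demo of the object-reuse pitfall (no Hadoop involved). */
final class ValueReuseDemo {

    /** Hypothetical stand-in for Text: equals and hashCode depend on the current contents. */
    static final class MutableText {
        private String contents = "";

        void set(String s) {
            contents = s;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableText && ((MutableText) o).contents.equals(contents);
        }

        @Override
        public int hashCode() {
            return contents.hashCode();
        }

        @Override
        public String toString() {
            return contents;
        }
    }

    /** Mimics the iterator refilling one object and the reducer storing that object itself. */
    static Set<MutableText> storeReusedObject(List<String> values) {
        MutableText reused = new MutableText();
        Set<MutableText> set = new HashSet<>();
        for (String v : values) {
            reused.set(v);
            set.add(reused);
        }
        return set;
    }

    /** Same iterator, but an immutable String copy of each value is stored. */
    static Set<String> storeCopies(List<String> values) {
        MutableText reused = new MutableText();
        Set<String> set = new HashSet<>();
        for (String v : values) {
            reused.set(v);
            set.add(reused.toString());
        }
        return set;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("CA", "UT");
        System.out.println(storeReusedObject(values));
        System.out.println(storeCopies(values));
    }
}
```

For the input CA, UT, the set that stores the reused object holds two entries, and both print UT. For the strings Aa and BB, which share a hash code, it reports one distinct value instead of two. Storing String copies avoids both problems.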

Test launcher class (adjust the HDFS namespace and other settings to your environment)

```java
import org.junit.Test;

import java.io.IOException;

/**
 * Left join tests
 *
 * @author ping
 */
public class LeftOuterJoinTest {
    /**
     * Stage 1: left outer join of the user and transaction tables
     */
    @Test
    public void testLeftOuterJoin() throws InterruptedException, IOException, ClassNotFoundException {
        LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
        leftOuterJoin.LeftJoinDriver("D:\\Destop\\hadooplearn\\src\\data\\user_info",
                "D:\\Destop\\hadooplearn\\src\\data\\transaction_info",
                "hdfs://spark01:9000/test/leftOuterJoinResult");
    }

    /**
     * Stage 2: reads the output of testLeftOuterJoin, so run that test first
     */
    @Test
    public void testLocationCountDriver() throws InterruptedException, IOException, ClassNotFoundException {
        LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
        leftOuterJoin.LocationCountDriver("hdfs://spark01:9000/test/leftOuterJoinResult",
                "hdfs://spark01:9000/test/locationCountResult");
    }
}
```

Run screenshots

[Screenshot: user data]

[Screenshot: transaction data]

[Screenshot: output of a successful run]

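Summary

Overall, implementing a left join in MapReduce is quite involved. Within each userId group, this implementation pairs every location with every product in a double for loop. That corresponds to the nested loop join (NLJ), the strategy with the highest time complexity among Spark's join strategies.

Anyone up for a challenge could try a MapReduce version of **SMJ (sort-merge join with two pointers)** or **HJ (hash join)**.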
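Before taking on that challenge, both strategies can be tried in memory. The following is a plain-Java sketch, not a MapReduce job. The names `JoinStrategies`, `hashLeftJoin`, and `sortMergeLeftJoin`, and the `{key, payload}` row layout, are hypothetical. The two strategies work as follows:

- The hash join builds a table on the right side and probes it once per left row.
- The sort-merge join sorts both sides by key and walks them with two pointers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory left outer joins; rows are {key, payload}, output is "key,left,right". */
final class JoinStrategies {

    /** Hash join: build a hash table on the right side, then probe it once per left row. */
    static List<String> hashLeftJoin(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> table = new HashMap<>();
        for (String[] r : right) {
            table.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            List<String> matches = table.get(l[0]);
            if (matches == null) {
                out.add(l[0] + "," + l[1] + ",null");
            } else {
                for (String m : matches) {
                    out.add(l[0] + "," + l[1] + "," + m);
                }
            }
        }
        return out;
    }

    /** Sort-merge join: sort both sides by key, then advance two pointers. */
    static List<String> sortMergeLeftJoin(List<String[]> left, List<String[]> right) {
        Comparator<String[]> byKey = Comparator.comparing((String[] row) -> row[0]);
        List<String[]> l = new ArrayList<>(left);
        List<String[]> r = new ArrayList<>(right);
        l.sort(byKey);
        r.sort(byKey);
        List<String> out = new ArrayList<>();
        int j = 0;
        for (String[] row : l) {
            // Skip right rows whose key is smaller than the current left key
            while (j < r.size() && r.get(j)[0].compareTo(row[0]) < 0) {
                j++;
            }
            // Emit every right row with an equal key; k leaves j in place so duplicate left keys rescan the run
            int k = j;
            boolean matched = false;
            while (k < r.size() && r.get(k)[0].equals(row[0])) {
                out.add(row[0] + "," + row[1] + "," + r.get(k)[1]);
                matched = true;
                k++;
            }
            if (!matched) {
                out.add(row[0] + "," + row[1] + ",null");
            }
        }
        return out;
    }
}
```

Both methods return the same rows for the same input, but in a different order: the hash join keeps the left input's order, while the sort-merge join emits rows in key order. In a MapReduce setting, the shuffle already sorts map output by key. That sorting is the starting point a sort-merge variant would build on.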

Thanks, everyone. My writing has its limits, so please point out any problems you find.

Added: 2021-11-29 16:23:14  Updated: 2021-11-29 16:24:55
 