IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Day40_Hadoop之MapReduce(三) -> 正文阅读

[大数据]Day40_Hadoop之MapReduce(三)

七、MapReduce经典案例

(一)好友推荐案例

1、需求

推荐好友的好友,比如给hadoop推荐cat、hello、mr。

(需求实际就是获取非好友的两个人有多少共同好友)

2、数据准备

双向好友关系

tom:hello hadoop cat
world:hadoop hello hive
cat:tom hive
mr:hive hello
hive:cat hadoop world hello mr
Hadoop:tom hive world
Hello:tom world hive mr

?3、思路

推荐者与被推荐者一定有一个或多个相同的好友,转变为找共同好友,但是,两人不能是直接好友,例如,针对第一行,可以给hello推荐hadoop,也可以给hadoop推荐hello,但是,两者不能为直接好友才可以,例如,hadoop跟world有共同好友hive,此时,是不能给他们互相推荐的,因为hadoop跟word已经是直接好友了。

?? 全局去寻找好友列表中两两关系,这种两两关系体现出来的他们是间接好友,并且只要他们组建了,就证明他们是有公共好友的,若共同好友是tom,可以给它们互相推荐,例如,第一行中的hello:hadoop、hello:cat、hadoop:cat,但是如果他们是直接好友的话,就不能推荐了,例如第5行中hadoop与world体现出了是间接好友,他们有共同好友hive,可以给他们互相推荐,但是在第二行里面,world与hadoop体现出的是直接好友,因此就不能给他们互相推荐了。所以要从这里面剔除直接好友。那么,直接好友去哪里找呢?

?? 全局去寻找好友依次与好友的两两关系,这种关系体现出来的就是直接好友。就是每行第一个与剩余的每个好友依次组建的两两关系。例如,tom:hello、 tom:hadoop、tom:cat

??? 因此所有这些两两关系中,既有直接好友关系,也有间接好友关系;要从间接好友中,去除直接好友;

??? 统计两两关系出现次数,即他们共同好友的个数。

代码思路:

??? map:按好友列表输出两俩关系

??? reduce:sum两两关系

结果:

cat:hadoop	2
cat:hello	2
cat:mr	1
cat:world	1
hadoop:hello	3
hadoop:mr	1
hive:tom	3
mr:tom	1
mr:world	2
tom:world	2

4、代码实现

(1)自定义RecommendMapper


package com.bigdata.recommendfriends;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class RecommendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 读取每行数据  tom:hello hadoop cat
        String line = value.toString();
        // 2 挑出人,好友列表
        // tom
        String person = line.split(":")[0];
        // [hello,hadoop,cat]
        String[] friends = line.split(":")[1].split(" ");
        // 3 遍历好友列表  组装间接好友 <友:友,1> 组装直接好友<友:人,0>
        for(int i =0 ;i <= friends.length-1;i++){
            // hello
            String friend = friends[i];
            // 组装直接好友 <友:人,0> 直接好友用valueout的0表示
            context.write(new Text(getFd(person,friend)),new IntWritable(0));
            for(int j = i+1;j<=friends.length-1;j++){
                // 组装间接好友 <友:友,1> 间接好友用valueout的1表示
                // 有时候 cat:hadoop 有时候  hadoop:cat  但是业务需要 cat:hadoop
                context.write(new Text(getFd(friend,friends[j])),new IntWritable(1));
            }
        }
        // 4 将kv写出

    }
    public static String getFd(String a,String b){
        return a.compareTo(b) < 0?  a+":"+b:b+":"+a;
    }
}

(2)自定义RecommendReducer



package com.bigdata.recommendfriends;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// keyout  是要推荐的人
// valueout  是他们之间认识的共同好友的数量
public class RecommendReduce extends Reducer<Text, IntWritable,Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 同一个分组的数据有两大种可能性:一种是value含有0的,这种数据证明他们两人本来认识了,这种情况不能推荐
        // <hadoop:world,0>
        // <hadoop:world,1>
        // 另一种是value全是1的,这种数据证明他们两人不认识,需要做推荐
        // <hadoop:hello,1>
        // <hadoop:hello,1>
        // <hadoop:hello,1>
        // 循环每个分组的value,只要遇到value是0的情况,就停下来
        //定义共同好友的数量
        int sum = 0;
        for (IntWritable value : values) {
            int i = value.get();
            if(i == 0){
                return;
            }
            sum = sum+i;
        }
        context.write(key,new IntWritable(sum));

    }
}

?(3)自定义RecommendDriver



package com.bigdata.recommendfriends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RecommendDriver {


    public static void main(String[] args) throws Exception {
        // 1 创建一个配置对象
        Configuration conf = new Configuration();
        // 2 通过配置对象创建一个job
        Job job = Job.getInstance(conf);
        // 3 设置job的mr的路径(jar包的位置)
        job.setJarByClass(RecommendDriver.class);
        // 4 设置job的mapper类  reduce类
        job.setMapperClass(RecommendMapper.class);
        job.setReducerClass(RecommendReduce.class);
        // 5 设置job的mapper类的keyout,valueout
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 6 设置job的最终输出的keyout,valueout
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 7 设置job的输入数据的路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        // 8 设置job的输出数据的路径 得保证,输出目录不能事先存在,否则报错,
        Path outPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);

        // 9 提交job到yarn集群
        boolean b = job.waitForCompletion(true);
        System.out.println("是否运行成功:"+b);
    }

}



(二)数据清洗案例

????? 数据清洗:是指发现并纠正数据文件中可识别的错误的最后一道程序,包括检查数据一致性,处理无效值和缺失值等。与问卷审核不同,录入后的数据清理一般是由计算机而不是人工完成。

??? ETL:是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库。

1、需求

将日志按照空格分隔,去除每条日志中字段组成数组的长度小于等于11的日志

2、代码实现

(1)编写LogMapper


package com.bigdata.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable,Text,Text,NullWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1 读取每行数据   194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
        String line = value.toString();
        // 2 按照空格切分,查看长度,解析日志
        boolean flag = parseLog(line,context);
        if(!flag){
            return;
        }
        // 3 将合法的数据写出
        context.write(value,NullWritable.get());
    }

    private boolean parseLog(String line,Context context) {
        String[] split = line.split(" ");
        if(split.length >11){
            context.getCounter("logGroup","trueLogCounter").increment(1);
            return true;
        }else{
            context.getCounter("logGroup","falseLogCounter").increment(1);
            return false;
        }
    }
}


(2)编写LogDriver


package com.bigdata.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {

    public static void main(String[] args) throws Exception {
        // 1 创建一个配置对象
        Configuration conf = new Configuration();
        // 2 通过配置对象创建一个job
        Job job = Job.getInstance(conf);
        // 3 设置job的mr的路径(jar包的位置)
        job.setJarByClass(LogDriver.class);
        // 4 设置job的mapper类  reduce类
        job.setMapperClass(LogMapper.class);
        // 5 设置job的mapper类的keyout,valueout
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 6 设置job的最终输出的keyout,valueout
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 7 设置job的输入数据的路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        // 8 设置job的输出数据的路径 得保证,输出目录不能事先存在,否则报错,
        Path outPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);

        // 因为数据清洗,不涉及数据的累加,因此就不需要reduce,这里设置reduce的数量为0
        job.setNumReduceTasks(0);
        // 9 提交job到yarn集群
        boolean b = job.waitForCompletion(true);
        System.out.println("是否运行成功:"+b);
    }

}


  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-02 11:26:45  更:2021-09-02 11:29:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:43:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码