IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop三大组件之MapReduce -> 正文阅读

[大数据]Hadoop三大组件之MapReduce

作者:token keyword

初试Hadoop之MapReduce

一、MapReduce的定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个Hadoop集群上。

二、MR的优缺点

优点:

  • 它是框架,易于编写

  • 良好的扩展性(通过对集群的机器的扩展,从而增加计算能力)

  • 高容错性(MR如果执行失败,它会重新分配任务)

  • 适合做海量数据的离线处理

缺点:

  • 不擅长实时计算,无法像MySql一样,在毫秒或者秒级内返回结果
  • 不擅长流式计算,因为输入的数据集时静态的
  • 不擅长DAG(有向无环图)计算,使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下

三、认识与识别MR

3.1常用数据序列化类型

老规矩,先和我一起认识认识MR里面的数据类型吧,与Java做对比吧!

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritable

有细心的小伙伴可能发现了,这个类型,其实没那么难记,除了String类型外,就只剩下在后面加上一个Writable就可了。

3.2官方WordCount源码解析

  • 看源码之前可以尝试着带着几个问题去看:
    • 一个最基本的MR的程序是如何构成的?
    • 程序的运行流程是怎样的?

官方源码解析如下

package org.apache.hadoop.examples;

import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
   /**
   * Mapper阶段
   * TokenizerMapper继承Mapper函数
   * Mapper<KEYIN(输入key类型), VALUEIN(输入value类型), KEYOUT(输出key类型), VALUEOUT(输出value类型)>
   */
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
  {
    //当每一个新的单词出现后,就置成1,并且再将其作为一个<key,value>键值对的形式,因此可以作为常量值为1
    private static final IntWritable one = new IntWritable(1);//valueout
    private Text word = new Text();//keyout

    //重写map方法,读取初试划分的每一个键值对,即行偏移量和一行字符串,key为偏移量,value为该行字符串
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
    {
       //StringTokenizer时Java中用于字符串的
       /**1. StringTokenizer(String str) :构造一个用来解析 str 的 StringTokenizer 对象。java 默认的分隔符是空格("")、制表符(\t)、换行符(\n)、回车符(\r)。
		  2. StringTokenizer(String str, String delim) :构造一个用来解析 str 的 StringTokenizer 对象,并提供一个指定的分隔符。
		  3. StringTokenizer(String str, String delim, boolean returnDelims) :构造一个用来解析 str 的 StringTokenizer 对象,并提供一个指定的分隔符,同时,指定是否返回分隔符。**/
      //当我们读取的时候,每一个value其实相当于一行数据,这里使用StringTokenizer进行分割
      StringTokenizer itr = new StringTokenizer(value.toString());
      //遍历我们分割的数据,读取每一个单词
      while (itr.hasMoreTokens()) {
        //获取每一个对呀的key值
        this.word.set(itr.nextToken());
        //one代表1,最开始每个单词都是1次,context直接将<word,1>写到本地磁盘上
        //write函数直接将两个参数封装成<key,value>并提交
        context.write(this.word, one);
      }
    }
  }
    
  /**
  * Reduce函数
  * IntSumReducer继承了Reducer函数
  * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:Map的输出类型,就是Reduce的输入类型
  */
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
  {
    //result就是对结果的统计,统计输出次数
    private IntWritable result = new IntWritable();

    //重写reduce函数,key为单词,values是reducer从多个mapper中得到数据后进行排序并将相同key组
    //这里我们只需要关注的是map和reduce函数处理后的结果组成的<key,value>结果就可。
    public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException
    {
      //初始化一个sum值作为累加器
      int sum = 0;
        /**
       * 因为在同一个spilt对应的mapper中,会将其进行combine,使得其中单词(key)不重复,
       * 然后将这些键对按照hash函数分配给对应的reducer,reducer进行排序,和组合成list,
       * 然后再调用的用户自定义的函数
       */
      for (IntWritable val : values) {
        //累加
        sum += val.get();
      }
      //设置输出的value值
      this.result.set(sum);
      //将reduce的结果提交
      context.write(key, this.result);
    }
  }
    
  /**主函数入口*/
  public static void main(String[] args)
    throws Exception
  {
    //声明配置对象
    Configuration conf = new Configuration();
    //GenericOptionsParser是hadoop框架中解析命令行参数的基本类。它能够辨别一些标准的命令行参数,能够使应用程序轻易地指定namenode,jobtracker,以及其他额外的配置资源
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    //如果你命令行的参数小于2的话就会出错,输入地址和输出地址至于为啥不设置成!=2看后面就知道了    
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    //声明job对象
    Job job = Job.getInstance(conf, "word count");
    //声明当前job的驱动类
    job.setJarByClass(WordCount.class);
    //设置mapper类、Combiner类和Reducer类
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    //设置最终输出结果的key类型和value类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //找到文件的输入路径
    for (int i = 0; i < otherArgs.length - 1; i++) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    //从输入的参数找到输出的路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
    //结束程序
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

3.3手写MR的WordCount案例

3.3.1需求说明

在给定的文本文件中统计输出每一个单词出现的总次数

(1)输入数据hello.txt

atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop
sgg sgg sgg
nihao nihao
bigdata0111
laiba               

(2)期望输出数据

atguigu  2
banzhang 1
cls  2
hadoop  1
jiao 1
ss   2
xue 1

3.3.2 在IDEA创建项目并完成一些配置信息

? 创建Maven工程不会IDEA中创建Maven项目的可以参考我之前的博客Maven的安装以及在IDEA中的使用 ,

? (1) 在pom.xml中添加一下配置信息

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

? (2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入。

这个文件可以简单的理解成打印错误日志的文件。不配置不影响你的接下来操作。

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- 类型名为Console,名称为必须属性 -->
        <Appender type="Console" name="STDOUT">
            <!-- 布局为PatternLayout的方式,
            输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- 可加性为false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- root loggerConfig设置 -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>

</Configuration>

3.3.3模拟实操MR

? (1)编写mapper类

package com.mr_test.wordcount_hello;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author:Pier
 * @DATE:2022/3/7
 */
public class helloMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    Text outk = new Text();
    IntWritable outv = new IntWritable(1);

    /**
     *  Map阶段的核心业务处理方法
     *  每输入一行数据会调用一次map方法
     * @param key 输入数据的key
     * @param value 输入数据的value
     * @param context 上下文对象
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key ,Text value ,Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        //获取当前输入的数据
        String line = value.toString();
        //切割数据
        String[] splits = line.split(" ");
        for (String split: splits) {
            outk.set(split);
            //遍历集合 封装 输出数据的key 和 value
            context.write(outk,outv);
        }
    }
}

? (2)编写Reducer类

package com.mr_test.wordcount_hello;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * @author:Pier
 * @DATE:2022/3/7
 */
public class helloReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    //为啥不在这里定义sum,因为这里算全局变量
//    private int sum = 0;
    IntWritable outv = new IntWritable();

    /**
     * 核心
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key ,Iterable<IntWritable> values ,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        //每次一个key值就会重置
        int sum=0;
        //获取每一个key出现的次数
        for (IntWritable value: values) {
            sum += value.get();
        }
        //输出
        outv.set(sum);
        context.write(key,outv);
    }
}

? (3)编写Driver驱动类

package com.mr_test.wordcount_hello;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author:Pier
 * @DATE:2022/3/7
 */
public class helloDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //获取配置信息
        Configuration conf = new Configuration();
        //获取job对象
        Job job = Job.getInstance(conf);

        //关联Driver程序相关的jar
        job.setJarByClass(helloDriver.class);

        //关联Mapper相关的jar
        job.setMapperClass(helloMapper.class);
        //关联Reducer相关的jar
        job.setReducerClass(helloReducer.class);
        
        //设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输入和输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\\StudyFile\\BigDate\\02.大数据技术之Hadoop\\03.代码\\day04\\MapReduce\\src\\main\\java\\com\\atguigu\\mr\\wordcount\\com.bigData.mapreduce\\src\\main\\resources\\wcinput\\hello.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\StudyFile\\BigDate\\02.大数据技术之Hadoop\\03.代码\\day04\\MapReduce\\src\\main\\java\\com\\atguigu\\mr\\wordcount\\com.bigData.mapreduce\\src\\main\\java\\testData\\wcinput2"));
        
        //提交job
        job.waitForCompletion(true);
    }
}

(4) 运行结果截图

image-20220310101812884

最近忙着摸鱼哈哈,没咋写博客了,但是这两天应该会再次更新MapReduce的后续内容和Yarn的。一起加油呀!!!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-11 22:17:06  更:2022-03-11 22:19:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 19:02:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码