IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MapReduce-wordcount -> 正文阅读

[大数据]MapReduce-wordcount

MapReduce概述

????????MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

优点:

1MapReduce易于编程

它简单的实现一些接口,就可以完成一个分布式程序。

2)良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3)高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

4)适合PB级以上海量数据的离线处理

缺点:

1)不擅长实时计算

2)不擅长流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。

3)不擅长DAG(有向无环图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

MapReduce的计算过程

MapReduce的计算过程可以分为两个步骤:

先进行Map阶段再进行Reduce阶段。Map阶段的任务是并发的,Reduce阶段的任务也是并发的。同时该编程模型只能包含一个Map阶段和一个Reduce阶段,如果业务逻辑非常复杂,那么就要写多个MapReduce程序。

WordCount案例

在Hadoop里,其自行封装了一些数据序列化类型,要与Java的进行对比

Java类型

Hadoop Writable类型

Boolean

BooleanWritable

Byte

ByteWritable

Int

IntWritable

Float

FloatWritable

Long

LongWritable

Double

DoubleWritable

String

Text

Map

MapWritable

Array

ArrayWritable

Null

NullWritable

MapReduce的编程规范:

Mapper ,Reducer ,Driver

1.Mapper阶段

??????? 1)用户定义的Mapper要继承父类

??????? 2)Mapper的输入输出类型都是KV对(可自定义)

??????? 3)Mapper的业务逻辑写在map()方法中

??????? 4)map()方法对每一个KV都会调用一次

以WordCount为例,? a a a

每一次调用的map方法,会读取一行如a a a? ,经过业务逻辑分割,KV对为 <a,1><a,1><a,1>

此时的map是不会对着三个键值对进行整合的,会直接对其输出,作为上下文context交给Reducer。

2.Reducer阶段 ?

????????1)用户定义的Mapper要继承父类

??????? 2)Mapper的输出类型都是KV对(可自定义)

??????? 3)Mapper的业务逻辑写在reduce()方法中

??????? 4)reduce Task进程会对每一组相同K的<K,V>组调用一次reduce方法()

即Mapper传递过来的KV对,找到相同的K,依次进行组合。输出<a,3>

3.Driver阶段

??????? 相当于Yarn集群的客户端,将我们的任务提交给Yarn,由yarn为我们的任务分配资源。传递给Yarn的是封装了MapReduce程序相关运行参数的job对象

WordCount案例本地测试(pom.xml配置了依赖,且配置log4j的日志资源)

(由于之前在Windows环境下安装了hadoop并且配置了Windows依赖,我们先准备好java程序,在本地进行测试。)

编写Mapper、Reducer、Driver类(注意导包不要发生错误

package com.zc.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @program:MapReduceDemo
 * @descripton:
 * KEYIN,---------      LongWriterable
 * VALUEIN::---------  Text
 * KEYOUT, ---------:  Text
 * VALUEOUT,---------  IntWriterable
 **/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
    private Text outKey = new Text();//提高性能
    private IntWritable outVal = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1.获取一行信息
        String line = value.toString();
        //切割
        String[] words = line.split(" ");//zc jyx 等
        //循环写出
        for (String word : words) {
            //封装
            outKey.set(word);
            //写出
            context.write(outKey,outVal);
        }
    }
}
package com.zc.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @program:MapReduceDemo
 * @descripton:map的输出,是Reduce的输入
 * KEYIN,   ------  Text
 * VALUEIN, ------  IntWriterable
 * KEYOUT,  ------  Text
 * VALUEOUT ------  IntWriterable
 **/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable outVal = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum =0;
        //Iterable相当于是一个集合
        for (IntWritable value : values) {
            sum += value.get();
        }
        //写出
        outVal.set(sum);
        context.write(key,outVal);
    }
}
package com.zc.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @program:MapReduceDemo
 * @descripton:
 * @author:ZhengCheng
 * @create:2021/10/28-21:00
 **/
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2.设置jar包路径
        job.setJarByClass(WordCountDriver.class);
        //3.关联mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //4.设置map的输出的key-val类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //5。设置最终输出的key-val类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //6.设置输入路径和输出路径
     //   FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, new Path("E:\\HDFS"));
     // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path("E:\\HDFSout\\a"));
        //7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
    /*
    传入的true or false 就是看是否需要这些东西。即监控我们的进程并且printjob
    if (verbose) {
            this.monitorAndPrintJob();
        } else {
            int completionPollIntervalMillis = getCompletionPollInterval(this.cluster.getConf());

            while(!this.isComplete()) {
                try {
                    Thread.sleep((long)completionPollIntervalMillis);
                } catch (InterruptedException var4) {
                }
            }
        }
     */


}

注意:此时运行IDEA要用管理员身份运行,避免发生权限不足导致的异常。其次,我们在设置输出路径时,我们需要注意,输出路径必须不存在。否则会有FileAlreadyExist异常。

之后变会得到输出结果。在part-r-00000中用nodepad++打开即可。

本地测试无误后,我们将上述代码中的输入输出路径做出更改。为了让我们可以随意的指定输入输出路径,按照类似shell编程中的传递参数,我们将其修改为args[0] 和args[1]

如果不知道String args[]是什么东西,可以查看相关文章。其类似于我们在执行该程序时,给程序传递的参数。第一个参数赋给arg[0],依次类推。

那么我们将文件用Maven打包,导入到虚拟机(集群)中。开启Hadoop集群,使用命令运行该jar包。

?

?注意:jar包里的Diver,一定要使用全类名。不知道的可以在IDEA里,找到Copy Reference即可复制。

给出输入输出路径,执行即可。如果出现了一些异常。可以查看我的相关文章。

最后在Hadoop-1:9870中,我们可以看到自己的输出结果。WordCount案例顺利完成。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-30 12:37:23  更:2021-10-30 12:38:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 2:09:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码