IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 手把手与你一起走 MapReduce 编程的套路 -> 正文阅读

[大数据]手把手与你一起走 MapReduce 编程的套路

目录

一.MapReduce的运行步骤(待补充)

二.编程模板

? ? ? ?1.自定义?Mapper类继承Mapper,并重写map方法:

????????2.自定义Reducer类集成Reducer,并重写Reduce方法

??? ? ? ? 3.Driver 主入口,整合mapper和reducer

? ? ? ? 4.idea 打成jar包

? ? ? ? 5.服务器执行hadoop jar?


一.MapReduce的运行步骤(待补充)

二.编程模板

? ? ? ? 按照以下的1,2,3模板进行编程,然后打成jar包,放在服务器上运行。

????????以耳熟能详的WordCount举例,统计多个文件中每个单词的数量。

? ? ? ?1.自定义?Mapper类继承Mapper,并重写map方法:

? ? ? ? map方法一般是读取Hdfs上的数据,默认是一行行读取,有多少行,就调用了多少次map方法:

package com.xxj.hadoop.config.wc;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 自定义类 extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * 输入:一行行读取
 * KEYIN, 输入的key的类型   这里指的是每一行的起始偏移量  long
 * VALUEIN 输入的value的类型  这里指的是每一行的内容和偏移量一一对应的   String
 * 输出:
 * KEYOUT,  输出的键的类型 ,类型取决于  业务
 * VALUEOUT,输出的值的类型,类型取决于  业务
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        //比如每一行aa   bb  cc,根据逗号, 切分后,把字符串为key,value为1
        for (String word : split) {
            Text text = new Text(word);
            IntWritable num = new IntWritable(1);
            context.write(text, num);
        }
    }
}

? ? ? ? 其中:mapper后的四个泛型<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:

???输入:一行行读取
????????KEYIN 输入的key的类型   这里指的是每一行的起始偏移量,long,没有实际意义,hadoop的底层数据读取的时候是按字节读取的
????????VALUEIN 输入的value的类型  这里指的是每一行的内容和偏移量一一对应的   String
???输出:
????????KEYOUT,  输出的键的类型 ,类型取决于  业务
????????VALUEOUT,输出的值的类型,类型取决于  业务

???????这里的数据类型 ?不能使用java的原生类型,如int,String。

? ? ? ? 原因:?

? ? ? ? (1)首先在数据存储和数据网络传输中,需要将数据序列化和反序列化,如传输:“顽强的豆芽”,首先会将“顽强的豆芽” 序列化为 1000101001,传输完成,读取的时候,会将1000101001反序列化为我们看得懂的“顽强的豆芽”。

? ? ? ? (2)Java的原生类型,如int,String,Long虽然实现了Java的序列化Serializable接口,但是Serializable接口太重且繁琐,因为它不仅会将值序列化,也会将相关类也序列化,所以Hadoop自己实现的自己的一套序列化和反序列化的接口:Writable,只会对数据的值进行序列化和反序列化。

? ? ? ? Hadoop也实现一些基本数据类型的序列化类,可以供我们使用:

JavaHadoop
intIntWritable
longLongWritable
stringText
byteByteWritable
doubleDoubleWritable
floatFloatWritable
booleanBooleanWritable
nullNullWritable

???????????????自己定义的需要序列化和反序列化可以通过实现 Writable接口来使用。

? ? ? ? ? ? ? ? 在重写map方法时,如果中间处理数据时将类型转化为Java的数据类型,将结果写入上下文对象Context,要重新转为Hadoop的类型。

????????2.自定义Reducer类集成Reducer,并重写Reduce方法

? ? ? ? ?reducer是读取map初步处理好的数据,做数据计算,比如求和,求最大值,最小值等等,

package com.xxj.hadoop.config.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //循环遍历values  求和
        int sum=0;
        for(IntWritable v:values){
          //mapreduce的框架已经帮我们做好了从map出来后已经做好按key分组,
            // 也就是到这里的,Iterable<IntWritable> values 是同一个单词的数量迭代器,进行相加就可以得到最后的数量
            //类似于{"aa":[1,1,1,1,1]},所以统计aa单词出现的个数的话,只需要将迭代器中的[1,1,1,1,1]相加就可以得出总数
            sum+=v.get();
        }
        //写出结果文件
        IntWritable rv=new IntWritable(sum);
        context.write(key, rv);
    }
}


??
? ? ? ? 3.Driver 主入口,整合mapper和reducer

? ? ? ? Driver类中有入口main方法,这个方法主要(1)开启一个job (2)指定mapper类和reducer类,然后程序不知道这两个类是在jar中哪个文件,(3)设置map输出key value的类型和设置reduce输出key value的类型,虽然在具体的mapper和reducer类已经指定类型,但是因为泛型编译的时候生效,运行的时候泛自动擦除,所以这主函数需要再设置,

(3)创建读取流 FileInputFormat 来读取hdfs的数据,一行行(底层是文件读取器:LineRecordReader

)传入map方法,通过map方法,和框架的shuffle过程(在框架里面,我们不用写代码,自动帮我们实现,主要是实现,分区 Partitioner ,排序 sort ,分组 group, combiner组件,这个在mapreducer过程解析会提到),以及reduce方法,也会(4)创建输出流FileOutputFormat 将结果输出的hdfs的指定位置。(5)job提交语句:job.waitForCompletion(true) ,true表示需要打印日志

?

package com.xxj.hadoop.config.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) {
        //将mapper  reducer类进行一个封装  封装为一个任务----job(作业)
        //加载配置文件
        Configuration conf = new Configuration();
        //启动一个Job  创建一个job对象
        try {
            Job job = Job.getInstance(conf);
            //设置这个job
            //设置整个job的主函数入口
            job.setJarByClass(Driver.class);

            //设置job的mappper的类
            job.setMapperClass(WordCountMapper.class);

            //设置job的reducer的类
            job.setReducerClass(WordCountReducer.class);


            //设置map输出key   value的类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            //设置reduce的输出的k   v类型  以下方法设置的是mr的最终输出
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //指定需要统计的文件的输入路径  FileInputFormat  文件输入类
            Path inpath = new Path(args[0]);
            FileInputFormat.addInputPath(job, inpath);
            
            //指定输出目录  输出路径不能存在的  否则会报错  默认输出是覆盖式的输出  如果输出目录存在  有可能造成原始数据的丢失
            Path outpath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outpath);
            //提交job  执行这一句的时候 job才会提交  上面做的一系列的工作  都是设置job
            job.waitForCompletion(true);

        } catch (Exception e) {

            e.printStackTrace();

        }
    }

}

????????

? ? ? ?

? ? ? ? 4.idea 打成jar包

?选择file---->project Structure

?

?按完apply之后,可以选择主文件,这样到时候在服务器上运行jar包的时候就不用指定主函数文件路径,

?

然后再就会out文件夹出现打的jar包

?上hadoop集群的任一节点:

在之前上传了数据3份数据:

?每份数据为:

?

? ? ? ? 5.服务器执行hadoop jar?

???????????????hadoop jar wc01.jar /wcnew/ /wcoutnew/

最后查询得出的结果:

?

?

?

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-23 16:45:39  更:2021-08-23 16:45:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年10日历 -2024/10/27 18:23:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码