IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MapReduce笔记 —— 手动设置多个ReduceTask以及设置Combiner(两种方式运行MapReduce) -> 正文阅读

[大数据]MapReduce笔记 —— 手动设置多个ReduceTask以及设置Combiner(两种方式运行MapReduce)

手动设置多个ReduceTask

先来看只有一个ReduceTask时的词频统计的结果
当没有手动设置ReduceTask的数量时,默认只有一个reduceTask

数据为
在这里插入图片描述

package Demo.mr.WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String datas = value.toString();
        String[] split = datas.split(",");
        for (String data : split){
            context.write(new Text(data),new IntWritable(1));
        }
    }
}

package Demo.mr.WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
        int sum=0;
        for(IntWritable val:values){
            sum=sum + val.get();
        }
        context.write(key,new IntWritable(sum));
    }
}

package Demo.mr.WordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //向yarn申请一个job任务用于执行mapreduce程序
        Job job = Job.getInstance(new Configuration());
        //设置入口类
        job.setJarByClass(WordCountDriver.class);
        //设置mapper类
        job.setMapperClass(WordCountMapper.class);
        //设置reduce类
        job.setReducerClass(WordCountReducer.class);
        //设置Mapper类的输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reduce类的输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置要处理的文件
        FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt"));
        //设置输出路径
        FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output"));
        //启动
        job.waitForCompletion(true);
    }
}
在idea中启动MapReduce

这里是因为给出路径时,“hdfs://master:9000/data/words.txt” 直接连接到了hdfs中的文件路径,所以可以在idea中直接运行
在这里插入图片描述

结果为
在这里插入图片描述
part-r-00000的内容为
hadoop 4
hive 2
java 3
python 2
word 2

然后手动设置ReduceTask的数量为2
在WordCountDriver类,也就是主方法中设置,只需要一条语句 job.setNumReduceTasks(2);

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountDriver.class);

        //设置reduceTask数量
        job.setNumReduceTasks(2);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output"));
        job.waitForCompletion(true);
    }
}

Map端和Reduce端的代码不需要改变

看一下结果
在这里插入图片描述
这里就会发现,每个ReduceTask都会产生一个自己的结果文件,这里的两个ReduceTask分别产生了part-r-00000以及part-r-00001文件
分别打开这两个文件

part-r-00000的内容为
hadoop 4

part-r-00000的内容为
hive 2
java 3
python 2
word 2

这里的part-r-00000以及part-r-00001两个文件的内容合在一起才是上面单个ReduceTask任务结果文件的内容。
这里是因为不同key的键值对的partition值不一样,因此会被传入不同的reduceTask中

简单的测试一下partiton

package Demo.mr;

public class Test {
    public static void main(String[] args) {
        int h = "hadoop".hashCode();
        System.out.println(h%2);

        int h1 = "hive".hashCode();
        int h2 ="java".hashCode();
        int h3 ="python".hashCode();
        int h4 = "word".hashCode();
        System.out.println(h1%2);
        System.out.println(h2%2);
        System.out.println(h3%2);
        System.out.println(h4%2);
    }
}

在这里插入图片描述
会发现hadoop的值,与剩下四个hive,java,python,word都不相同
所以key为hadoop的键值对单独进入一个reduceTask里面,然后计算后被输出在当前reduceTask对应的结果文件part-r-00000里面

key为hive,java,python,word的这些键值对会被送往另一个接收partition值为0的reduceTask中,然后被输出在文件part-r-00001里面

手动设置Combiner

Combiner类如果不自己定义的话,默认的shuffle过程中是不会combine的

先来看看没有combine的执行情况

package Demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Gender {
    public static class GenderMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            if("男".equals(split[3])){
                context.write(new Text(split[3]),new IntWritable(1));
            }
        }
    }

    public static class GenderReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception{
        Job job = Job.getInstance();
        job.setNumReduceTasks(2);
        job.setJobName("class age sum");
        job.setJarByClass(ClazzAgeSum.class);

        //map端
        job.setMapperClass(GenderMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //reduce端
        job.setReducerClass(GenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定输入输出路径
        Path input = new Path("/data/students.txt");
        FileInputFormat.addInputPath(job,input);
        Path output = new Path("/output");

        FileSystem fs = FileSystem.get(new Configuration());

        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job,output);

        //启动job
        job.waitForCompletion(true);

    }
}

在这里插入图片描述

通过jar包在linux终端执行

这里没有给出确切的hdfs的文件位置,所以不能像上面设置多个reduceTask的代码一样直接在idea里面运行,需要打成jar包然后传到linux里面用命令运行
在这里插入图片描述
hadoop jar /usr/local/jar/hdfs-1.0-SNAPSHOT.jar Demo.mr.Gender
格式为 hadoop ??jar ??jar的位置 ??jar包里面具体执行的类名

运行过程的信息如图所示
在这里插入图片描述
再来看看写了combine的情况

需要写具体的Combiner类,还要在主方法里面加上一句job.setCombinerClass(CombineReducer.class);

package Demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Gender {
	//Map端
    public static class GenderMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            if("男".equals(split[3])){
                context.write(new Text(split[3]),new IntWritable(1));
            }
        }
    }

	//Combiner预聚合
    public static class CombineReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

	//Reduce端
    public static class GenderReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception{
        Job job = Job.getInstance();
        job.setNumReduceTasks(2);
        job.setJobName("class age sum");
        job.setJarByClass(ClazzAgeSum.class);
        job.setMapperClass(GenderMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //combine端 预聚合
        job.setCombinerClass(CombineReducer.class);

        job.setReducerClass(GenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path input = new Path("/data/students.txt");
        FileInputFormat.addInputPath(job,input);
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job,output);
        job.waitForCompletion(true);

    }
}

写好后重新打包,打包的时候继续双击package即可,会自动覆盖原先的旧的jar包,然后再重新上传
在这里插入图片描述
之前没有设置Combiner时,红线标出来的地方 Combine的input和output后面的值都为0,说明没有combine过程。这里设置Combiner后,就有值了,说明经过了combine过程

再来看一下具体的数据,Combine input records读取的数据量为507,而上面几行的Map output records的值同样为507。Combine就相当于一个发生在reduce之前的reduce端,接收一个MapTask输出的值进行combine过程后,等待map的shuffle阶段结束,将不同map的combine输出结果传送到对应的reduceTask那里

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-22 12:24:30  更:2021-11-22 12:25:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 21:57:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码