IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop | MapReduce学习笔记 | JavaAPI更换切片机制 | CombineTextInputFormat 切片 | 词频统计案例 -> 正文阅读

[大数据]Hadoop | MapReduce学习笔记 | JavaAPI更换切片机制 | CombineTextInputFormat 切片 | 词频统计案例

一、参考资料


视频链接

二、运行环境


  • windows 10
  • JDK 8
  • Hadoop 3.1.3 windows版
  • IDEA

三、CombineTextInputFormat 切片机制


Hadoop框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会作为一个单独的切片,都会交给一个MapTask执行,当处理大量小文件时,效率会比较低。

CombineTextInputFormat切片机制作用域小文件过多的场景,可以将多个小文件从逻辑上规划到一个切片中,从而实现多个小文件交给一个MapTask处理。

虚拟存储切片最大值的设置:

CombineTextInputFormat.setMaxInputSplitSize(JOB, 4194304); // 4 MB

注:虚拟存储切片最大值最好根据实际的小文件大小情况来设置具体的值

切片机制:

生成切片过程包括:虚拟存储过程和去切片过程两部分

四、词频统计


4.1 Mapper

package com.uni.combineTextInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/** 词频统计
 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN , map 阶段输入key的类型: LongWritable
 * VALUEIN, map 阶段输入value的类型: Text
 * KEYOUT, map 阶段输出vkey的类型:Text
 * VALUEOUT, map阶段输出的value类型:IntWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // 放在上面声明防止在循环里多次创建对象,浪费空间
    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. 获取一行
        String line = value.toString();
        // 2. 切割
        String[] words = line.split(" ");
        // 3. 循环写出
        for (String word : words) {
            // 封装 outKey
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

4.2 Reducer

package com.uni.combineTextInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/** 词频统计
 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN , reduce 阶段输入key的类型: Text
 * VALUEIN, reduce 阶段输入value的类型: IntWritable
 * KEYOUT, reduce 阶段输出vkey的类型:Text
 * VALUEOUT, reduce阶段输出的value类型:IntWritable
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private  IntWritable outValue = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 累加
        for (IntWritable value : values) {
            sum += value.get();
        }
        outValue.set(sum);
        // 写出
        context.write(key, outValue);
    }
}

4.3 Driver 驱动类(关键)

package com.uni.combineTextInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver{
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 获取 job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. 设置jar包路径
        job.setJarByClass(WordCountDriver.class);
        // 3. 关联 mapper 和 reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. 设置 map 输出的 k v 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. 设置最终输出的k v类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. 修改切片规则, 默认是TextInputFormat.calss
        job.setInputFormatClass(CombineTextInputFormat.class);
        // 7. 虚拟存储切片最大值设置成4MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        // 8. 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("input1"));
        FileOutputFormat.setOutputPath(job, new Path("output1"));
        // 9. 提交 job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

五、总结


MapReduce 更换切片机制只需在提交Job前,调用 org.apache.hadoop.mapreduce.Job对象API。主要有两个步骤,先设置切片机制为CombineTextInputFormat,然后再设置虚拟存储的最大值,这个会根据小文件(按文件名字典顺序的升序结果文件集合)的大小而决定切片的个数。

// 设置切片机制
job.setInputFormatClass(CombineTextInputFormat.class);
// 设置虚拟存储切片最大值
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-07 13:47:24  更:2022-02-07 13:47:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:21:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码