IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> mapreduce高阶内容(二) 自定inputFormat -> 正文阅读

[大数据]mapreduce高阶内容(二) 自定inputFormat

1、需求

无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案

2、分析

小文件的优化无非以下几种方式:

  1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
  2. 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
  3. 在mapreduce处理时,可采用combineInputFormat提高效率

注意:

本文介绍的是第二种处理方法!!!!

3、实现

本文实现的是上述第二种方式

程序的核心机制:

自定义一个InputFormat

改写RecordReader,实现一次读取一个完整文件封装为KV

在输出时使用SequenceFileOutPutFormat输出合并文件

自定义InputFormat

package cn.laojiajun.myformat.demo1;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    /*** 
     * 这个方法返回值是一个RecordReader
     * 
     *
     * @param inputSplit
     * @param taskAttemptContext
     * @return
     * @throws IOException
     * @throws InterruptedException
     */

    @Override
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        MyRecordReader reader = new MyRecordReader();
        reader.initialize(inputSplit,taskAttemptContext);
        return reader;
    }

    /**
     * 表示我们的文件是否可切分
     * 直接返回false表示我们的文件不可切分,到时候读取文件的时候,就会一次性将
     * 文件内容全部读取
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

自定义RecordReader

package cn.laojiajun.myformat.demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();  //这里是定义value2
    private boolean processed = false;

    /**
	 * 初始化方法
	 * 参数 inputsplit 为文件的切片
	 * 拿到了文件的切片,就可以拿到文件
	 * 拿到了文件就可以转换成数组
	 *
	 * 参数 context 为上下蚊对象,我们一些参数都封装在context里面
	 */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) inputSplit;
        this.conf = taskAttemptContext.getConfiguration();
    }


    /**
	 * 方法作用为 往下继续读取文件
	 * 返回值是boolean 如果返回为true 表示继续文件已经读完成,不需要继续往下读取了
	 * 如果返回是 false ,那么就要继续读取
	 */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
		//需要实现根据我们的文件的切片,将我们文件的内容全部读取出来,封装到BytesWritable里面去
        if(!processed) {
			//字节数组定义的大小,需要装得下我们的文件内容
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
			//获取我们的文件系统
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
				//拿到fileSystem之后我们就可以打开文件的输入流
                in = fs.open(file);
				//将输入流转到字节数组中去
                IOUtils.readFully(in, contents, 0,contents.length);
                value.set(contents,0,contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }

        return false;
    }

    /**
	 * 这个方法,返回我们的 k1
	 */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }


    /**
	 * 这个方法,返回我们的v1  BytesWritable 我们需要将我们的文件内容读取出来
	 * 封装到BytesWritable里面机型返回
	 */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }


    /**
	 * 文件读取进度的方法,没什么用
	 */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed?1.0f:0.0f;
    }


    /**
	 * 读取完成之后,释放资源,也没什么用
	 */
    @Override
    public void close() throws IOException {

    }
}

定义Mapper程序

package cn.laojiajun.myformat.demo1;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text filenameKey;

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(filenameKey, value);
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
		//获取文件的切片
        InputSplit split = context.getInputSplit();
		//获取文件路径
        Path path = ((FileSplit)split).getPath();
        //获取文件名
		filenameKey = new Text(path.toString());
    }
}

主运行程序

package cn.laojiajun.myformat.demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyFormatRun extends Configuration implements Tool {


    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf,"测试任务");
        job.setJarByClass(MyFormatRun.class);
        job.setInputFormatClass(MyInputFormat.class);
        MyInputFormat.addInputPath(job,new Path("hdfs://192.168.88.3:8020/test_dir"));
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.88.3:8020/src_output2"));

		//设置reduce 输出类型,虽然没有写reduce程序,但是默认输出类型 Text BytesWritable
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(MyMapper.class);

        return job.waitForCompletion(true)?0:1;
    }

    @Override
    public void setConf(Configuration conf) {

    }

    @Override
    public Configuration getConf() {
        return null;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MyFormatRun(),args);
        System.exit(exitCode);
    }

}

运行完毕之后,大概会显现下面的结果,前面部分是文件名,后面文件内容会出现乱码

?

hadoop执行情况

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-06 11:13:37  更:2021-09-06 11:15:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/22 12:16:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码