1、需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案
2、分析
小文件的优化无非以下几种方式:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
- 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
- 在mapreduce处理时,可采用combineInputFormat提高效率
注意:
本文介绍的是第二种处理方法!!!!
3、实现
本文实现的是上述第二种方式
程序的核心机制:
自定义一个InputFormat
改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件
自定义InputFormat
package cn.laojiajun.myformat.demo1;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
/***
* 这个方法返回值是一个RecordReader
*
*
* @param inputSplit
* @param taskAttemptContext
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
MyRecordReader reader = new MyRecordReader();
reader.initialize(inputSplit,taskAttemptContext);
return reader;
}
/**
* 表示我们的文件是否可切分
* 直接返回false表示我们的文件不可切分,到时候读取文件的时候,就会一次性将
* 文件内容全部读取
*/
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
}
自定义RecordReader
package cn.laojiajun.myformat.demo1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable(); //这里是定义value2
private boolean processed = false;
/**
* 初始化方法
* 参数 inputsplit 为文件的切片
* 拿到了文件的切片,就可以拿到文件
* 拿到了文件就可以转换成数组
*
* 参数 context 为上下蚊对象,我们一些参数都封装在context里面
*/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) inputSplit;
this.conf = taskAttemptContext.getConfiguration();
}
/**
* 方法作用为 往下继续读取文件
* 返回值是boolean 如果返回为true 表示继续文件已经读完成,不需要继续往下读取了
* 如果返回是 false ,那么就要继续读取
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
//需要实现根据我们的文件的切片,将我们文件的内容全部读取出来,封装到BytesWritable里面去
if(!processed) {
//字节数组定义的大小,需要装得下我们的文件内容
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
//获取我们的文件系统
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
//拿到fileSystem之后我们就可以打开文件的输入流
in = fs.open(file);
//将输入流转到字节数组中去
IOUtils.readFully(in, contents, 0,contents.length);
value.set(contents,0,contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
/**
* 这个方法,返回我们的 k1
*/
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
/**
* 这个方法,返回我们的v1 BytesWritable 我们需要将我们的文件内容读取出来
* 封装到BytesWritable里面机型返回
*/
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* 文件读取进度的方法,没什么用
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return processed?1.0f:0.0f;
}
/**
* 读取完成之后,释放资源,也没什么用
*/
@Override
public void close() throws IOException {
}
}
定义Mapper程序
package cn.laojiajun.myformat.demo1;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
private Text filenameKey;
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(filenameKey, value);
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获取文件的切片
InputSplit split = context.getInputSplit();
//获取文件路径
Path path = ((FileSplit)split).getPath();
//获取文件名
filenameKey = new Text(path.toString());
}
}
主运行程序
package cn.laojiajun.myformat.demo1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyFormatRun extends Configuration implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"测试任务");
job.setJarByClass(MyFormatRun.class);
job.setInputFormatClass(MyInputFormat.class);
MyInputFormat.addInputPath(job,new Path("hdfs://192.168.88.3:8020/test_dir"));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.88.3:8020/src_output2"));
//设置reduce 输出类型,虽然没有写reduce程序,但是默认输出类型 Text BytesWritable
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setMapperClass(MyMapper.class);
return job.waitForCompletion(true)?0:1;
}
@Override
public void setConf(Configuration conf) {
}
@Override
public Configuration getConf() {
return null;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MyFormatRun(),args);
System.exit(exitCode);
}
}
运行完毕之后,大概会显现下面的结果,前面部分是文件名,后面文件内容会出现乱码
?
hadoop执行情况
?
|