1. References
Video link
2. Environment
- Windows 10
- JDK 8
- Hadoop 3.1.3 (Windows build)
- IDEA
3. CombineTextInputFormat Split Mechanism
Hadoop's default TextInputFormat plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own MapTask, so jobs over large numbers of small files run inefficiently.
The CombineTextInputFormat split mechanism targets scenarios with many small files: it logically groups multiple small files into one split, so that a single MapTask processes several small files.
Setting the maximum virtual-storage split size:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB
Note: the maximum virtual-storage split size should be chosen according to the actual sizes of the small files.
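The value 4194304 is 4 MB (4 * 1024 * 1024 bytes). Writing it as a product makes the intent clearer, and a different maximum can be chosen when the small files are somewhat larger; the 20 MB value below is only an illustrative alternative, assuming the same job object as in the driver further down:

// 4 MB, written as a product for readability
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
// e.g. a 20 MB maximum for somewhat larger "small" files (illustrative value only)
CombineTextInputFormat.setMaxInputSplitSize(job, 20 * 1024 * 1024);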
Split mechanism:
Split generation consists of two phases: a virtual-storage phase and a split phase. In the virtual-storage phase, each input file is compared with the configured maximum and carved into virtual blocks no larger than that maximum (a remainder between one and two times the maximum is split evenly in half to avoid tiny blocks). In the split phase, a virtual block that is at least the maximum size forms a split on its own, while smaller blocks are merged with the following ones to form a split.
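To make the two phases concrete, the following standalone sketch simulates them for a handful of hypothetical file sizes. The file sizes and the 4 MB maximum are assumptions for illustration only; the real planning happens inside CombineTextInputFormat (via CombineFileInputFormat's getSplits) and additionally considers data locality, so treat this as a rough model rather than the actual implementation.

import java.util.ArrayList;
import java.util.List;

// Rough simulation of the two split-planning phases (not the real Hadoop implementation)
public class CombineSplitSketch {
    public static void main(String[] args) {
        long max = 4L * 1024 * 1024;                     // 4 MB virtual-storage maximum
        long[] fileSizes = {1_700_000L, 5_100_000L, 3_400_000L, 6_800_000L}; // hypothetical small files

        // Phase 1: virtual storage - carve each file into blocks no larger than max;
        // a remainder between max and 2 * max is split evenly to avoid tiny blocks
        List<Long> blocks = new ArrayList<>();
        for (long remaining : fileSizes) {
            while (remaining > 2 * max) {
                blocks.add(max);
                remaining -= max;
            }
            if (remaining > max) {                       // between max and 2 * max: halve it
                blocks.add(remaining / 2);
                blocks.add(remaining - remaining / 2);
            } else {
                blocks.add(remaining);
            }
        }

        // Phase 2: slicing - accumulate virtual blocks until a split reaches at least max;
        // whatever is left at the end forms the final (possibly smaller) split
        List<Long> splits = new ArrayList<>();
        long current = 0;
        for (long block : blocks) {
            current += block;
            if (current >= max) {
                splits.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            splits.add(current);
        }

        System.out.println("virtual blocks: " + blocks);
        System.out.println("planned splits: " + splits);
    }
}

With these numbers the sketch prints six virtual blocks and three planned splits, so the four hypothetical files would go to three MapTasks instead of four.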
4. Word Count
4.1 Mapper
package com.uni.combineTextInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused output objects so new instances are not allocated for every record
    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every word
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}
4.2 Reducer
package com.uni.combineTextInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts for each word and emit (word, total)
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outValue.set(sum);
        context.write(key, outValue);
    }
}
4.3 Driver (the key part)
package com.uni.combineTextInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Switch the split mechanism from the default TextInputFormat to CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Maximum virtual-storage split size: 4 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

        FileInputFormat.setInputPaths(job, new Path("input1"));
        FileOutputFormat.setOutputPath(job, new Path("output1"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
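To reproduce the small-file scenario locally, the input1 directory the driver reads from needs to contain several small text files. The sketch below generates such test data; the directory name input1 matches the driver above, while the file count and contents are arbitrary assumptions:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Writes a few small text files into input1 so CombineTextInputFormat has something to combine
public class GenerateSmallFiles {
    public static void main(String[] args) throws IOException {
        Path inputDir = Paths.get("input1");   // same input directory the driver reads
        Files.createDirectories(inputDir);
        for (int i = 0; i < 5; i++) {          // 5 small files (arbitrary count)
            StringBuilder content = new StringBuilder();
            for (int line = 0; line < 1000; line++) {
                content.append("hadoop mapreduce combine text input format\n");
            }
            Files.write(inputDir.resolve("small-" + i + ".txt"),
                    content.toString().getBytes(StandardCharsets.UTF_8));
        }
    }
}

Each generated file is only a few dozen kilobytes, so with the 4 MB maximum all of them should be planned into a single split and handled by one MapTask, whereas the default TextInputFormat would turn the same input into five splits.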
5. Summary
To switch the split mechanism in MapReduce, you only need to call the org.apache.hadoop.mapreduce.Job API before submitting the job. There are two steps: first set the input format to CombineTextInputFormat, then set the maximum virtual-storage split size; together with the sizes of the small files (the input files, taken in ascending lexicographic order of file name), this determines the number of splits.
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);