所有实例都是在本地环境下测试的,无需启动集群!
版本说明:
idea:2021.2.2
jdk:1.8
maven:3.8.2(用idea自带的也行)
1. MapReduce 框架原理
运行大致步骤:
MapReduce
FileOutputFormat
ReduceTask
Shuffle
MapTask
FileInputFormat
1.1 MapTask
工作机制简述: 输入的数据通过 split 被逻辑切分为多个 split 文件,通过 Record 按行读取内容给 map(用户实现)进行处理,数据被 map 处理结束后交给 OutputCollector 收集器,对其结果 key 进行分区(hash),然后写入 buffer,每个 maptask 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满时需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个 maptask 结束后,再对磁盘中 maptask 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 reducetask 获取该数据。
详解见下文…
1.2 ReduceTask
工作机制简述: Reduce 分为 copy、sort、reduce 三个阶段。ReduceTask 从各个 MapTask 上远程 copy 一片数据,进行一次归并排序,最后由 reduce(用户实现)将数据写到 HDFS 上。
详解见下文…
2. 数据输入
数据输入:输入文件的格式包括:日志文件、二进制文件、数据库表等,具体的文件类型,就需要对应的 InputFormat 来读取数据。
常用的 InputFormat 的实现类:
- FileInputFormat: 按照文件的内容长度进行切片,切片大小默认等于 Block 大小,切片时不需要考虑数据集整体,只是针对每一个文件单独切片。
- TextInputFormat: 是 FileInputFormat 默认的实现类,按照行读取每行记录,键是存储改行在整个文件中的起始字节偏移量(offset),LongWritable 类型;值是该行的内容,Text 类型。
- CombineTextInputFormat: 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,及将多个小文件交给一个 MapTask 处理。
FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、 NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。
数据块: 是 HDFS 存储的数据单位,实际上是 HDFS 将文件分成一块一块(Block)
数据切片: 是 MapReduce 程序计算输入数据的单位,一个切片会启动一个对应的 MapTask(数据切片只是逻辑上对输入的数据进行分片)
一个 Job 的 Map 阶段的并行度由切片数决定,多少个切片就分配多少 MapTask,默认切片大小 = BlockSize
2.1分片机制
MapReduce 程序启动时,会使用 InputFormat 计算任务的分片,分片的个数与 MapReduce 任务启动的 MapTask 的线程数(并发度)对应。
通常使用两种分片机制: FileInputFormat、 CombineTextInputFormat
2.2 FileInputFormat 分片机制
2.2.1 getSplits 方法
源码中获取切片的方法为 org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getSplits
为啥分片是用这个方法呢?
源码类的开头可以看到: This provides a generic implementation of getSplits(JobContext).
通过打断点,我们发现,程序运行时,也确实进入了该方法
2.2.2 getSplits 源码
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file : files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(makeSplit(path, length - bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else {
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
2.2.3 getSplits 举例
有两个文件:
File1.txt :300MB
File1 将分割为:File1.txt.split1(128MB)、File1.txt.split2(128MB)、File1.txt.split3(44MB)
File2.txt :130MB
130 < 128 * 1.1,所以将文件整体作为一个分片
File2 将分割为:File2.txt.split1(130MB)
2.2.4 案例实操
案例:对IP进行数量的统计
public class Job_IPCountDriver {
public static class Job_IPCountMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String ip = value.toString().split(" ")[0];
k.set(ip);
context.write(k, v);
}
}
public static class Job_IPCountReducer
extends Reducer<Text, IntWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (IntWritable value : values) {
count += value.get();
}
LongWritable v = new LongWritable(count);
context.write(key, v);
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Job_IPCountDriver.class);
job.setMapperClass(Job_IPCountMapper.class);
job.setReducerClass(Job_IPCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
📥 输入参数:日志文件 log.txt 路径、输出路径
📤 查看输出:在IDEA 运行中打印的日志我们可以找到如下内容,:
[INFO ] 2021-11-24 21:47:36,367 method:org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:198)
number of splits:1
...
Shuffled Maps =1
Merged Map outputs=1
证明该日志只被分成了一片。
2.3 CombineTextInputFormat 分片机制
CombineTextInputFormat 是 CombineFileInputFormat 的子类。
获取切片的方法为 org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat#getSplits
进入源码我们可以发现,已经完全重写了该方法,不用详细理解其底层逻辑。
我们知道,如果文件未超过块大小的 1.1 倍(默认值),FileInputFormat 会将每个文件作为一个分片,那么如果有许多小文件,那 FileInputFormat 就不再适合了。
举例:
准备一个包含四个日志文件的文档 logs,将该文档的绝对路径作为输入参数,再次运行。
[INFO ] 2021-11-24 21:56:16,737 method:org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:198)
number of splits:4
...
Shuffled Maps =4
Failed Shuffles=0
Merged Map outputs=4
当有许多小文件时,可以通过 CombineFileInputFormat 将许多小文件从逻辑上规划到一个切片中,然后交给 一个 MapTask 处理。
2.3.1 使用
如果需要使用 CombineTextInputFormat,只需在 Driver 类中设置 InputFormatClass,并设置分片的大小即可。
Driver:
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job,10 * 1024 * 1024);
CombineTextInputFormat.setMinInputSplitSize(job,10 * 1024 * 1024);
2.3.2 分片机制
生成切片包括两个过程:虚拟存储过程、切片过程。
虚拟存储过程:
获取输入路径下所有的文件大小,一次跟 setMaxInputSplitSize 值作比较,如果小于最大值,则全部划分为一片;如果文件大于设置的最大值且大于两倍,那么将以最大值切割为一片;如果文件大于最大值且小于2倍最大值,则将文件划分为2个虚拟存储块(平均分)。
举例:
setMaxInputSplitSize = 10MB ,如果文件为22MB,将先切出一片,剩余12MB,如果按最大值切分,就会出现一个2MB的小文件,所以会将剩余12MB的文件平均分两份,最后将分为3个分片:10MB、6MB、6MB。
🌟 切片过程:
判断文件是否大于 setMaxInputSplitSize ,若大于则形成切片;若不大于则跟下一个文件进行合并,形成一个切片
举例:
File1.txt 3MB、File2.txt 12MB、File3.txt 5MB、File4.txt 16MB
虚拟存储后将形成6个文件,分别为:3MB、(6MB、6MB)、5MB、(8MB、8MB)
最终切分为个文件,分别为:(3MB+6MB)、(6MB+5MB)、(8MB+8MB)
2.3.3 案例实操
修改上面案例的main方法:
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
...
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job,100 * 1024 * 1024);
CombineTextInputFormat.setMinInputSplitSize(job,100 * 1024 * 1024);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
📥 输入参数:包含4个日志文件的输入路径、输出路径
📤 查看输出:
number of splits:1
...
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
2.4 自定义文件输入格式化类
我们知道, HDFS 的一个块大小默认为 128MB,而 MapReduce 是依据块大小实现分片的,所以如果存储和处理很多的小文件,那么每个文件都会被 HDFS分为一块、被 MApReduce 分成一片,这样就导致小文件占用大量的存储和计算资源,为解决此问题,Hadoop 提供了两种容器来存储小文件:SeqenceFile、MapFile。
二者本质上都以键值对来存储数据。
MapFile 是一种排序了的 SquenceFile,其由 index 和 data 两部分组成。index 作为文件的数据索引,主要记录了每个 Record 的 key 值,以及该 Record 在文件中的偏移位置。在 MapFile 被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定 Record 所在文件位置。
源自:https://zhuanlan.zhihu.com/p/36768065
🌟 MapReduce 允许开发者根据自己的需要创建自定义的数据输入格式化类。
实现步骤:
- 自定义一个类,继承
FileInputFormat ,实现 createRecordReader() 方法 - 自定义一个类,继承
RecordReader ,实现6个方法。 - 在 Driver 中使用
job.setInputFormatClass (设置自定义输入格式化类)
2.4.1 SeqenceFile
SequenceFile: 是一个由二进制序列化过的key/value的字节流组成的存储文件。
HDFS 和 MapReduce job使用 SequenceFile 文件能够提高读取效率!
SequenceFile 格式由一个 Header (文件头)及多个 Record(记录)组成。
Header: SequenceFile 文件的前三个字节为 SEQ,包含 Key 的类型、Value 的类型,同以及一些其他信息。
Record: 记录的长度、Key长度、Key值和Value值,并且Value值的结构取决于该记录是否被压缩(compression)。
2.4.2 自定义 FileInputFormat
当存储小文件时,Record 的 Key 可以用来保存文件名、Value 可以用来保存文件的内容。
我们读取文件时就可以依次进行 SeqenceFile 的读取。
自定义的类需要实现 FileInputFormat,并实现其中的抽象方法 RecordReader 。
继承时需要指定键值对的泛型,可以都用 Text。
代码如下:
public class WholeInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return null;
}
}
?? 注意: RecordReader 也是一个抽象类,如果想要自定格式化,那也就要自己实现该类,并实现其中的抽象方法。
2.4.3 自定义 RecordReader
定义一个类实现 RedcordReader 并实现的6个方法!
public class WholeRecordReader extends RecordReader {
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {return false;}
@Override
public Object getCurrentKey() throws IOException, InterruptedException {return null;}
@Override
public Object getCurrentValue() throws IOException, InterruptedException {return null;}
@Override
public float getProgress() throws IOException, InterruptedException {return 0;}
@Override
public void close() throws IOException {}
}
2.4.3.1 完善 WholeInputFormat 类
可以发现,上面定义的 WholeRecordReader 类中实现的 initialize 方法的参数与 WholeInputFormat 类中的 createRecordReader 的参数一致。确实,通过 initialize 方法,我们可以获取到 RecordReader 对象。
依次,可以先来完善 InterruptedException 方法:
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeRecordReader reader = new WholeRecordReader();
reader.initialize(split, context);
return null;
}
2.4.3.2 实现 initialize
初始化方法中,获取文件系统对象和文件切片。
文件系统对象用于后期获取 FileStatus
文件切片中是文件当前切片的属性信息:路径、起始位置、长度、主机名、主机上的地址
FileSystem fileSystem = null;
FileSplit fileSplit = null;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.fileSystem = FileSystem.get(conf);
this.fileSplit = (FileSplit) split;
}
2.4.3.3 实现 nextKeyValue
nextKeyValue 方法将由 Mapper 的 map 方法调用,如果 nextKeyValue 方法一致返回 true,那么 map 方法将进入死循环,所以我需要提供一个 flag 标识,读取完后就返回 false ,证明无下一组 Key/Value
boolean flag = true;
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (this.flag) {
Path path = this.fileSplit.getPath();
FSDataInputStream inputStream = this.fileSystem.open(path);
FileStatus fileStatus = this.fileSystem.listStatus(path)[0];
long len = fileStatus.getLen();
byte[] buf = new byte[(int) len];
inputStream.read(buf);
inputStream.close();
this.k.set(path.getName());
this.v.set(new String(buf));
this.flag = false;
return true;
}
return false;
}
2.4.3.4 实现 getCurrentKey
getCurrentKey 方法返回当前 key
@Override
public Object getCurrentKey() throws IOException, InterruptedException {
return this.k;
}
2.4.3.5 实现 getCurrentValue
getCurrentValue方法返回当前 Value
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
return this.v;
}
2.4.3.6 实现 getProgress
getProgress 用于获取 0.0到1.0之间的一个数字,表示读取的数据的比例
@Override
public float getProgress() throws IOException, InterruptedException {
return this.flag ? 0.0F : 1.0F;
}
2.4.3.7 实现 close
@Override
public void close() throws IOException {
this.fileSystem.close();
}
2.4.4 合并小文件 MapReduce
我们只需要合并文件,也就是 Map 部分,所以可以忽略 Reduce 部分。
如下定义 Driver 类:
public class Job_WholeFilerMergeDriver {
public static class Job_WholeFilerMergeMapper
extends Mapper<Text, Text, Text, Text> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
System.out.println(key.toString());
System.out.println(value.toString().length()););
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Job_WholeFilerMergeDriver.class);
job.setMapperClass(Job_WholeFilerMergeMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
2.4.5 运行测试
准备包含四个小文件进行测试,可以注意到,上面 main 方法里,并没有传递实际的参数,如果运行,则会出现数组下标越界的异常。因此,实际运行前需要先传递参数。
如上向该类传递两个参数,用空格空开。
将日志输出级别提到最高(error ),日志配置文件如下:
log4j.rootLogger = error,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
📤 运行,输出到控制台如下:
修改 mapper 类中的 map 方法,将任务输出到文件。
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
2.4.6 读取 SeqenceFile
SeqenceFile 序列文件已经输出,并且是多个小文件合并之后的一个整体,所以我们需要尝试读取该文件,可以修改上文 FileInputFormat 提到的案例代码。修改 mapper 类,将其输入类型修改为 Text,并设置 InputFormatClass 为 SequenceFileInputFormat 。
public static class Job_IPCountMapper
extends Mapper<Text, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
String[] lines = value.toString().split("\n");
for (String line : lines) {
String ip = line.split(" ")[0];
k.set(ip);
context.write(k, v);
}
}
}
public static class Job_IPCountReducer
extends Reducer<Text, IntWritable, Text, LongWritable> {
...
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
...
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
...
}
运行上面程序,查看输出结果:
3. MapTask
3.1 MapTask 原理
3.1.1 MapTask 工作机制
0?? inputformat 阶段: inputFormat(默认 TextInputFormat)通过 getSplits 方法对输入目录中的文件进行逻辑切片,得到 block,但后分配对应数量的 MapTask。
🌟 MapTask
1?? read 阶段: MapTask 通过 inputFormat 获取 recordReader 对象(默认 LineRecordReader)进行读取,以 \n 最为分隔符,读取一行数据,返回 <key, value> 。
key 表示每行首字符的字节偏移量,value 表示这一行的内容
2?? map 阶段: 将解析出的 <key, value> 交给用户自己继承的 Mapper 类中,执行其中的 map 方法。
RecordReader 每读取一行,在这里调用一次!
3?? Collect 阶段: Mapper 的逻辑结束后,将每条数据通过 contect.write 进行收集数据,通过 OutputCollect.collect() 输出结果;在 collect() 方法中,会先对其进行分区(默认使用 HashPartitioner)处理,并写入一个环形缓冲区中。
collector.collect(key, value, partitioner.getPartition(key, value, numPartitions));
MapTask 提供的 Partitioner 接口,其作用是根据 key 或 value 及 reducer 的数量来决定当前输出数据应该交由 reducetask 处理,默认对 key hash 后以 reducer 数量取模(默认的取模方式是平均 reducer 的处理能力)
🌟 环形缓冲区
缓冲区的作用是批量收集 Mapper 结果,减少磁盘 I/O 的影响,key/value 对以及 partition 的结果都会被写入缓冲区(key、value 都会被序列化成字节数组)。
4?? Spill 阶段: 当溢写线程启动后,对着缓冲区80%空间内的 key 做依次本地排序(sort),并在必要时对数据进行合并、压缩等操作。
🌟 溢写的流程
1、利用快速排序算法对缓冲区内的数据进行排序,先按照分区编号排序,然后每个分区内按照key排序。
2、按照分区编号将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out (N表示当前溢写的次数)。如果有 Combiner ,则写入文件之前,对每个分区内的数据进行依次聚集操作。
3、将分区数据的元数据写到内存索引数据结构 SpillRecord 中。如果当前内存索引大小超过1MB,则将内存索引写到文件 output/spillN.out.index 中。
如果 Job 设置了 Combiner,那么就会将相同的 key 的 key/value 对的 value 加起来,以减少溢写到磁盘的数据量。
5?? Merge 阶段: 每次溢写会在磁盘上生成一个临时文件(写之前判断是否需有 Combiner),当数据处理结束后,会对所有的临时文件进行一次合并,确保最终只生成一个文件(output/file.out ),同时生成一个索引文件(output/file.out.index )以记录数据偏移量。
在进行合并的过程中,MapTask 以分区为单位进行合并,对于某个分区,采用多轮递归合并的方式,每轮合并(默认)10个文件,并将产生的我呢见重新加入待合并列表中,对文件排序后,重复以上过程,最终的到一个文件。
3.1.2 MapTask 执行原理图
3.2 环形缓冲区
环形缓冲区是一个数组,数组中存放着 key、value 的序列化数据以及相关的元数据,包括 partition、key的起始位置、value的起始位置、value的长度等。
缓冲区的默认大小是100MB,当缓冲区80%的空间被占用是,就会启动溢写(Spill),Mapper 的输出结果可以往剩下20%的空间继续写入数据。
缓冲区溢写的比例默认为 0.8 (spill.percent),当缓冲区的数据达到域值 buffer size * spill percent = 100MB * 0.8 = 80MB 时,溢写线程启动!
3.3 MapTask 并行度决定依据
一个 Job 的 Map 阶段并行度由 Job 的分片数决定。
每一个 Split 分片分配一个 MapTask 进行处理。
4.Shuffle
Shuffle: map 阶段完成后,数据需要传输到 reduce,这中间的(数据处理)过程可称为 shuffle(洗牌,发牌)。
shuffle的核心机制包括:数据分区、排序、分组、规约、合并等过程。
4.1 Shuffle 原理
4.1.1 Shuffle 工作机制
0?? input Split: 输入的数据分片后,每一个 split 都会由一个 Mapreduce 处理,最先由 map 进行处理。
🌟 Map 阶段的 Shuffle:
1?? Collect 阶段: 将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value序列化数据,Partition分区信息等。
2?? Spill 阶段: 当内存中的数据量达到一定的阀值(80%)的时候,就会将数据写入本地磁盘。在将数据写入磁盘之前,线程首先根据数据最终要传递到的 Reduce 任务把数据划分成相应的分区(Partition),每个分区中对数据的 key 进行一次排序的操作,如果配置了combiner(预聚合),还会将有相同分区号和 key 的数据进行聚合。
3?? Merge 阶段: 把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
溢出写文件归并完毕后,Map 任务将删除所有的临时溢出写文件,并告知 TaskTracker 任务已完成,只要其中一个 Map 任务完成,Reduce 任务就会开始复制它的输出(Copy 阶段)。
Map 任务的输出文件放置在运行 Map 任务的 TaskTracker 的本地磁盘上,它是运行 Reduce 任务的 TaskTracker 所需要的输入数据。
🌟 Reduce 阶段的 Shuffle:
4?? Copy 阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上,根据自己的分区号,复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5?? Merge 阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程(一个是内存到磁盘的合并,一个是磁盘到磁盘的合并),对本地的数据文件进行合并操作。
merge 有三种形式:
内存到内存:该形式不启用
内存到磁盘:该形式一直在运行
磁盘到磁盘:marge阶段,生成最终的文件
6?? Sort 阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask 阶段已经对数据进行了局部的排序,ReduceTask只需对Copy的数据进行归并排序。
最终文件默认位于磁盘中,当 Reduce 的输出文件已定,整个 Shuffle 阶段就结束了,但后进行 Reducer 的执行(从文件中取出每一个键值对,调用用户自定义的 reduce 方法)。
4.1.2 Shuffle 执行原理图
4.2 Partition 分区
当数据文件过大时,我们可以按照不同的条件将大文件拆分为小分进行处理,比如按照区域划分、按照日期划分等。
分区: MapReduce 在 Map 阶段结束后,会对数据按照 key 进行分区。
默认分区数据量为1,如果job中设置了 reduceTask 数量,则分区数量与 reduceTask 数量一致。
4.2.1 Partitioner
在 Driver 中,我们可以通过 job.setNumReduceTasks(Num) 来自定义分区数量,在理解之前就先试下。
对原本输出一个文件的案例的 Driver 修改其 ReduceTasks 数量:
job.setNumReduceTasks(3);
查看输出的结果,我们可以返现输出了三个文件。
4.2.1.1 HashPartitioner 源码
分区是由 HashPartitioner 类实现的,该类实现了 Partitioner 类,不妨先看下该类下的内容。
public interface Partitioner<K2, V2> extends JobConfigurable {
int getPartition(K2 key, V2 value, int numPartitions);
}
只有一个获取分区的方法,其传入的参数的 key/value 类型为 mapper 的输出类型。
通常默认使用的实现类是 HashPartitioner 。
找到其实现类,org/apache/hadoop/mapred/lib/HashPartitioner.java
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默 认分区是根据 key 的 hashCode返回值取正数,并对 ReduceTasks 个数取模得到的。所以上面我们设置了3个 ReduceTasks ,所以启动了3个线程,最终生成了3文件,而且是平均分配。
虽然进行了分区,但是默认是将文件均分到不同的分区中,这就不符合实际应用,所以我们需要自定义分区的实现。
4.2.2 自定义 Partition
1?? Step1:自定义实现类
自定义一个类,继承 Partition ,并实现其中的 getPartition 方法。
public class IpHeadPartitioner extends Partitioner<Text , LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
return 0;
}
}
2?? Step2:实现分区方法
需求:还是利用统计 IP 的案例,根据 IP 的首位数字进行分区
实现:根据首位数字分区,将分成 1~9 个分区,对 map 输出的 key 处理,并返回相应的分区号
说明:获取首位字符值,返回值为 char 类型,ASCII码中数字符号对应的十进制数自48(0)开始,所以只需减去 '0' 位对应的数字,就能得出 IP 首位的数字,但是由于分区是从0看是计数,所以将求得的值减1就是分区了。
补充方法:
@Override
public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
int i = text.toString().charAt(0) - '0';
return i-1;
}
3?? Step3:修改 Driver
在 Driver 类的 main方法中,添加如下配置。
说明:一个 splitFIle 只对应1个 ReduceTasks,如果不修改,那对 1 取模就没有意义,所以还需要设置 ReduceTasks 为 9
job.setNumReduceTasks(9);
job.setPartitionerClass(IpHeadPartitioner.class);
📤 查看输出:
4.2.3 分区数量
分区数 = reduce数:理想状态
分区数 > reduce数:任务报错
分区数 < reduce数:生成多余空文件
4.3 Comparable 排序
MapTask 和 ReduceTask 均会对数据按照 key 的字典顺序进行(快速)排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
4.3.1 Task的排序
对于 MapTask ,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于 ReduceTask ,它从每个 MapTask 上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序。
4.3.2 排序分类
部分排序: MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
全局排序: 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
辅助排序: (GroupingComparator分组) ,在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
?次排序: 如果需要中间过程对key的分组规则和reduce前对key的分组规则不同,那么可以通过 JobConf.setOutputValueGroupingComparator(Class) 来指定一个Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class) 可用于控制中间过程的key如何被分组,所以结合两者可以实现按值的二次排序。
4.3.3 自定义排序
4.3.3.1 WritableComparable
在 Hadoop 序列化 这篇中,对 bean 对象进行了序列化、排序。
通过自定义实体类,并实现 WritableComparable 接口重写 compareTo 方法,最终可以实现自定义排序。
public class WritableUser implements WritableComparable<WritableUser> {
@Override
public int compareTo(WritableUser o) {
if (this.age == o.age){
return -Integer.compare( this.money, o.money);
} else {
return -Integer.compare( this.age, o.age);
}
}
}
?? 注意:返回值前加负号标识降序排列,否则为升序!
4.3.2.2 窥见源码
可以探究下这个 compareTo 方法!
public int compareTo(T o);
compareTo 方法只能对两个相同数据类型的值比较,可用于比较 Byte、Long、Integer等。
该方法的返回值为int类型,所以返回可能是正数、负数或0,一般返回值为 1 -1 0
可以来看下 Integer 重写的 compareTo 方法:
public int compareTo(Integer anotherInteger) {
return compare(this.value, anotherInteger.value);
}
public static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
4.4 Combiner 预聚合
Combiner: 是 MapReduce 提供的一个 Mapper 端的预聚合机制,是对每个 Mapper 产生的结果进行本地聚集。
?? 注意:如果没有改变计算结果,则可以使用预聚合优化
4.4.1 Reducer作为Combiner
在 Driver 中,可设置 CombinerClass 为自己的 Reducer
job.setCombinerClass(Job_IPCountReducer.class);
4.4.2 自定义 Combiner
自定义类,继承 Reduce ,自定义实现,也可以实现跟 Reduce 类似的操作,并修改 setCombinerClass 。
public static class Job_Combiner extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
job.setCombinerClass(Job_Combiner.class);
📤 查看日志输出:
未使用 Combiner | 使用 Combiner 后 |
---|
Combine input records=0 | Combine input records=50107 | Combine output records=0 | Combine output records=3271 |
🌈 如上,自定义的 Combiner 跟 Reduce 没有区别,所以如果将 Mapper 的输出类型都改为<Text, LongWritable> 那么 Reduce 也就可以代替 Combiner 了。
5. ReduceTask
5.1 ReduceTask 原理
5.1.1 ReduceTask 工作机制
1?? Copy 阶段: ReduceTask 从各个 MapTask 上远程拷贝数据,如果某片数据超过一定的域值,则写到磁盘上,否则就放在内存中。
2?? Marge阶段: 拷贝数据的同时,ReduceTask 启动两个后台线程对内存和磁盘上的文件进行合并,以防止文件过多。
3?? Sort 阶段: 为了将相同的数据聚集在一起,Hadoop 采用了基于排序的策略,由于 MapTask 已经实现了对分区中数据的局部排序,因此只需将所有数据进行一次归并排序。
4?? reduce 阶段: 在用户自己继承的 Reduce类中,通过执行 reduce方法将输入的数据按 key 进行汇聚,并写出。
5.1.2 ReduceTask 执行原理图
5.2 ReduceTask 并行度决定机制
ReduceTask 的并行度将影响 Job 执行的并发度和,执行效率。
ReduceTask 数量的设置:
job.setNumReduceTasks(X);
ReduceTask 的数量默认为1,对应最终输出文件的数量。如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜。
? 如果分区数不是1,然ReduceTask为1,是否执行分区过程
🌈 不执行分区过程,在MapTask源码中,执行分区前需要先判断 ReduceNum 的个数是否大于,不大于1则不分区!
6. 数据输出
数据输出的父类为 OutputFormat。
常用的 OutputFormat 的实现类:
-
TextOutputFormat:默认的输出格式,它把每条记录写为文本行,且 key/value 可以是任意类型 -
SequenceFileOutputFormat:将输出写为一个顺序文件,格式紧凑易于压缩,如果输出需要为后续 MapReduce 任务的输入,这便是一种很好的输出格式。 -
自定义输出:符合特定需求,比如输出数据到MySQL、HBase等存储框架中。
🌈 上文中多多少少提到了TextOutputFormat、SequenceFileOutputFormat,这里就不再赘述了,下面可以试着实现自定义的 InputFormat 实现类。
6.1 自定义输出
自定义 InputFormat 的是实现类的大致步骤为:
- 继承 FileInputFormat
- 重写抽象方法 recordWriter,具体是实现输出数据方法
write()
6.2 自定义输出实例
实例需求:过滤日志中的 IP,将 IP 归属地不同的数据输出到相应的文件中。
需求实现:
- 获取 IP 的归属地信息,提取到具体地址字段
- 过滤日志文件,将归属地不同的日志信息输出到对应的文件中
6.2.1 获取归属地信息
日志中的的字段中没有地址新信息,所以每个 IP 的归属地需要我们自己获取。
实现:通过免费的解析网站将 IP 解析出归属地信息
6.2.1.1 导入Maven依赖
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
6.2.1.2 归属地解析demo
public class IPGEOUtils {
private static final String URL = "http://ip.ws.126.net/ipquery?ip=";
public static void main(String[] args) throws IOException {
String ip = "51.222.253.17";
CloseableHttpClient client = HttpClients.createDefault();
HttpGet request = new HttpGet(URL + ip);
CloseableHttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
String s = EntityUtils.toString(entity);
System.out.println(s);
}
}
📤 测试demo,查看输出:
6.2.1.3 代码优化
public class IPGEOUtils {
private static final String URL = "http://ip.ws.126.net/ipquery?ip=";
private static CloseableHttpClient client = HttpClients.createDefault();
public static void main(String[] args) throws IOException {
String ip = "51.222.253.17";
System.out.println(getGEO(ip));
}
public static List<String> getGEO(String ip) throws IOException {
HttpGet request = new HttpGet(URL + ip);
if (client == null) {
client = HttpClients.createDefault();
}
CloseableHttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
String res = EntityUtils.toString(entity);
String[] split = res.split("\"");
String pro = split[1];
String city = split[3];
return Arrays.asList(pro, city);
}
}
📤 测试demo,查看输出:
6.2.3 自定义FileOutputFormat
实现步骤:
- 自定义一个类,继承
FileOutputFormat ,实现 getRecordWriter() 方法 - 自定义一个类,继承
RecordWriter ,实现6个方法。
这跟自定义实现 FileInputFormat 时一样,先继承父类,由于没有 RecoderReader,需要自己实现一个,当时重写了6个方法!
public static class CustomGEOOutputFormat extends FileOutputFormat<Text, Text> {
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return null;
}
}
6.2.4 自定义 RecordWriter
在实现 RecordWriter 之前,我们需要先获取到文件系统对象用于开启流。
6.2.4.1 获取 FileSystem
public static class CustomGEORecordWriter extends RecordWriter<Text, Text> {
FileSystem fs = null;
public CustomGEORecordWriter(TaskAttemptContext context) {
Configuration conf = context.getConfiguration();
try {
this.fs = FileSystem.get(conf);
} catch (IOException e) {
}
}
}
6.2.4.2 实现 write 方法
public static class CustomGEORecordWriter extends RecordWriter<Text, Text> {
HashMap<String, FSDataOutputStream> streamHashMap = new HashMap<>();
@Override
public void write(Text key, Text value) throws IOException, InterruptedException {
String city = key.toString();
Path path = new Path("xxx/" + city + "地区的用户.txt");
if (!streamHashMap.containsKey(city)){
FSDataOutputStream outputStream = this.fs.create(path);
streamHashMap.put(city, outputStream);
}
String kv = key.toString() + "\t" + value.toString();
streamHashMap.get(city).write(kv.getBytes());
}
}
6.2.4.3 实现 close 方法
关闭 streamHashMap 中的所有的流。
public static class CustomGEORecordWriter extends RecordWriter<Text, Text> {
HashMap<String, FSDataOutputStream> streamHashMap;
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
for (FSDataOutputStream outputStream : streamHashMap.values()) {
outputStream.flush();
outputStream.close();
}
}
}
6.2.5 完善 CustomGEOOutputFormat
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new CustomGEORecordWriter(job);
}
6.2.6 编写 Mapper
public static class GEOMapper extends Mapper<LongWritable, Text, Text, Text> {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
String ip = value.toString().split(" ")[0];
List<String> geo = IPGEOUtils.getGEO(ip);
String city = geo.get(1);
k.set(city + ":" + ip);
context.write(k, v);
}
}
🌟 reduce不仅有聚合的功能,还有去重的功能,如上只传一个key,并没有写vaue,这样到了reduce阶段就把重复的信息去除掉了!
6.2.7 编写 Reduce
public static class GEOReduce extends Reducer<Text, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
String[] split = key.toString().split(":");
k.set(split[0]);
v.set(split[1]);
context.write(k, v);
}
}
6.2.8 编写 Driver
将 Dricver 写进 main 方法中,并通过 configuration 指定父路径
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("ip.geo.ouput.path", args[1]);
Job job = Job.getInstance(conf);
job.setMapperClass(GEOMapper.class);
job.setReducerClass(GEOReduce.class);
job.setJarByClass(Job_CustomOutputFormatDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(CustomGEOOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
修改刚才的 write 方法,通过文件系统获取conf,在取到设置的父路径。
Configuration conf = fs.getConf();
String parentPath = conf.get("ip.geo.ouput.path");
Path path = new Path(parentPath + "/" + city + "地区的用户.txt");
6.2.9 运行测试
传入参数,并运行。由于需要访问网络,所以如果数据量很大,运行时间就很慢,所以拿100条测试如下:
7. 写在最后
默认的数据输出、数据输出使用 TextInputFormat,如何我们需要自定义输入输出 Format,可以继承 FileOutputFormat 。
自定义输入需要实现 createRecordReader() 方法,并且需要继承 RecordReader 类 并实现其中的6个方法:initialize 、nextKeyValue 、getCurrentKey 、getCurrentValue 、getProgress 。
自定义输出需要实现 getRecordWriter 方法,并需要继承 RecordWriter 并实现其中的2个方法:write 、close ,如果需要获取文件系统,可以提供一个有参(context)构造。
Mapper、Reduce的 Task 原理才是重点!
?
???END???
|