2021SC@SDUSC
研究内容简略介绍
上周我们将对InputFormat的几个重要子类,如FileInputFormat等展开了详细的分析,在了解了InputFormat的源码并分析其的功能的基础上,进一步分析了输入类FileInputFormat(切片)及其4个实现类(kv)的用法。本次我们将继续对核心源码的分析,首先从org.apache.hadoop.mapreduce.InputSplit 开始。
org.apache.hadoop.mapreduce.InputSplit源码分析
首先附上文件的源代码:
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException;
public abstract
String[] getLocations() throws IOException, InterruptedException;
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
InputSplilt的主要作用是什么呢?我们可以查看官方给出的api。 大概意思是InputSplit意为输入分片,表示要由个人处理的数据Mapper,通常情况下,它在输入上呈现面向字节的视图,并且RecordReader作业负责处理此内容并呈现面向记录的视图。
进一步来看,在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
其工作原理如下: Hadoop 2.x默认的block大小是128MB,Hadoop 1.x默认的block大小是64MB,可以在hdfs-site.xml中设置dfs.block.size,注意单位是byte。
分片大小范围可以在mapred-site.xml中设置,mapred.min.split.size mapred.max.split.size,minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807。
那么分片到底是多大呢?
minSize=max{minSplitSize,mapred.min.split.size}
maxSize=mapred.max.split.size
splitSize=max{minSize,min{maxSize,blockSize}}
我们再来看一下源码: 所以在我们没有设置分片的范围的时候,分片大小是由block块大小决定的,和它的大小一样。比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间? 答案是实际的文件大小,而非一个块的大小。
在网上查询到已经有人验证这个答案了:http://blog.csdn.net/samhacker/article/details/23089157
hadooop提供了一个设置map个数的参数mapred.map.tasks,我们可以通过这个参数来控制map的个数。但是通过这种方式设置map的个数,并不是每次都有效的。原因是mapred.map.tasks只是一个hadoop的参考数值,最终map的个数,还取决于其他的因素。
为了方便介绍,先来看几个名词: block_size : hdfs的文件块大小,默认为64M,可以通过参数dfs.block.size设置 total_size : 输入文件整体的大小 input_file_num : 输入文件的个数
(1)默认map个数 如果不进行任何设置,默认的map个数是和blcok_size相关的。 default_num = total_size / block_size;
(2)期望大小 可以通过参数 mapred.map.tasks来设置程序员期望的map个数,但是这个个数只有在大于default_num的时候,才会生效。 goal_num = mapred.map.tasks;
(3)设置处理的文件大小 可以通过mapred.min.split.size 设置每个task处理的文件大小,但是这个大小只有在大于 block_size的时候才会生效。 split_size = max( mapred.min.split.size, block_size ); split_num = total_size / split_size;
(4)计算的map个数 compute_map_num = min(split_num, max(default_num, goal_num))
除了这些配置以外,mapreduce还要遵循一些原则。 mapreduce的每一个map处理的数据是不能跨越文件的,也就是说min_map_num >= input_file_num。 所以,最终的map个数应该为: final_map_num = max(compute_map_num, input_file_num)
经过以上的分析,在设置map个数的时候,可以简单的总结为以下几点: (1)如果想增加map个数,则设置mapred.map.tasks 为一个较大的值。 (2)如果想减小map个数,则设置mapred.min.split.size 为一个较大的值。 (3)如果输入中有很多小文件,依然想减少map个数,则需要将小文件merger为大文件,然后使用准则2。
接下来我们继续对源码进行分析。
org.apache.hadoop.mapreduce.Mapper源码分析
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context
) throws IOException, InterruptedException {
}
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
protected void cleanup(Context context
) throws IOException, InterruptedException {
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
从源码中我们可以看出,Mapper类里总共包含四个方法,一个抽象类。
1.setup()方法—一般作为map()方法的准备工作,如进行相关配置文件的读取、参数的传递;
2.cleanup()方法—是用来做一些收尾工作,如关闭文件、Key-Value的分发;
3.map()方法—是真正的程序逻辑部分,如对一行文本的split、filter处理之后,将数据以key-Value的形式写入context;
4.run()方法—是驱动整个Mapper执行的一个方法,按照run()>>setup()>>map()>>cleanup()顺序执行;
5.Context抽象类—是Mapper里的一个内部抽象类,主要是为了在Map任务或者Reduce任务中跟踪task的相关状态和数据的存放。如Context可以存储一些jobConf有关的信息,在setup()方法中,就可以用context读取相关的配置信息,以及作为key-Value数据的载体。
同时,mapper有许多的子类。 如图为Mapper以及它的一些子类的类图(Mapper一共有九个子类。我们挑了其中的4个子类来做分析)。
子类InverseMapper
package org.apache.hadoop.mapreduce.lib.map;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Mapper;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class InverseMapper<K, V> extends Mapper<K,V,V,K> {
@Override
public void map(K key, V value, Context context
) throws IOException, InterruptedException {
context.write(value, key);
}
}
这个类很简单,只是将Key-Value进行反转,然后直接分发,如面包-生产商,我们既可以统计某一种面包来自多少个生产商,也可以统计一个生产商生产多少种面包。不同的需求,利用InverseMapper可以达到不同的效果。
子类TokenCounterMapper
package org.apache.hadoop.mapreduce.lib.map;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
这个类使用StringTokenizer来获取value中的tokens(在StringTokenizer的构造函数中将value按照“\t\n\t\f”进行分割),然后对每一个token,分发出一个<token,one>对,这将在Reduce端被收集,同一个token对应的Key-Value对都会被收集到同一个Reducer上,计算出所有Mapper分发出来的以某个token为key的<token,one>的数量,然后加起来,就得到了token的计数。在我们学习的wordcount程序中,其实只需要在main方法中将job.setMapperClass(TokenCounterMapper.class)进行设置,便可以统计单词的个数。
子类RegexMapper
package org.apache.hadoop.mapreduce.lib.map;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
这个类其实就是将wordcount进行了正则化,匹配自己需要格式的word进行统计。
子类MultithreadedMapper
package org.apache.hadoop.mapreduce.lib.map;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapper<K1, V1, K2, V2>
extends Mapper<K1, V1, K2, V2> {
private static final Logger LOG =
LoggerFactory.getLogger(MultithreadedMapper.class);
public static String NUM_THREADS = "mapreduce.mapper.multithreadedmapper.threads";
public static String MAP_CLASS = "mapreduce.mapper.multithreadedmapper.mapclass";
private Class<? extends Mapper<K1,V1,K2,V2>> mapClass;
private Context outer;
private List<MapRunner> runners;
public static int getNumberOfThreads(JobContext job) {
return job.getConfiguration().getInt(NUM_THREADS, 10);
}
public static void setNumberOfThreads(Job job, int threads) {
job.getConfiguration().setInt(NUM_THREADS, threads);
}
@SuppressWarnings("unchecked")
public static <K1,V1,K2,V2>
Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
return (Class<Mapper<K1,V1,K2,V2>>)
job.getConfiguration().getClass(MAP_CLASS, Mapper.class);
}
public static <K1,V1,K2,V2>
void setMapperClass(Job job,
Class<? extends Mapper<K1,V1,K2,V2>> cls) {
if (MultithreadedMapper.class.isAssignableFrom(cls)) {
throw new IllegalArgumentException("Can't have recursive " +
"MultithreadedMapper instances.");
}
job.getConfiguration().setClass(MAP_CLASS, cls, Mapper.class);
}
@Override
public void run(Context context) throws IOException, InterruptedException {
outer = context;
int numberOfThreads = getNumberOfThreads(context);
mapClass = getMapperClass(context);
if (LOG.isDebugEnabled()) {
LOG.debug("Configuring multithread runner to use " + numberOfThreads +
" threads");
}
runners = new ArrayList<MapRunner>(numberOfThreads);
for(int i=0; i < numberOfThreads; ++i) {
MapRunner thread = new MapRunner(context);
thread.start();
runners.add(i, thread);
}
for(int i=0; i < numberOfThreads; ++i) {
MapRunner thread = runners.get(i);
thread.join();
Throwable th = thread.throwable;
if (th != null) {
if (th instanceof IOException) {
throw (IOException) th;
} else if (th instanceof InterruptedException) {
throw (InterruptedException) th;
} else {
throw new RuntimeException(th);
}
}
}
}
private class SubMapRecordReader extends RecordReader<K1,V1> {
private K1 key;
private V1 value;
private Configuration conf;
@Override
public void close() throws IOException {
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException {
conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
synchronized (outer) {
if (!outer.nextKeyValue()) {
return false;
}
key = ReflectionUtils.copy(outer.getConfiguration(),
outer.getCurrentKey(), key);
value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
return true;
}
}
public K1 getCurrentKey() {
return key;
}
@Override
public V1 getCurrentValue() {
return value;
}
}
private class SubMapRecordWriter extends RecordWriter<K2,V2> {
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
}
@Override
public void write(K2 key, V2 value) throws IOException,
InterruptedException {
synchronized (outer) {
outer.write(key, value);
}
}
}
private class SubMapStatusReporter extends StatusReporter {
@Override
public Counter getCounter(Enum<?> name) {
return outer.getCounter(name);
}
@Override
public Counter getCounter(String group, String name) {
return outer.getCounter(group, name);
}
@Override
public void progress() {
outer.progress();
}
@Override
public void setStatus(String status) {
outer.setStatus(status);
}
@Override
public float getProgress() {
return outer.getProgress();
}
}
private class MapRunner extends Thread {
private Mapper<K1,V1,K2,V2> mapper;
private Context subcontext;
private Throwable throwable;
private RecordReader<K1,V1> reader = new SubMapRecordReader();
MapRunner(Context context) throws IOException, InterruptedException {
mapper = ReflectionUtils.newInstance(mapClass,
context.getConfiguration());
MapContext<K1, V1, K2, V2> mapContext =
new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(),
outer.getTaskAttemptID(),
reader,
new SubMapRecordWriter(),
context.getOutputCommitter(),
new SubMapStatusReporter(),
outer.getInputSplit());
subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
reader.initialize(context.getInputSplit(), context);
}
@Override
public void run() {
try {
mapper.run(subcontext);
reader.close();
} catch (Throwable ie) {
throwable = ie;
}
}
}
}
这个类使用多线程来执行Mapper(它由mapreduce.mapper.multithreadedmapper.mapclass设置)。该类重写了run()方法,首先设置运行上下文context和workMapper,然后启动多个MapRunner(内部类)子线程(由mapred.map.multithreadedrunner.threads 设置 ),最后使用join()等待子线程执行完毕。
MapRunner(内部类)继承了Thread,拥有独立的Context去执行Mapper,并进行异常处理。从MapRunner的Constructor中我们看见,它使用了独立的SubMapRecordReader、SubMapRecordWriter和SubMapStatusReporter。SubMapRecordReader在读Key-Value和SubMapRecordWriter在写Key-Value的时候都要同步。这是通过互斥访问MultithreadedMapper的上下文outer来实现的。
MultithreadedMapper可以充分利用CPU,采用多个线程处理后,一个线程可以同时在另一个线程执行的时候读取数据并执行,这样就使用了更多的CPU周期来执行任务,从而提高吞吐率(注意读写操作都是线程安全的)。但对于IO密集型的作业,采用MultithreadedMapper会适得其反,因为会有多个线程等待IO,IO便成为限制吞吐率的关键,这时候我们可以通过增多task数量的方法来解决,因为这样在IO上就是并行的。
总结
本次我们首先对输入分片InputSplit进行了分析,对其分片大小进行了进一步的探究。随后又展开了对与mapreduce中的核心类mapper的分析,在了解其作用的基础上,又对其几个重要的子类做了源码分析,为之后的研究打下了基础。
|