01
Foreword
This article explores, from the source code, what determines the number of tasks (i.e., the number of map partitions) for a query over Parquet-stored tables when no shuffle is involved. It quotes a lot of source code, so you may want to bookmark it before reading.
02
Look at the Hive CREATE TABLE statement
CREATE TABLE `ods.log_info`(
  `log_id` string,
  `log_type` string,
  `original_msg` string,
  `create_time` bigint,
  `update_time` bigint)
PARTITIONED BY (
  `d_date` string COMMENT 'snapshot date',
  `pid` string COMMENT 'channel id')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
From the CREATE TABLE statement, the INPUTFORMAT is:
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
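As a quick sanity check, here is a minimal sketch that loads the INPUTFORMAT class named in the DDL and instantiates it the way Hadoop would; it assumes hive-exec and hadoop-common are on the classpath, and the class name is the only thing taken from the table definition.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public class InputFormatCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // class name copied from STORED AS INPUTFORMAT in the DDL above
        Class<?> clazz = Class.forName(
            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
        // instantiate it the way Hadoop does internally (needs hive-exec on the classpath)
        Object inputFormat = ReflectionUtils.newInstance(clazz, conf);
        System.out.println("Resolved input format: " + inputFormat.getClass().getName());
    }
}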
03
Look at the MapredParquetInputFormat class
protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat) {
  this.realInput = inputFormat;
  vectorizedSelf = new VectorizedParquetInputFormat(inputFormat);
}
The constructor takes a ParquetInputFormat<ArrayWritable> and keeps it as realInput; ParquetInputFormat is therefore the real entry point for split computation.
04
Look at the ParquetInputFormat class (the getSplits method)
public static boolean isTaskSideMetaData(Configuration configuration) {
  return configuration.getBoolean("parquet.task.side.metadata", Boolean.TRUE);
}

public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
  Configuration configuration = ContextUtil.getConfiguration(jobContext);
  List<InputSplit> splits = new ArrayList();
  if (!isTaskSideMetaData(configuration)) { // isTaskSideMetaData defaults to true, so this branch is normally skipped
    splits.addAll(this.getSplits(configuration, this.getFooters(jobContext)));
    return splits;
  } else {
    Iterator i$ = super.getSplits(jobContext).iterator(); // call the parent's getSplits and iterate over the returned List<InputSplit>
    while(i$.hasNext()) {
      InputSplit split = (InputSplit)i$.next();
      Preconditions.checkArgument(split instanceof FileSplit, "Cannot wrap non-FileSplit: " + split);
      splits.add(ParquetInputSplit.from((FileSplit)split));
    }
    return splits;
  }
}
Since parquet.task.side.metadata defaults to true, execution takes the else branch, i.e., super.getSplits is called. The parent class is FileInputFormat:
public class ParquetInputFormat<T> extends FileInputFormat<Void, T>
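To see which branch you would take on your own cluster, here is a minimal sketch; the assumption is that the flag is read from a plain Hadoop Configuration outside any running job.
import org.apache.hadoop.conf.Configuration;

public class TaskSideMetadataCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // parquet.task.side.metadata defaults to true, so ParquetInputFormat.getSplits
        // normally falls through to FileInputFormat.getSplits (the else branch above)
        boolean taskSideMetadata = conf.getBoolean("parquet.task.side.metadata", true);
        System.out.println("parquet.task.side.metadata = " + taskSideMetadata);
    }
}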
05
Step into the parent class FileInputFormat (getSplits)
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

protected long getFormatMinSplitSize() {
  return 1;
}

public static final String SPLIT_MINSIZE =
  "mapreduce.input.fileinputformat.split.minsize";

public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
From the above, long minSize = Math.max(1, 1), so minSize = 1 (assuming mapreduce.input.fileinputformat.split.minsize is not set).
long maxSize = getMaxSplitSize(job);

public static final String SPLIT_MAXSIZE =
  "mapreduce.input.fileinputformat.split.maxsize";

public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE,
    Long.MAX_VALUE);
}
maxSize is controlled by mapreduce.input.fileinputformat.split.maxsize; if the parameter is not set it defaults to Long.MAX_VALUE (as the code above shows), so the effective split size falls back to the HDFS block size, which is 256 MB on this cluster.
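A minimal sketch of how these two bounds resolve from a Configuration; assuming neither property is set in your config files, you get 1 and Long.MAX_VALUE.
import org.apache.hadoop.conf.Configuration;

public class SplitBoundsDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // mirrors getMinSplitSize / getMaxSplitSize above
        long minSize = Math.max(1L,
            conf.getLong("mapreduce.input.fileinputformat.split.minsize", 1L));
        long maxSize =
            conf.getLong("mapreduce.input.fileinputformat.split.maxsize", Long.MAX_VALUE);
        System.out.println("minSize=" + minSize + ", maxSize=" + maxSize);
    }
}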
Separately, listStatus(job) applies a hidden-file filter that drops files whose names start with "." or "_". Each remaining file is then checked for splittability:
protected boolean isSplitable(JobContext context, Path filename) {
  return true; // splittable by default
}
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
The computeSplitSize method:
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
Arguments: blockSize is the cluster block size (256 MB here); minSize defaults to 1; maxSize is controlled by mapreduce.input.fileinputformat.split.maxsize and defaults to Long.MAX_VALUE. With those defaults the call becomes Math.max(1, Math.min(Long.MAX_VALUE, 256 MB)) and returns 256 MB, i.e., the split size equals the block size unless maxsize is set below it.
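Plugging in the numbers, a minimal standalone sketch of the same formula; the 256 MB block size is an assumption, your cluster may differ.
public class SplitSizeDemo {
    // mirrors FileInputFormat.computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 256L * 1024 * 1024; // assumed 256 MB HDFS block size
        // defaults: minSize=1, maxSize=Long.MAX_VALUE -> split size == block size (256 MB)
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));
        // maxsize lowered below the block size -> smaller splits, hence more tasks
        System.out.println(computeSplitSize(blockSize, 1L, 128_000_000L));
    }
}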
The full getSplits method:
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // compute the minimum split size
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // compute the maximum split size
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // list all input files; names starting with "." or "_" are filtered out
  List<FileStatus> files = listStatus(job);
  // loop over every file under the input paths
  for (FileStatus file: files) {
    // file metadata
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // check whether the file can be split; true by default
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // split size = Math.max(minSize, Math.min(maxSize, blockSize))
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // private static final double SPLIT_SLOP = 1.1; // 10% slop
        // keep cutting splits while the remaining bytes exceed splitSize by more than 10%
        // (see the sketch after this listing)
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        // whatever is left becomes one final split
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splittable: the whole file becomes a single split
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      // zero-length file: create an empty hosts array
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  // public static final String NUM_INPUT_FILES = "mapreduce.input.fileinputformat.numinputfiles";
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits; // return all the splits
}
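The SPLIT_SLOP check in the loop above means a file only gets an extra split when the remainder exceeds 10% of the split size. Below is a small standalone sketch of just that loop, with hypothetical file sizes.
public class SlopDemo {
    private static final double SPLIT_SLOP = 1.1; // same 10% slop as FileInputFormat

    // counts how many splits the loop above would cut for a single file
    static int countSplits(long length, long splitSize) {
        int n = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            n++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            n++; // the leftover becomes one final split
        }
        return n;
    }

    public static void main(String[] args) {
        long splitSize = 256L * 1024 * 1024;
        System.out.println(countSplits(270L * 1024 * 1024, splitSize)); // 1: within the 10% slop
        System.out.println(countSplits(300L * 1024 * 1024, splitSize)); // 2: beyond the slop
    }
}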
06
The key lines
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// controlled by mapreduce.input.fileinputformat.split.minsize
long maxSize = getMaxSplitSize(job);
// controlled by mapreduce.input.fileinputformat.split.maxsize
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
07
Conclusion
The number of map tasks can therefore be tuned through mapreduce.input.fileinputformat.split.maxsize, for example:
set mapreduce.input.fileinputformat.split.maxsize=128000000;
set mapreduce.input.fileinputformat.split.maxsize=512000000;
08
Test
set hive.support.quoted.identifiers=None;
set spark.app.name="ods.log_info";
insert overwrite table ods.log_info_temp partition(d_date='2021-02-21', pid)
select `(is_settled)?+.+`
from ods.log_info
where 1 > 0;
set mapreduce.input.fileinputformat.split.maxsize=128000000;   task count: 65
set mapreduce.input.fileinputformat.split.maxsize=256000000;   task count: 65
set mapreduce.input.fileinputformat.split.maxsize=512000000;   task count: 34