IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hive(Parquet)存储方式Task个数影响因素 -> 正文阅读

[大数据]Hive(Parquet)存储方式Task个数影响因素

01

写在前面的话

从源码角度探究(Parquet)存储方式,查询语句(不含Shuffle)Task个数影响因素,换句话说也就是Map的分区数。篇幅涉及大量源代码建议收藏后再看。

02

查看hive建表语句

CREATE TABLE `ods.log_info`( 
       log_id   string,
     log_type  string,
     original_msg string,
     create_time bigint,
     update_time bigint)
PARTITIONED BY (                         
  `d_date` string COMMENT '快照日',      
  `pid` string COMMENT '渠道号')       
ROW FORMAT SERDE                         
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  
STORED AS INPUTFORMAT                              
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'  
OUTPUTFORMAT                                       
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 


从建表语句中找到INPUTFORMAT?

org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

03

查看MapredParquetInputFormat类

protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat) {
  this.realInput = inputFormat;
  vectorizedSelf = new VectorizedParquetInputFormat(inputFormat);
}

入参ParquetInputFormat<ArrayWritable> inputFormat 为构造方法入参,入口是这个类

04

查看ParquetInputFormat 类(getSplits 方法)

???????

public static boolean isTaskSideMetaData(Configuration configuration) {
        return configuration.getBoolean("parquet.task.side.metadata", Boolean.TRUE);
    }
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
    Configuration configuration = ContextUtil.getConfiguration(jobContext);
    List<InputSplit> splits = new ArrayList();
    if (!isTaskSideMetaData(configuration)) { // configuration.getBoolean("parquet.task.side.metadata", Boolean.TRUE); 默认返回为True
        splits.addAll(this.getSplits(configuration, this.getFooters(jobContext)));
        return splits;
    } else {
        Iterator i$ = super.getSplits(jobContext).iterator(); // 调用父类的 getSplits 返回的是List<InputSplit> 这里取的是List<InputSplit> 的迭代器


        while(i$.hasNext()) {
            InputSplit split = (InputSplit)i$.next();
            Preconditions.checkArgument(split instanceof FileSplit, "Cannot wrap non-FileSplit: " + split);
            splits.add(ParquetInputSplit.from((FileSplit)split));
        }


        return splits;
    }
}

默认走下面的else分支,即调用super.getSplits方法

public class ParquetInputFormat<T> extends FileInputFormat<Void, T>

05

进入父类FileInputFormat查看(getSplits)

  • 获取切片最小?minSize

long?minSize?=?Math.max(getFormatMinSplitSize(),?getMinSplitSize(job));
  • ???????
protected long getFormatMinSplitSize() {
  return 1;
}
public static final String SPLIT_MINSIZE = 
  "mapreduce.input.fileinputformat.split.minsize"; 
public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); 
}

由上面的公式可计算出long?minSize?=?Math.max(1,?1);?minSize?=1

  • 计算切片最大maxSize

??long?maxSize?=?getMaxSplitSize(job);?
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }

如配置了mapreduce.input.fileinputformat.split.maxsize?

参数值的默认值是HDFS集群的块大小,这里值是256

  • 找出该路径下的所有文件?listStatus(job)

这个地方会有一个文件名过滤,过滤掉

  • 循环获取该目录下的文件大小

  • 判断文件是否可切?isSplitable(job, path)、

protected boolean isSplitable(JobContext context, Path filename) {
  return true; // 默认可切
}
  • 计算切片大小

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

computeSplitSize方法

protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
??return?Math.max(minSize,?Math.min(maxSize,?blockSize));?
}

方法入参:blockSize?集群块大小(这里是256M)

minSize?最小默认值为1?

maxSize?最大默认值为:

mapreduce.input.fileinputformat.split.maxsize?参数控制

故变成:Math.max(1, Math.min(256, 256))

返回值为:256

完整的getSplits方法如下:

???????

public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
??//?计算最小值
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 
??// 计算最大值
  long maxSize = getMaxSplitSize(job); 


  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
??//?获取所有的文件会过滤掉以.和_开头的文件
??List<FileStatus>?files?=?listStatus(job);?
???//循环该目录下的所有文件
  for (FileStatus file: files) {
????//?处理文件信息
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
??????//?判断文件是否可切?默认值 返回True
??????if?(isSplitable(job,?path))?{?
        long blockSize = file.getBlockSize();
 ????????//?计算切片大小 Math.max(minSize, Math.min(maxSize, blockSize));
????????long?splitSize?=?computeSplitSize(blockSize,?minSize,?maxSize);?
        long bytesRemaining = length;
??????// private static final double SPLIT_SLOP = 1.1;   // 10% slop
?????????//?当文件大小/?splitSize?>1.1时对改文件进行循环切片????
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {    
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }
????????//?剩下的都是一块
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
????}?else?{?//不可切d额时候直接整个加入
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); // 设置输入的文件个数 
  //public static final String NUM_INPUT_FILES = "mapreduce.input.fileinputformat.numinputfiles";
 
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits; // 返回所有的切片信息
}

06

核心代码

  • 计算最小值

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//由参数mapreduce.input.fileinputformat.split.minsize控制?
  • 计算最大值

long maxSize = getMaxSplitSize(job);
//由参数mapreduce.input.fileinputformat.split.maxsize 控制
  • 计算splitSize

Math.max(minSize, Math.min(maxSize, blockSize));
  • ?HDFS集群默认块大小?blockSize(这里是256M)

07

结论

  • 增大Map阶段的分区数

set mapreduce.input.fileinputformat.split.maxsize=128000000;
  • 减少Map计算的分区数

set mapreduce.input.fileinputformat.split.maxsize=512000000

08

测试

  • 测试SQL:

set?hive.support.quoted.identifiers=None;
set?spark.app.name="ods.log_info"
insert overwrite table ods.log_info_temp partition(d_date= '2021-02-21',pid)
select `(is_settled)?+.+`
from ods.log_info
where 1 > 0
  • 结果如下:

set?mapreduce.input.fileinputformat.split.maxsize=128000000;task个数为65

set mapreduce.input.fileinputformat.split.maxsize=256000000;task个数为65

set mapreduce.input.fileinputformat.split.maxsize=512000000;task个数为34

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-13 11:48:46  更:2022-05-13 11:49:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:56:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码