Hive本质
Hive的本质其实就是 Hdfs+MapReduce,HDFS存储、MR执行任务
发生数据倾斜的原因
1.数据本身倾斜
内容倾斜、group by倾斜、小表Join大表
2.过多
Join过多导致Job过多、小文件过多、Mapper或Reducer过多
3.SQL语句使用问题
count(distinct)->全盘扫描
join ... where 笛卡尔乘积
关于数据倾斜见我的博客:
大数据常见问题:数据倾斜的原理及处理方案_AriesLY0411的博客-CSDN博客
解决方案
模型设计
整体最优,考虑全局
合理减少表数量
sqoop:query "select ... join ..."
#数据清洗
ods -> dwd insert into ... select ... join ...
选择合适的数据建模
星型(优先考虑)、雪花、星座
维度表(静态数据)、事实表(动态数据:谁在什么时间做了什么事情)
维度退化(降维)=> 星型
充分了解业务,提前设计好预聚合
分层 => 轻量聚合
基于主题,时间维度表、用户画像、产品三级分类...?
分区 => 避免交换
拉链表
压缩 => 减少体量
配置压缩,工作场景优先LZO
热点数据:内置自动优化
join:非大小表
#默认Join键对应的记录数超过该值则进行倾斜分析
set hive.skewjoin.key=100000;
#默认false,如果Join键倾斜则设为true
set hive.optimize.skewjoin=true
#默认10000,倾斜处理mapper数量上限
set hive.skewjoin.mapjoin.map.tasks=10000;
#默认32M,倾斜最小切片大小
set hive.skewjoin.mapjoin.min.split=32M
map join : 大小表
#默认true
set hive.auto.convert.join=true
#默认小表<=25M (根据经验,设置为64M更好,二分之一Block块)
set hive.mapjoin.smalltable.filesize=25M
#默认false,分桶表表mapjoin专用
set hive.optimize.bucketmapjoin=true
combiner
set hive.map.aggr=true; #默认true
group by:HashPartitioner
#倾斜的倍数 默认-1 n = skew_data_avg/other_data_avg
#倾斜数据均值 / 非倾斜数据均值
set mapred.reduce.tasks=n;
#默认false
set hive.groupby.skewindata=true;
#a.hashcode()%3 = partitionNo
select * from A group by a;
sql manunal
抽样统计,是否倾斜,倾斜程度:20%的键占用了80%的数据
倾斜键加分隔前缀加盐(加盐=加序号),分组统计,去盐后再分组统计
Map或Reduce输出过多小文件合并
若满足以下设置条件,任务结束后会单起MR对输出文件进行合并
#默认为true,map-only输出是否合并
set hive.merge.mapfiles=true;
#默认为false,mapreduce输出是否合并
set hive.merge.mapredfiles=true;
#默认256m,合并文件上限
set hive.merge.size.per.task=256M;
#默认16M,文件小于该阈值合并
set hive.merge.smallfiles.avgsize=16M;
控制Mapper数量
mapper的启动和初始化开销较大,【数量过多】导致开销大于逻辑处理,浪费资源
#默认的Mapper数量
int default_num = total_file_size / dfs.block.size;
#默认为2,只有大于default_num时才会生效
set mapred.map.tasks=2;
Mapper数量有限制
Math.max(min.split.size,Math.min(dfs.block.size,max.split.size))
#默认128M
set dfs.block.size=128M;
#默认单个Mapper处理数据上限256M
set mapred.max.split.size=256M;
#默认1个字节
set mapred.min.split.size=1;
#默认[单个节点]处理的数据下限1字节
set mapred.min.split.size.per.node=1;
#默认[单个机架]处理的数据下限1字节
set mapred.min.split.size.per.rack=1;
Mapper输入多个小文件合并后再切片
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
控制Reducer数量
主动设置reduce数量
set mapred.reduce.tasks; #默认 -1
若未设置 reduce数量,自动计算 Reducer数量
#计算方式 SIZE / bytes.per.reducer 与 max之间取最小值
Math.min(SIZE/bytes.per.reducer,max)
#默认每个reducer的数据为256M
set hive.exec.reducers.bytes.per.reducer=256;
#默认单个任务最大reducer数量 1009
#设置时,需要考虑实际情况
#reducer也要启容器,一个容器=一个JVM虚拟机
#设置Mapper和Reducer数量需要综合考虑,均衡利用资源
set hive.exec.reducers.max=1009;
Reducer只能为1的情况
没有group by,直接使用sum,count,max,min,avg,collect_list,concat_ws等聚合函数
作用域全局,只有一个Reducer,性能底
解决方案:先以其中某个键分组聚合(某个数据均匀的列),再聚合
#优化方案
select sum(sum_a) from (select sum(a) from A group by TEMP)T
使用了order by优化方案
#group by 同键分组且必须聚合,distribute by 仅按键分散数据
#order by 全局排序仅1个reducer, sort by 仅分区内有序
set mapred.reduce.tasks=3;
select * from (select * from A distribute by a sort by a)order by a;
存在笛卡尔积,尽量不用
#若出现小表+大表的笛卡尔积
小表扩展join key,并根据需求复制n份
大表扩展join key,根据小表join key随机生成
关闭自动mapjoin:set hive.auto.convert.join=false
设置reducer的数量为:set mapred.reduce.tasks=n
减小数据规模
调整存储格式
create 建表的默认格式和分区表
#默认TextFile,可选:orc,RCFile,SequenceFile
set hive.default.fileformat=orc;
提升IO性能,但会增加cpu压力
#默认false,任务输出是否压缩
set hive.exec.comperss.output=true;
#默认false,任务过程输出是否压缩
set hive.exec.compress.intermediate=true;
动态分区
#默认开启
set hive.exec.dynamic.partition=true;
#默认strict
set hive.exec.dynamic.partition.mode=nonstrict
#默认最大动态分区数1000
set hive.exec.max.dynamic.partitions=1000;
#默认单节点最大动态分区数100
set hive.exec.max.dynamic.partitions.pernode=100;
count(distinct)
不妥:
select count(distinct b) from TABLE group by a
稳妥:
select count(b) from (select a,b from TABLE group by a,b)group by a
筛选条件放在子查询或on之后,开启谓词下推(见以下表格)
谓词下推:条件尽量放在子查询中
在不影响结果情况下,条件再map端执行减少输出
条件尽量放在on或子查询中
set hive.optimize.pdd=true; #默认为true
whether push | inner join | left join | right join | full join | left | right | left | right | left | right | left | right | on | yes | yes | no | yes | yes | no | no | no | where | yes | yes | yes | no | no | yes | no | no |
列裁剪
查询时不写 select * 而是具体字段
set hive.cbo.enable=true; 默认true
分区裁剪
把经常做条件的字段作为分区
以on、where多条件字段顺序,建【多重】分区表
默认开启支持,以分区字段为条件筛选数据
tez 引擎:动态分区裁剪支持
set hive.execution.engine=tez;
#默认mr,tez|spark DAG
并行执行无依赖job
#默认false(关闭)
set hive.exec.parallel=true;
#默认 8 ,最大并行任务数
set hive.exec.parallel.thread.number=8;
JVM重用
依赖 hadoop mapred-site.xml 以下配置
mapreduce.job.jvm.numtasks = 8; #每个JVM运行的任务数
本地化运算
#默认1,启动本地化模式reducer数量必须为 0|1
set mapred.reduce.tasks=0/1;
#默认4,本地化文件数量上限
set hive.exec.mode.local.auto.input.files.max=4;
#默认128M,本地化文件大小上限
set hive.exec.mode.local.auto.inputbytes.max=128M;
#默认false,hive决定是否启用本地化模式
set hive.exec.mode.local.auto=true;
可能会导致内存溢出 java.lang.OutofMemoryError : java heap space
解决方案:
修改 mv hive-env.sh.template - > hive-env.sh
去掉注释
#export HADOOP_HEAPSIZE=1024
llap
llap为 DataNode常驻进程,混合模型,小型任务可以由llap解决,大任务由yarn容器执行
set hive.execution.mode=llap;
默认container,2.0版本之后扩展此项,llap可选
fetch
set hive.fetch.task.conversion=more;
默认more,简单查询不走mr,直接提取
explain select ... 查看执行计划
|