IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hive优化方案 -> 正文阅读

[大数据]Hive优化方案

排序方式的选择

order by,sort by,distribute by和sort by一起使用,cluster by

1.order by

Hive中的order by跟传统的sql语言中的order by作用是一样的,会对查询的结果做一此全局排序
也就是说,在HQL中order by所有的数据都会到同一个reducer进行处理,不管有多少map,也不管文件中有多少的block,最后只会启动一个reducer,对于大量的数据这将会消耗很长的时间

2.sort by

Hive中指定了sort by,那么在每个reducer端都会做排序,也就是说保证了局部有序(每个reducer出来的数据是有序的,但是不能保证所有的数据都是有序的,除非只有一个reducer)
这么做的好处是:执行了局部排序之后可以为接下来的全局排序提高不少的效率
坏处是:可能造成数据的重叠和丢失。 由于MapReduce 是采用 Hash 的方式来组织数据的,所以当使用 Sort By 时,一个 reducer 的输出可能会覆盖另一个 reducer 的数据

3.distribute by 和 sort by 一起使用

distribute by就是为sort by而生的。它可以修正 Sort By 带来的负面作用,避免数据的覆盖和丢失。Distribute By 将保证具有相同的指定关键字的记录进入到同一个 reducer 进行处理,这样就可以避免 reducer 在输出数据时将不同 reducer 的记录放到同一个位置,从而造成数据的覆盖。
distribute by是控制map的输出在reducer是如何划分的,举个例子,我们有一张表store,mid是指这个store所属的商户,money是这个商户的盈利,name是这个store的名字
执行Hive语句:select mid,money,name from store distribute by mid sort by mid asc,money asc。我们所有mid相同的数据会被送到同一个reducer去处理,这就是因为指定了distribute by mid,这样就可以统计出每个商户中各个商店盈利的排序了,这个肯定是全局有序的,因为相同的商户会放到同一个reducer去处理)distribute by 必须要写在sort by之前

4.cluster by

cluster by 的功能就是distribute by 和sort by相结合,如下两个语句是等价的:
select mid,money,name from store cluster by mid
select mid,money,name from store distribute by mid sort by mid
如果要获得和3中语句一样的效果:
select mid,money,name from store cluster by mid sort by money
注意:被cluster by 指定的列只能是降序,不能指定asc和desc

Hive设定为严格模式

当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句种出现笛卡尔积
上面所说的order by如果在严格模式下,就必须指定limit来限制输出条数
原因是:所有的数据都会在同一个reducer端进行,数据量大的话可能不能出结果,那么在严格模式下,就必须指定输出的条数

MapJoin

使用场景一般是一个大表和一个小表的join操作
大表join小表,合并的操作如果像传统的common join在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜,这个时候解决方案就是在Map端实现数据合并
Mapjoin,需要将join操作的一个或多个表完全读入内存。
例如:将其中一张小表缓存到内存当中,大表正常加载到MapTask,然后再MapTask完成两表的join
一来省去shuffle这个代价昂贵的阶段,二来不需要分发也就没有数据倾斜的问题

参数说明

 set hive.auto.convert.join = true
 自动将common join转换为map join解决了这个问题
hive.mapjoin.smalltable.filesize=30000000
如果原始小表的总大小大于该值,则任务将选择map join运行。
hive.mapjoin.localtask.max.memory.usage = 0.999;
本地任务的内存使用率阈值。若是客户端内存足够的时候,建议调大
hive.auto.convert.join.noconditionaltask
是否将多个mapjoin合并为一个。合并的好处在于每个mapjoin都会有一个mapper,对数据进行多次的读写,合并mapjoin以后只会有一个mapper,所以只需要读取一次会加快速度。
hive.auto.convert.join.noconditionaltask.size
    多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。根据内存大小调整
hive.mapjoin.followby.gby.localtask.max.memory.usage
如果mapjoin后面紧跟着一个group by任务,这种情况下 本地任务的最大内存使用率,默认是0.55。一般不做改动
hive.mapjoin.check.memory.rows
localtask每处理完多少行,就执行内存检查,默认为100000,当localtask的内存使用超过阀值,任务会直接失败。一般不做改动

执行原理:
1.首先是在本地客户端生成的Task A,是一个MapReduce Local Task,负责把小表数据从HDFS读取到内存哈希表。读取后,它会将内存中的哈希表序列化为磁盘上的文件,并将哈希表文件压缩为tar文件
2.接下来是Task B,该任务是一个没有Reduce的MapReduce任务,启动的时候,上一步骤的tar文件会被放到Hadoop分布式缓存中,Hadoop分布式缓存将把tar文件填充到每个Mapper的本地磁盘并解压缩该文件。然后mapper可以将哈希表文件反序列化回内存,并像以前一样执行join工作,也就是根据大表中的每一条记录去和DistributeCache中小表对应的HashTable关联,并直接输出结果。

大表join小表解决方案二

在大表和小表做笛卡尔积时,规避笛卡尔积的 方法是,给 Join 添加一个 Join key
它的原理是:将小表扩充一列 join key,并将小表的条 目复制数倍,
join key 各不相同;将大表扩充一列 join key 为小表join key范围内的随机数。
小表复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩充 key 值范围里面随机出来的,所以复制了几倍 n, 大表的数据就被随机的分为了 n 份。
并且最后处理所用的 reduce 数量也是 n,而且也不会 出现数据倾斜。

合理使用in、exists语句

虽然经过测验,hive1.2.1也支持in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join
例如:
select a.id,a.name from a where a.id in (select b.id from b);
select a.id,a.name from a where exists(select id from b where a.id = b.id);
应该转换成:
select a.id,a.name from a left semi join b on a.id = b.id;

设置合理MapTask数量

Map数过大
Map阶段输出文件太小,产生大量小文件
初始化和创建Map的开销很大
Map数过小
文件处理或查询并发度小,Job执行时间过长
大量作业时,容易堵塞集群

小文件合并

两种经典的控制MapTask的个数方案:减少MapTask数或者增加MapTask数
1.通过合并小文件来实现减少MapTask数,这一点主要是针对数据源

set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件

set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件

set hive.merge.size.per.task = 256*1000*1000 ##合并文件的大小

set mapred.max.split.size=256000000; ##每个 Map 最大分割大小

set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并

控制reducetask个数

2.增加MapTask数可以通过控制上一个job 的redeceTask个数
Hadoop MapReduce 程序中,reducer 个数的设定极大影响执行效率,这使得 Hive 怎样决定 reducer 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 reducer 个数的情况下,Hive 会猜测确定一个 reducer 个数,基于以下两个设定:

参数 1、hive.exec.reducers.bytes.per.reducer(默认为 1000^3=1G)
为了改变reduce的平均负载
参数 2、hive.exec.reducers.max(默认为 999)
为了限制reduce的最大数量

计算 reducer 数的公式:N=min(参数 2,总输入数据量/参数 1)
通常情况下,有必要手动指定 reducer 个数。考虑到 map 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 reducer 个数,重设参数 2 还是必要的。依据 Hadoop 的经验,可以将参数 2 设定为 0.95*(集群中 TaskTracker 个数)

也可以通过调整

set mapred.reduce.tasks = 15

来调整reduce个数

使用multi group by

在查询语句中使用multi group by特性连续group by2次数据,使用不同的group by key.这一特性可以减少一次MapReduce操作

例子:

select Provice,city,county,count(rainfall) from area where data="2018-09-02" group by provice,city,count

select Provice,count(rainfall) from area where data="2018-09-02" group by provice

 

#使用multi group by

from area

 insert overwrite table temp1

  select Provice,city,county,count(rainfall) from area where data="2018-09-02" group by provice,city,count

 insert overwrite table temp2

  select Provice,count(rainfall) from area where data="2018-09-02" group by provice

使用multi group by 之前必须配置hive-site.xml参数:

<property>

    <name>hive.multigroupby.singlemr</name>

    <value>true</value>

</property>

合理利用分区

分区是指按照数据表的某列或某些列分为多个区,区从形式上可以理解为文件夹,
比如监测传感器传回的每日数据存在同一张表上,由于每天会生成大量的数据,导致数据表的内容巨大,在查询时进行全表扫描耗费的资源非常多。那其实这个情况下,我们可以按照日期对数据表进行分区,不同日期的数据存放在不同的分区,在查询时只要指定分区字段的值就可以直接从该分区查找。
动态分区
需要先将动态分区设置打开
set hive.exec.dynamic.partition.mode=nonstrict
set hive.exec.dynamic.partition=true
insert overwrite 是覆盖
insert into是追加

insert overwrite table sopdm.yp2 partition(age) select id,name,tel,age from sopdm.wyp

静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断

合理利用分桶

分桶是相对分区进行更细粒度的划分。分桶将整个数据内容按照某列属性值的hash值进行区分,如要按照name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。如取模结果为0的数据记录存放到一个文件,取模为1的数据存放到一个文件,取模为2的数据存放到一个文件。
分桶之前要执行命令hive.enforce.bucketiong=true

create table bucketed_user(id INT)name STRING)clustered by(id) into 4 buckets

将表或者分区组织成通有两个理由:
获得更高的查询处理效率
使取样更高效

建临时表

针对join的优化

优先过滤后在进行join操作,最大限度的减少参与join的数据量

在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:

set hive.skewjoin.key=100000; // 这个是 join 的键对应的记录条数超过这个值则会进行 分拆,值根据具体数据量设置

set hive.optimize.skewjoin=true; // 如果是 join 过程出现倾斜应该设置为 true 

使用 Group By 有数据倾斜的时候进行负载均衡

set hive.groupby.skewindata = true

当 sql 语句使用 groupby 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行 负载均衡。策略就是把 MR 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总

在 MR 的第一个阶段中,Map 的输出结果集合会缓存到 maptaks 中,每个 Reduce 做部分聚 合操作,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不同的 Reduce 中, 从而达到负载均衡的目的;第二个阶段 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操作。

Map 端部分聚合

并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进 行部分聚合,最后在 Reduce 端得出最终结果。

MapReduce 的 combiner 组件参数包括:

set hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True

set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目

合理利用文件存储格式

创建表时,尽量使用 orc、parquet 这些列式存储格式,因为列式存储的表,每一列的数据在 物理上是存储在一起的,Hive 查询时会只遍历需要列数据,大大减少处理的数据量。

开启本地模式

当查询的数据量比较小时,其实 没有必要启动分布式模式去执行,因为以分布式方式执行就涉及到跨网络传输、多节点协调 等,并且消耗资源。这个时间可以只使用本地模式来执行 mapreduce job,只在一台机器上 执行,速度会很快。启动本地模式涉及到三个参数:
在这里插入图片描述
set hive.exec.mode.local.auto=true 是打开 hive 自动判断是否启动本地模式的开关,但是只 是打开这个参数并不能保证启动本地模式,要当 map 任务数不超过

hive.exec.mode.local.auto.input.files.max 的个数并且 map 输入文件大小不超过

hive.exec.mode.local.auto.inputbytes.max 所指定的大小时,才能启动本地模式。

并行化处理

一个 hive sql 语句可能会转为多个 mapreduce Job,每一个 job 就是一个 stage,这些 job 顺序 执行,这个在 cli 的运行日志中也可以看到。但是有时候这些任务之间并不是是相互依赖的, 如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行,这样就节约了时间,提 高了执行速度,但是如果集群资源匮乏时,启用并行化反倒是会导致各个 job 相互抢占资源 而导致整体执行性能的下降。启用并行化

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=8; //同一个 sql 允许并行任务的最大线程数

设置压缩格式

Hive 最终是转为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩 虽然是减少了数据量,但是压缩过程要消耗 CPU 的,但是在 Hadoop 中, 往往性能瓶颈不 在于 CPU,CPU 压力并不大,所以压缩充分利用了比较空闲的 CPU

JVM重用

因为Hive语句最终要转换为一系列的Mapreduce Job的,而每一个MapReduce Job是由一系列的MapTask和ReduceTask组成的,默认情况下,
MapReduce中一个MapTask或者一个ReduceTask就会启动一个JVM进程,一个Task执行完毕后,JVM进程就退出。这样如果任务花费时间很短,
又要多次启动JVM的情况下,JVM的启动时间会变成一个比较大的消耗,这个时候就可以通过重用JVM来解决:
set mapred.job.reuse.jvm.num.tasks=5

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-03 16:21:05  更:2022-03-03 16:25:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 21:00:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码