This walkthrough shows, in full detail and with production examples, how to tune Hive jobs that produce large numbers of small files, covering:
Enable merging of small output files: RCFile and ORC require special settings before they are merged at the file level (hence hive.merge.orcfile.stripe.level and hive.merge.rcfile.block.level are set to false below); Parquet, TextFile, and SequenceFile are merged at the file level by default.
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize=256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
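After a job finishes, you can check whether the merge took effect by inspecting the table's file count. A minimal sketch, assuming the output table is tmp.tmp_01 as in the examples below (the warehouse path is an assumption; substitute your own):
-- numFiles appears among the table parameters once stats are gathered.
DESCRIBE FORMATTED tmp.tmp_01;
-- Count the physical files under the table's HDFS directory.
dfs -count /user/hive/warehouse/tmp.db/tmp_01;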
Grouping by a table column
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = false;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
set hive.default.fileformat = orc;
drop table if exists tmp.tmp_01;
create table tmp.tmp_01
as
SELECT
a.item_sku_id
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
distribute by item_sku_id
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL;
Result:
2022-02-16 23:03:47 INFO Table tmp.tmp_ljjtest02 stats: [numFiles=23, numRows=23, totalSize=5894, rawDataSize=2252]
2022-02-16 23:03:47 INFO Stage-1: Map: 18024 Reduce: 1009 Cumulative CPU: 377078.2 sec HDFS Read: 193.287 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 4m18s539ms job_5690061100801_32622232
2022-02-16 23:03:47 INFO Stage-10: Map: 920 Cumulative CPU: 5431.21 sec HDFS Read: 0.008 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 52s336ms job_5690061100801_32622707
2022-02-16 23:03:47 INFO Stage-4: Map: 832 Cumulative CPU: 1078.67 sec HDFS Read: 0.001 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 31s273ms job_8766029101801_36250210
2022-02-16 23:03:47 INFO Total MapReduce CPU Time Spent: 4d10h33m8s80ms
2022-02-16 23:03:47 INFO Total Map: 19776 Total Reduce: 1009
2022-02-16 23:03:47 INFO Total HDFS Read: 193.296 GB Written: 0.000 GB
Analysis:
Grouping by a table column can cause data skew when the data is unevenly distributed across that column's values.
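Before distributing by a column, it is worth checking how evenly its values are spread. A minimal sketch using the same filters as the test query above; a handful of keys with very large counts signals skew:
-- The heaviest keys land on single reducers under hash partitioning.
SELECT item_sku_id, count(*) AS cnt
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
GROUP BY item_sku_id
ORDER BY cnt DESC
LIMIT 10;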
Grouping by a random number
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = false;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
set hive.default.fileformat = orc;
drop table if exists tmp.tmp_01;
create table tmp.tmp_01
as
SELECT
a.item_sku_id
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
distribute by rand(1)
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL;
Result:
2022-02-16 22:53:00 INFO Table tmp.tmp_01 stats: [numFiles=3, numRows=23, totalSize=958, rawDataSize=2235]
2022-02-16 22:53:00 INFO Stage-1: Map: 18024 Reduce: 1009 Cumulative CPU: 372613.35 sec HDFS Read: 193.289 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 5m32s865ms job_5690061100801_32621522
2022-02-16 22:53:00 INFO Stage-10: Map: 924 Cumulative CPU: 5424.53 sec HDFS Read: 0.008 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 41s846ms job_5690061100801_32621870
2022-02-16 22:53:00 INFO Stage-4: Map: 858 Cumulative CPU: 1374.95 sec HDFS Read: 0.001 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 43s592ms job_5690061100801_32621913
2022-02-16 22:53:00 INFO Total MapReduce CPU Time Spent: 4d9h23m32s830ms
2022-02-16 22:53:00 INFO Total Map: 19806 Total Reduce: 1009
2022-02-16 22:53:00 INFO Total HDFS Read: 193.298 GB Written: 0.000 GB
Analysis:
Grouping by a random number splits the data randomly and evenly from the map side across the reduce side.
Summary
Grouping the data (which triggers an extra MR stage) controls how the map side splits data for the reduce side; by default Hive hash-partitions on the distribute by expression.
- distribute by item_sku_id: can cause data skew when key frequencies are uneven.
- distribute by rand(): if one of the reduce tasks fails during execution, data may be lost or duplicated.
- distribute by rand(int seed): using a fixed random seed avoids the data-correctness problem.
The mechanism behind the distribute by rand() risk is as follows:
- When part of the shuffle fails, individual upstream map tasks regenerate their output. If the shuffle partitioning logic contains a random factor, the regenerated data is not partitioned the same way as before, so some data is read twice and some is lost.
- Each reduce task pulls its own partition from every map task's output file. The data is already partitioned during the map phase, and an extra index file records the start offset of each partition, so reduce tasks fetch their data directly by offset.
- A regenerated shuffle partition therefore drops some of its original rows and also contains rows destined for other reducers.
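An alternative that keeps the even spread while avoiding the nondeterminism entirely is to distribute by a deterministic hash of the key rather than a random number. A minimal sketch, not from the original test runs; the bucket count 1009 is an assumption matching the cluster's reducer cap mentioned later:
-- pmod(hash(...), N) assigns every row a stable bucket in [0, N),
-- so a retried map task regenerates exactly the same partitions.
-- Note: identical keys still share a bucket, so a single hot key
-- remains skewed; rand(seed) does not have that limitation.
SELECT item_sku_id, sku_name, brandname_cn, data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
distribute by pmod(hash(item_sku_id), 1009);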
Merging small files at the end of a map-only job
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = false;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
set hive.default.fileformat = orc;
drop table if exists tmp.tmp_01;
create table tmp.tmp_01
as
SELECT
item_sku_id
FROM
cdm.cdm_goods_360buy_act_basic_da
WHERE
dt = '2022-02-15'
AND brandname_cn = '汾酒';
Result:
2022-02-17 14:45:37 INFO Table tmp.tmp_ljjtest021 stats: [numFiles=1, numRows=18415, totalSize=110953, rawDataSize=1762456]
Analysis:
【set hive.merge.mapfiles = true;】 controls small-file merging at the end of map-only jobs.
Merging small files at the end of a MapReduce job
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = false;
set hive.merge.mapredfiles = true;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
set hive.default.fileformat = orc;
drop table if exists tmp.tmp_01;
create table tmp.tmp_01
as
SELECT
a.item_sku_id
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL;
Result:
2022-02-16 23:12:45 INFO Table tmp.tmp_01 stats: [numFiles=1, numRows=23, totalSize=426, rawDataSize=2235]
Analysis:
【set hive.merge.mapredfiles = true;】 controls small-file merging at the end of MapReduce and mapjoin jobs.
Summary
Note the mapjoin case here: although it launches only map tasks and no reduce task, that is merely the result of the optimization; the job is still essentially MapReduce, and is therefore governed by 【set hive.merge.mapredfiles = true;】.
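To check whether a query is actually converted to a mapjoin (and therefore which merge flag governs it), inspect the plan. A minimal sketch; a Map Join Operator in the map stage, with no join in a reduce stage, confirms the conversion:
EXPLAIN
SELECT a.item_sku_id
FROM (SELECT item_sku_id FROM big_table
      WHERE dt = '2022-02-15' AND brandname_cn = '汾酒') a
LEFT OUTER JOIN
     (SELECT item_sku_id FROM small_table
      WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15') b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL;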
Mapjoin, no small-file merging, no grouping
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = false;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
set hive.default.fileformat = orc;
drop table if exists tmp.tmp_01;
create table tmp.tmp_01
as
SELECT
a.item_sku_id
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL;
Result:
2022-02-17 16:53:18 INFO Table tmp.tmp_01 stats: [numFiles=18020, numRows=23, totalSize=887747, rawDataSize=2252]
2022-02-17 16:53:18 INFO MapReduce Jobs Launched:
2022-02-17 16:53:19 INFO Stage-4: Map: 18020 Cumulative CPU: 205038.93 sec HDFS Read: 193.386 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 3m28s419ms job_8766029101801_36451400
2022-02-17 16:53:19 INFO
2022-02-17 16:53:19 INFO Total MapReduce CPU Time Spent: 2d8h57m18s930ms
2022-02-17 16:53:19 INFO Total Map: 18020 Total Reduce: 0
2022-02-17 16:53:19 INFO Total HDFS Read: 193.386 GB Written: 0.000 GB
Analysis:
With mapjoin (the join and aggregation happen on the map side) and no small-file merging, each map task writes one output file, so the output contains a huge number of small files.
Mapjoin, no small-file merging, big table pre-grouped
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = false;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
set hive.default.fileformat = orc;
drop table if exists tmp.tmp_01;
create table tmp.tmp_01
as
SELECT
a.item_sku_id
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
distribute by rand(1)
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL;
Result:
2022-02-16 22:53:00 INFO Table tmp.tmp_01 stats: [numFiles=3, numRows=23, totalSize=958, rawDataSize=2235]
2022-02-16 22:53:00 INFO Stage-1: Map: 18024 Reduce: 1009 Cumulative CPU: 372613.35 sec HDFS Read: 193.289 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 5m32s865ms job_5690061100801_32621522
2022-02-16 22:53:00 INFO Stage-10: Map: 924 Cumulative CPU: 5424.53 sec HDFS Read: 0.008 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 41s846ms job_5690061100801_32621870
2022-02-16 22:53:00 INFO Stage-4: Map: 858 Cumulative CPU: 1374.95 sec HDFS Read: 0.001 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 43s592ms job_5690061100801_32621913
2022-02-16 22:53:00 INFO Total MapReduce CPU Time Spent: 4d9h23m32s830ms
2022-02-16 22:53:00 INFO Total Map: 19806 Total Reduce: 1009
2022-02-16 22:53:00 INFO Total HDFS Read: 193.298 GB Written: 0.000 GB
Analysis:
With mapjoin and no small-file merging, pre-grouping the big table triggers an MR stage and reduces the number of map tasks in the mapjoin stage. The number of small output files drops dramatically.
Mapjoin, with small-file merging
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
set hive.default.fileformat = orc;
drop table if exists tmp.tmp_01;
create table tmp.tmp_01
as
SELECT
a.item_sku_id
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL;
Result:
2022-02-16 23:12:45 INFO Table tmp.tmp_01 stats: [numFiles=1, numRows=23, totalSize=426, rawDataSize=2235]
2022-02-16 23:12:45 INFO Stage-1: Map: 18024 Reduce: 1009 Cumulative CPU: 365693.05 sec HDFS Read: 193.289 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 3m21s803ms job_5690061100801_32623163
2022-02-16 23:12:45 INFO Stage-10: Map: 918 Cumulative CPU: 5232.29 sec HDFS Read: 0.008 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 40s544ms job_8766029101801_36250732
2022-02-16 23:12:45 INFO Stage-4: Map: 1 Cumulative CPU: 13.95 sec HDFS Read: 0.000 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 43s762ms job_8766029101801_36250769
2022-02-16 23:12:45 INFO Total MapReduce CPU Time Spent: 4d7h2m19s290ms
2022-02-16 23:12:45 INFO Total Map: 18943 Total Reduce: 1009
2022-02-16 23:12:45 INFO Total HDFS Read: 193.297 GB Written: 0.000 GB
Analysis:
With mapjoin and small-file merging enabled, the output contains the fewest small files.
Summary
Mapjoin gives a clear speedup when joining a small table to a big one, but it needs care. In the example above, the mapjoin ran 18,020 map tasks and correspondingly wrote 18,020 small output files (running the join as full MapReduce greatly reduces the output file count; the test cluster's default maximum is 1009 reducers). You therefore have to plan for merging afterwards; when a job produces many small files, the best approach is to enable small-file merging.
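When a table has already been written with many small files, they can also be merged after the fact. A minimal sketch, assuming the table is stored as ORC or RCFile (the only formats CONCATENATE supports; the examples above default to ORC):
-- Rewrites the table's files into fewer, larger ones in place.
ALTER TABLE tmp.tmp_01 CONCATENATE;
-- For a partitioned table, merge one partition at a time:
-- ALTER TABLE big_table PARTITION (dt = '2022-02-15') CONCATENATE;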
No grouping, no small-file merging
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = false;
set hive.merge.mapredfiles = false;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
create table tmp.tmp_01 as
WITH tmp as
(
SELECT
a.*
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL
)
SELECT
c.item_sku_id
FROM tmp c
join small_table d
on c.item_sku_id=d.item_sku_id;
Result:
2022-02-17 20:49:21 INFO Stage-8: Map: 18026 Cumulative CPU: 210204.23 sec HDFS Read: 193.388 GB HDFS Write: 0.001 GB SUCCESS Elapsed : 2m55s772ms job_5690061100801_32863650
2022-02-17 20:49:21 INFO Stage-6: Map: 472 Cumulative CPU: 3330.08 sec HDFS Read: 0.334 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 50s42ms job_5690061100801_32865433
2022-02-17 20:49:21 INFO Total MapReduce CPU Time Spent: 2d11h18m54s310ms
2022-02-17 20:49:21 INFO Total Map: 18498 Total Reduce: 0
2022-02-17 20:49:21 INFO Total HDFS Read: 193.722 GB Written: 0.001 GB
2022-02-17 20:49:22 INFO Time taken: 1045.892 seconds, Fetched: 46 row(s)
2022-02-17 20:49:23 INFO Job finished! SQL elapsed time: 0 hour 17 mins 38 s
Analysis:
Without grouping and without small-file merging, the run took 1045 seconds.
No grouping, with small-file merging
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
create table tmp.tmp_01 as
WITH tmp as
(
SELECT
a.*
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL
)
SELECT
c.item_sku_id
FROM tmp c
join small_table d
on c.item_sku_id=d.item_sku_id;
Result:
2022-02-17 20:45:25 INFO Stage-8: Map: 18026 Cumulative CPU: 214628.08 sec HDFS Read: 193.388 GB HDFS Write: 0.001 GB SUCCESS Elapsed : 3m41s147ms job_8766029101801_36477823
2022-02-17 20:45:25 INFO Stage-6: Map: 472 Cumulative CPU: 3995.4 sec HDFS Read: 0.334 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 59s677ms job_5690061100801_32865009
2022-02-17 20:45:25 INFO Total MapReduce CPU Time Spent: 2d12h43m43s480ms
2022-02-17 20:45:25 INFO Total Map: 18498 Total Reduce: 0
2022-02-17 20:45:25 INFO Total HDFS Read: 193.722 GB Written: 0.001 GB
2022-02-17 20:45:27 INFO Time taken: 1074.534 seconds, Fetched: 46 row(s)
2022-02-17 20:45:27 INFO Job finished! SQL elapsed time: 0 hour 18 mins 10 s
Analysis:
Without grouping but with small-file merging, the run took 1074 seconds.
Grouping, with small-file merging
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=200000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.merge.orcfile.stripe.level=false;
set hive.merge.rcfile.block.level=false;
create table tmp.tmp_01 as
WITH tmp as
(
SELECT
a.*
FROM (
SELECT
item_sku_id,sku_name,brandname_cn,data_type
FROM big_table
WHERE dt = '2022-02-15' AND brandname_cn = '汾酒'
distribute by rand(1)
) a
LEFT OUTER JOIN
( SELECT item_sku_id
FROM small_table
WHERE version = '1' AND normal_brandname_en = 'fenjiu' AND dt = '2022-02-15'
) b
ON a.item_sku_id = b.item_sku_id
WHERE b.item_sku_id IS NULL
)
SELECT
c.item_sku_id
FROM tmp c
join small_table d
on c.item_sku_id=d.item_sku_id;
Result:
2022-02-17 19:57:57 INFO Stage-1: Map: 18026 Reduce: 1009 Cumulative CPU: 362544.56 sec HDFS Read: 193.290 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 4m55s871ms job_5690061100801_32857972
2022-02-17 19:57:57 INFO Stage-9: Map: 928 Cumulative CPU: 5520.28 sec HDFS Read: 0.008 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 2m33s752ms job_5690061100801_32858541
2022-02-17 19:57:57 INFO Stage-7: Map: 472 Cumulative CPU: 3638.63 sec HDFS Read: 0.334 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 1m42s589ms job_5690061100801_32858927
2022-02-17 19:57:57 INFO Total MapReduce CPU Time Spent: 4d7h15m3s470ms
2022-02-17 19:57:57 INFO Total Map: 19426 Total Reduce: 1009
2022-02-17 19:57:57 INFO Total HDFS Read: 193.634 GB Written: 0.000 GB
2022-02-17 19:57:58 INFO Time taken: 690.782 seconds, Fetched: 46 row(s)
2022-02-17 19:57:58 INFO Job finished! SQL elapsed time: 0 hour 11 mins 44 s
Analysis:
With grouping and small-file merging, the run took 690 seconds.
Summary
When a Hive job's intermediate results contain large numbers of small files, the small-file merge settings make no difference to the outcome (they merge only a job's final output, not intermediate data); but pre-grouping the big table, which triggers an MR stage and cuts the number of map tasks in the mapjoin stage, greatly reduces the job's runtime.
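Putting the experiments together, a reasonable starting configuration for jobs like these is sketched below. It is distilled from the runs above rather than an official recommendation; the 256 MB thresholds should be tuned to your cluster's block size:
-- Convert eligible joins to mapjoins automatically.
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
-- Merge small output files after map-only and MapReduce/mapjoin stages.
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
-- Target merged-file size, and the average-size threshold below which
-- an extra merge job is triggered.
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=256000000;
-- And pre-group the big table before a mapjoin with a seeded random
-- distribute by, e.g.: ... WHERE dt = '...' distribute by rand(1)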