前言
- COUNT(DISTINCT xxx)在hive中很容易造成数据倾斜。
- 第一印象是用
size(collect_set(字段))替代COUNT(DISTINCT 字段)
- 但是如果当数据量大到一定的数量,会导致处理倾斜的reduce任务产生内存溢出的异常
- collect_set输出一个数组,中间结果会放到内存中,所以如果collect_set聚合太多数据,会导致内存溢出。
实例
- uuid是一个很长的字符串,假定其长度为64位。
- 需求:统计用户数
SELECT
COUNT(DISTINCT uuid)
FROM test t
- 由于引入了DISTINCT,因此在Map阶段无法利用combine对输出结果消重,必须将uuid作为key输出,在Reduce阶段再对来自于不同Map Task、相同Key的结果进行消重,计入最终统计值。
- 即使调整
set mapred.reduce.tasks=100 参数也不会实际影响Reduce Task个数,Hive运行时输出“Number of reduce tasks determined at compile time: 1”。原来Hive在处理COUNT这种“全聚合(full aggregates)”计算时,它会忽略用户指定的Reduce Task数,而强制使用1
优化一
SELECT
SUM(mau_part) mau
FROM
(
SELECT
substr(uuid, 1, 3) uuid_part,
COUNT(DISTINCT substr(uuid, 4)) AS mau_part
FROM test
GROUP BY substr(uuid, 1, 3)
) t;
- 解析
- 内层SELECT根据uuid的前3位进行GROUP BY,多个reduce处理,减少每个reduce的数据量,充分利用mr的分而治之思想,外层SELECT求和,得到最终结果。
- 关于截取值n的取值,我们假定uuid是由字母和数字组成的:大写字母、小写字母和数字,字符总数为26+26+10=62。理论上,内层SELECT进行GROUP BY时,会有 62^n 个分组,外层SELECT就会进行 62^n 次求和。所以n的取值需要适当。
优化二
SELECT
SUM(s.mau_part) mau
FROM
(
SELECT
tag,
COUNT(*) mau_part
FROM
(
SELECT
uuid,
CAST(RAND() * 100 AS INT) tag
FROM test
GROUP BY uuid
) t
GROUP BY tag
) s
- 第一层SELECT:对uuid进行去重,并为去重后的uuid打上整数标记
- 第二层SELECT:按照标记进行分组,统计每个分组下uuid的个数
- 第三层SELECT:对所有分组进行求和
- 上面这个方法最关键的是为每个uuid进行标记,这样就可以对其进行分组,分别计数,最后去和。如果数据量确实很大,也可以增加分组的个数。例如:CAST(RAND() * 1000 AS BIGINT) tag
|