概述
本文意图解决 HIVE 3 版本中使用 MR 作为运算引擎进行 JOIN 操作时导致的丢数情况。
问题描述
Apache Hive 在 2.3 版本后宣布放弃维护 MapReduce 作为底层执行引擎,并转而使用 Tez 作为默认的查询引擎。但是由于 Tez 在大作业量和高并发时的严重性能问题,导致许多任务不得不继续使用 MapReduce 进行操作,因此就需要开发者自行维护 Hive 对于 MR 的可用性。
然而,在 Hive 升级至 Hive 3 版本中,继续使用 MapReduce 会导致非常严重的恶性错误。例如,即使进行非常简单的 JOIN 操作,都会导致部分应该被关联上的数据丢失。
本文档意图提供测试场景浮现上述恶性漏洞,并阐述其根本原因,最后对出现问题部分的源代码进行修改,以彻底修复该问题。
问题复现
场景1: 多表 (超过三张表) 时数据丢失
在复现开始之前先对 Hive 的部分参数进行设置:
SET hive.execution.engine=mr;
SET mapred.reduce.tasks=2;
SET hive.auto.convert.join=false;
首先,创建三张表。这三张表除了表名不一样,其他包括列信息甚至数据在内完全相同。
建表语句如下。我们使用文件的形式快速插入,当然为了复现这个问题您也可以手动插入如下数据:
USE default;
create table table_a(id string, name string, addr string) stored as orc;
create table table_b(id string, name string, addr string) stored as orc;
create table table_c(id string, name string, addr string) stored as orc;
LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_a_data.orc" INTO TABLE table_a;
LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_b_data.orc" INTO TABLE table_b;
LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_c_data.orc" INTO TABLE table_c;
通过以下语句查看三张表的内容,可以看到其中的数据完全一致。
hive> select * from table_a;
OK
11 a aaa
22 b bbb
33 c ccc
44 d ddd
55 e eee
66 f fff
77 g ggg
88 h hhh
99 i iii
00 j jjj
Time taken: 0.157 seconds, Fetched: 10 row(s)
hive> select * from table_b;
OK
11 a aaa
22 b bbb
33 c ccc
44 d ddd
55 e eee
66 f fff
77 g ggg
88 h hhh
99 i iii
00 j jjj
Time taken: 0.471 seconds, Fetched: 10 row(s)
hive> select * from table_c;
OK
11 a aaa
22 b bbb
33 c ccc
44 d ddd
55 e eee
66 f fff
77 g ggg
88 h hhh
99 i iii
00 j jjj
Time taken: 0.186 seconds, Fetched: 10 row(s)
在确认三张表的数据准确无误后,使用如下关联语句对三张表进行关联:
select a.id as a_id, b.name as b_name, c.addr as c_addr from table_a a join table_b b on(a.id=b.id) join table_c c on(c.name=b.name);
关联结果如下,数据丢失的结果令人咋舌。
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 23439 HDFS Write: 5508 SUCCESS
Stage-Stage-2: HDFS Read: 26292 HDFS Write: 5508 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
22 b bbb
66 f fff
88 h hhh
55 e eee
99 i iii
Time taken: 3.343 seconds, Fetched: 5 row(s)
可以非常明显地看到,本来应该被完全关联在一起的 10 条数据,居然出现了严重的数据丢失。有一半的数据竟然没有被成功关联。如果多次运行关联语句,可以发现这不是偶然情况。每次关联 2 张表以上的数据都会出现极为严重的数据丢失问题。
场景2: 表的某些属性 (e.g. bucketing_version) 不同时,即使两张表关联也会导致数据丢失
使用如下数据进行数据建表关联。在建表时使用不同的?bucketing_version ?进行表的初始化。
数据文件如下:
0,Kurt,vulnedcasey@yahoo.co.uk
1,Rolland,naejose@gmx.com
2,Cortez,blategarfield@yahoo.com
3,Tyron,tameprobes@gmail.com
4,Matthew,wellezekiel@yahoo.co.uk
5,Jeffrey,fabingeborg@comcast.net
6,Gerard,oughtoutgo@att.net
7,Hal,coursedmauro@hotmail.com
8,Virgil,squintprude@gmail.com
9,Hector,lewddillon@email.com
利用如下语句建表,并在建表时使用不同的?bucketing_version ?属性。
CREATE TABLE `join_test_1`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='1');
LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_1;
CREATE TABLE `join_test_2`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='2');
LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_2;
运行关联操作的 SQL 语句:
SET hive.execution.engine=mr;
SET mapred.reduce.tasks=2;
SET hive.auto.convert.join=false;
SELECT * from (SELECT id from join_test_1) as tbl1 LEFT JOIN (SELECT id from join_test_2) as tbl2 on tbl1.id = tbl2.id;
查询关联结果,令人惊讶的事情再一次发生:
Ended Job = job_local184369678_0005
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 28434 HDFS Write: 7956 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
0 NULL
2 NULL
4 NULL
6 NULL
8 8
1 NULL
3 NULL
5 5
7 NULL
9 NULL
蕴含同样数据的两张表,仅仅由于建表时的某些属性不同,就导致了绝大部分数据的关联都不成功。数据最基本的准确性都无法得到保障,这毫无疑问是 HIVE 3 中非常致命的问题。
问题根源
由于该问题影响过于严重,导致许多使用 HIVE 的开发者第一时间发现了本问题并及时进行了 Bug Report。在 HIVE Jira 上面可以看到非常多的针对该问题的问题报告和可能的解决方案。
本文主要采用了 HIVE JIRA 中编号为?HIVE-22098 ?的问题描述和相应解决思路。
根据?HIVE-22098 ?的问题描述,究其根源,是由于 HIVE 2 与 HIVE 3 在 JOIN 操作时使用了不同的 Hash 算法,导致同样的值在关联时被不同的 Hash 算法映射成了不同的值,而这些不同的 Hash 值在进行关联时无法被相互匹配。最终导致本来该被关联在一起的数据由于 Hash 值得不同未能被关联在一起。而决定到底应用哪套 Hash 值算法则是根据?bucketing_version ?的值来进行评判的。
特别地,在进行多表关联时,即使相同?bucketing_version ?的 Hive 表,由于其关联的中间过程所产生的中间表,在源代码中?bucketing_version ?值会被置为?-1 ,因此该中间表再与第三张乃至更多的表关联时会直接导致 Hash 算法的混乱计算。
因此,为了保障关联的数据准确性,必须要确保?bucketing_version ?在进行多表关联或者多版本表关联时的稳定。即,保障?bucketing_version ?的稳定性就是保证 Hive 3 数据关联时的准确性。
此外,HIVE 社区已经针对?bucketing_version ?不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看?JIRA: HIVE-21304?了解系统性的?bucketing_version ?稳定性提高方法,此处不做过多赘述。
Ps: 该问题还可能导致许多其他异常的出现,比如?HIVE-18983 ,?HIVE-20164 ,?HIVE-22429 ?等诸多问题的出现。因此该 BUG 的严重级别是最高的。修复了本问题,其余数十个问题也就都可以迎刃而解。
解决思路
由于 HIVE 中 JOIN 操作执行流程的本质是一个二叉树,因此我们只需要通过算法在关联时遍历每个节点,并将每个节点的?bucketing_version ?在关联前手动设置为该二叉树中的最高版本,即可保证?bucketing_version ?在关联时的稳定,也就可以保障关联不丢数。
源码修改及编译上传
将如下代码替换至原代码即可修复本问题。
下面给出 Patch 中的 Git 代码,对相应的类进行修复。Patch 的代码在 Jira HIVE-22098 中都可以找到。在此衷心感谢各位 Code Contributors 对于 HIVE 社区的贡献。
From c0774da927451008ba78ed7b8637a1a4899d9e12 Mon Sep 17 00:00:00 2001
From: luguangming <luguangming1@huawei.com>
Date: Mon, 12 Aug 2019 14:24:05 +0800
Subject: [PATCH]HIVE-22098
---
.../apache/hadoop/hive/ql/exec/mr/ExecMapper.java | 41 ++++++++++++++++++++++
1 file changed, 41 insertions(+)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 99b33a3..d0c847e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -20,10 +20,12 @@
import java.io.IOException;
import java.net.URLClassLoader;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -104,6 +106,10 @@ public void configure(JobConf job) {
// initialize map operator
mo.initialize(job, null);
mo.setChildren(job);
+
+ // defined self balance ReduceSinkOperator of bucketVersion
+ balanceRSOpbucketVersion(mo);
+
l4j.info(mo.dump(0));
// initialize map local work
localWork = mrwork.getMapRedLocalWork();
@@ -138,6 +144,41 @@ public void configure(JobConf job) {
}
}
}
+
+ /**
+ * defined-self balance ReduceSinkOperator of bucketVersion, keep values to sameness
+ * @param rootOp
+ */
+ private static void balanceRSOpbucketVersion(Operator rootOp){
+ List<Operator<? extends OperatorDesc>> needDealOps = new ArrayList<Operator<? extends OperatorDesc>>();
+ visitChildGetRSOps(rootOp, needDealOps);
+ int bucketVersion = -1;
+ for(Operator<? extends OperatorDesc> rsop : needDealOps){
+ if(rsop.getBucketingVersion() != 2 && rsop.getBucketingVersion() != 1){
+ rsop.setBucketingVersion(-1);
+ }
+ if(rsop.getBucketingVersion() > bucketVersion){
+ bucketVersion = rsop.getBucketingVersion();
+ }
+ }
+ for(Operator<? extends OperatorDesc> rsop : needDealOps){
+ l4j.info("update reduceSinkOperator name="+rsop.getName()+", opId="+rsop.getOperatorId()+", oldBucketVersion="+rsop.getBucketingVersion()+", newBucketVersion="+bucketVersion);
+ rsop.setBucketingVersion(bucketVersion);
+ }
+ needDealOps.clear();
+ }
+ private static void visitChildGetRSOps(Operator rootOp, List<Operator<? extends OperatorDesc>> needDealOps){
+ List<Operator<? extends OperatorDesc>> ops = rootOp.getChildOperators();
+ if(ops == null || ops.isEmpty()){
+ return;
+ }
+ for(Operator<? extends OperatorDesc> op : ops) {
+ if (op instanceof ReduceSinkOperator) {
+ needDealOps.add(op);
+ }
+ visitChildGetRSOps(op, needDealOps);
+ }
+ }
@Override
public void map(Object key, Object value, OutputCollector output,
Reporter reporter) throws IOException {
--
2.9.2
编译对应的模块?hive-exec-3.1.2.jar ,并将该 Jar 包替换 Hive 3 自带的 Jar 包。编译 HIVE 的命令可以去查询 HIVE 的官方文档,这里不做过多赘述。
重启 Hive 让其重新加载我们修改源码后的 Jar 包,再次重复上述两个场景,即可观察到 MapReduce 的结果正常,该问题被成功修复。
关联结果示例:
Ended Job = job_local184369678_0006
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 22434 HDFS Write: 7966 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
0 0
1 1
2 2
4 4
6 6
8 8
3 3
5 5
7 7
9 9
小结
HIVE JIRA 中有许多关于异常信息和报错的讨论。经常性地浏览社区,配合阅读源代码可以对 HIVE 的理解更加深入。
再次,HIVE 社区已经针对?bucketing_version ?不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看?JIRA: HIVE-21304?了解系统性的?bucketing_version ?稳定性提高方法,此处不做过多赘述。
希望本篇文章对您的 HIVE 使用有所帮助。
References
|