一、Hive 概述
二、Hive 数据类型和文件格式
2.1 数据类型
2.1.1 基本数据类型
类别 | 类型 |
---|
整型 Integers | TINYINT - 1 个字节的有符号整型 SAMLINT - 2 个字节有符号整型 INT - 4 个字节有符号整型 BIGINT - 8 个字节有符号整型 | 浮点数 Floating point numbers | FLOAT - 单精度浮点数 DOUBLE - 双精度浮点数 | 定点数 Fixed point numbers | DECIMAL - 17 字节,任意精度数字 | 字符串 String | STRING - 不定长字符串 VARCHAR - (1 - 65535)长度的不定长字符串 CHAR - (1 - 255) 定长字符串 | 时间日期 Datetime | TIMESTAMP - 时间戳,纳秒级 DATE - 时间日期类型 | 布尔类型 Boolean | BOOLEAN | 二进制类型 Binary types | BINARY 二进制字节序列 |
Hive 中的基本数据类型都是通过 java 中的接口实现的,各类型与 java 数据类型的对应关系:
Hive 数据类型 | java 数据类型 |
---|
TINYINT | byte | SAMLINT | short | INT | int | BIGINT | long | FLOAT | float | DOUBLE | double | DECIMAL | | STRING | String | VARCHAR | | CHAR | | TIMESTAMP | | DATE | | BOOLEAN | boolean | BINARY | |
2.1.2 数据类型隐式转化
规律:
2.1.3 集合数据类型
集合类型 | 描述 | 示例 | 索引方式 |
---|
array | 有序的,数据类型相同的,集合 | array(1, 2) | [index] | map | kv 键值对,key 必须是基本数据类型,value 不限制 | map(‘a’, 1, ‘b’, 2) | [key] | struct | 不同类型的, 集合, 类似 c 语言的结构体 | | struct名.字段名 | union | 不同类型的数据,但存储在同一字段的不同行中 | | |
hive (default)> select array(1, 2, 3);
OK
_c0
[1,2,3]
-- 通过 [下标] 索引
hive (default)> select arr[1] from (select array(1, 2, 3) arr) tmp;
OK
_c0
2
hive (default)> select map('a', 1, 'b', 2);
OK
_c0
{"a":1,"b":2}
-- 通过 [key] 索引
hive (default)> select tmap['b'] from (select map('a', 1, 'b', 2) tmap) tmp;
OK
_c0
2
-- struct 没有字段名时,默认字段名为 col_x
hive (default)> select struct('username1', 7, 1288.68);
OK
_c0
{"col1":"username1","col2":7,"col3":1288.68}
-- name_struct() 可以指定字段名
hive (default)> select named_struct("name", "username1", "id", 7, "salary", 12880.68);
OK
_c0
{"name":"username1","id":7,"salary":12880.68}
-- 通过 列名.字段名 的方式 索引
hive (default)> select userinfo.id, userinfo.name from (select named_struct("name", "username1", "id", 7, "salary", 12880.68) userinfo) tmp;
OK
id name
7 username1
2.2 文本文件的数据编码
Hive 表中的数据都存储在 HDFS 上,其定义了默认的存储格式,也支持自定义存储格式
用户定义数据格式需要指定三个属性:
-
列分隔符(通常为空格、"\t"、"\x001") -
行分隔符("\n") -
读取文件数据的方法。
2.2.1 默认存储格式的默认分隔符
分隔符 | 名称 | 说明 |
---|
\n | 换行符 | 分割行,每行一条记录 | ^A | 字段分割符 | 分割字段 | ^B | 元素分隔符 | 分割 array, map, struct 中的元素 | ^C | kv分隔符 | 分割 map 中的 key 和 value |
案例:
学生表
-- 字段和数据
id name age hobby(array) score(map)
666 thomas 18 read game java 90 hadoop 95
-- 加上分隔符
666 ^A thomas ^A 18 ^A read ^B game ^A java ^C 90 ^B hadoop ^C 95
-- 建表语句
create table s1 (
id int,
name string,
age int,
hobby array<string>,
score map<string, int>
);
-- 加载数据命令
load data local inpath '/home/hadoop/data/s1.data' into table s1;
2.2.2 读时模式
读取数据时才检查数据是否符合表的定义
Hive 采用 读时模式 schema on read , 加载数据时不进行数据格式的校验,读取数据时,如果不合法,则显示 Null
该模式的优点是写入、加载数据迅速
三、DDL
3.1 数据库操作
创建数据库
– 创建数据库,在HDFS上存储路径为 /user/hive/warehouse/*.db
CREATE DATABASE database_name IF NOT EXISTS database_name
COMMENT 'comments'
LOCATION 'hdfs_path'
;
查看数据库
SHOW DATABASES;
DESC DATABASE database_name;
DESC DATABASE EXTENDED database_name;
使用数据库
USE database_name
删除数据库
DROP DATABASE database_name;
DROP DATABASE database_name CASCADE;
3.2 建表语句
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type [column_constraint_specification] [COMMENT col_comment], ... [constraint_specification])]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[
[ROW FORMAT row_format]
[STORED AS file_format]
]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name=property_value, ...)]
[AS select_statement];
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
LIKE existing_table_or_view_name
[LOCATION hdfs_path];
-
EXTERNAL 关键字,创建外部表,否则创建内部表
删除内部表时,数据和表同时被删除
删除外部表时,只删除表
-
COMMENT 表注释 -
PARTITION BY 分区 -
CLUSTERED BY 分桶 -
SORTED BY 对桶表中的一个或多个字段排序 -
存储语句 ROW FORMAT DELIMITED
[FIELDS TERMINATED BY char]
[COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char]
[LINES TERMINATED BY char] | SERDE serde_name
[WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
-
SRORED AS SEQUENCEFILE(二进制序列文件,用于压缩) | TEXTFILE(默认) | RCFILE -
LOCATION 表在 HDFS 上的存放位置 -
TBLPROPERTIES 表的属性 -
AS 表示根据其后面的查询语句的查询结果创建表 -
LIKE 用于复制现有的表结构,但是不复制数据
3.3 内部表 与 外部表
-
默认情况下,创建内部表 -
EXTERNAL 关键字,用于创建外部表 -
内部表可以转换为外部表 -
删除内部表时,表和数据同时被删除 -
删除外部表时,只删除表 -
生产环境中,多使用外部表
案例:
数据
2;zhangsan;book,TV,code;beijing:chaoyang,shagnhai:pudong
3;lishi;book,code;nanjing:jiangning,taiwan:taibei
4;wangwu;music,book;heilongjiang:haerbin
内部表
CREATE TABLE t1 (
id INT,
name STRING,
hobby ARRAY<STRING>,
addr MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ";"
COLLECTION ITEMS TERMINATED BY ","
MAP KEYS TERMINATED BY ":"
;
DESC t1;
DESC FORMATTED t1;
LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE t1;
dfs -ls /user/hive/warehouse/mydb.db/t1;
DROP TABLE t1;
外部表
CREATE EXTERNAL TABLE t2 (
id INT,
name STRING,
hobby ARRAY<STRING>,
addr MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ";"
COLLECTION ITEMS TERMINATED BY ","
MAP KEYS TERMINATED BY ":"
;
DESC t2;
DESC FORMATTED t2;
LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE t2;
dfs -ls /user/hive/warehouse/mydb.db/t2;
DROP TABLE t2;
内部表 与 外部表的转换
ALTER TABLE t1 SET TBLPROPERTIES('EXTERNAL'='TRUE');
ALTER TABLE t1 SET TBLPROPERTIES('EXTERNAL'='FALSE');
3.4 分区表
Hive 查询时,会扫描整个表的数据,但是表的数据量大会导致全表扫描消耗资源大,效率低
Hive 中的分区表,将表的数据存储在不同的子目录中,每个目录对应一个分区,当查询只需要扫描部分数据时,可以使用分区表,提高查询效率
通常根据时间、地区等信息进行分区
创建分区表
CREATE TABLE IF NOT EXISTS t3 (
id INT,
name STRING,
hobby ARRAY<STRING>,
addr MAP<STRING, STRING>
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ";"
COLLECTION ITEMS TERMINATED BY ","
MAP KEYS TERMINATED BY ":"
;
LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE T3 PARTITION(dt='2020-06-01');
LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE T3 PARTITION(dt='2020-06-02');
分区字段不是表中已经存在的数据,可以将分区字段看成虚拟列
查看分区
SHOW PARTITIONS t3;
新增分区并设置数据
ALTER TABLE t3 ADD PARTITION(dt='2020-06-03');
ALTER TABLE t3 ADD PARTITION(dt='2020-06-04') PARTITION(dt='2020-06-05');
hdfs dfs -cp /user/hive/warehouse/mydb.db/t3/dt=2020-06-01 /user/hive/warehouse/mydb.db/t3/dt=2020-06-07
hdfs dfs -cp /user/hive/warehouse/mydb.db/t3/dt=2020-06-01 /user/hive/warehouse/mydb.db/t3/dt=2020-06-08
hdfs dfs -cp /user/hive/warehouse/mydb.db/t3/dt=2020-06-01 /user/hive/warehouse/mydb.db/t3/dt=2020-06-06
ALTER TABLE t3 ADD
PARTITION(dt='2020-06-07') LOCATION '/user/hive/warehouse/mydb.db/t3/dt=2020-06-07'
PARTITION(dt='2020-06-08') LOCATION '/user/hive/warehouse/mydb.db/t3/dt=2020-06-08'
;
修改分区的 HDFS 路径
ALTER TABLE t3 PARTITION(dt='2020-06-04') SET LOCATION '/user/hive/warehouse/mydb.db/t3/dt=2020-06-08';
删除分区
ALTER TABLE t3 DROP
PARTITION(dt='2020-06-03'),
PARTITION(dt='2020-06-05')
;
3.5 分桶表
当单个的分区或者表的数据量过大,分区不能更细粒度的划分数据,就需要使用分桶 技术将数据划分成更细的粒度。将数据按照指定的字段进行分成多个桶中去,即将数 据按照字段进行划分,数据按照字段划分到多个文件当中去
测试数据
java 90
1 c 78
1 python 91
1 hadoop 80
2 java 75
2 c 76
2 python 80
2 hadoop 93
3 java 98
3 c 74
3 python 89
3 hadoop 91
5 java 93
6 c 76
7 python 87
8 hadoop 88
创建分桶表
CREATE TABLE course (
id INT,
name STRING,
score INT
)
CLUSTERED BY (id) INTO 3 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
;
CREATE TABLE course_common (
id INT,
name STRING,
score INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
;
LOAD DATA LOCAL INPATH '/home/hadoop/data/course.dat' INTO TABLE course_common;
INSERT INTO TABLE course SELECT * FROM course_common;
分桶规则
3.6 修改表 与 删除表
表名重命名
ALTER TABLE course_common RENAME TO course_common1;
字段名重命名
ALTER TABLE course_common CHANGE COLUMN id cid INT;
ALTER TABLE course_common CHANGE COLUMN id cid STRING;
增加字段
ALTER TABLE course_common ADD COLUMNS (common STRING);
删除字段
ALTER TABLE course_common REPLACE COLUMNS (id STRING, cname STRING, score INT)
删除表
DROP TABLE course_common;
四、数据操作
4.1 数据导入
语法:
LOAD DATA [LOCAL] INPATH 'filepath'
[OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
或者在创建表时,添加 LOCATION 参数,指定数据路径,完成创建表时导入数据
插入数据
INSERT INTO TABLE tabC
PARTITION(month='202001')
VALUES (5, 'wangwu', 'BJ'), (4, 'lishi', 'SH'), (3, 'zhangsan', 'TJ');
INSERT INTO TABLE tabC
PARTITION(month='202002')
SELECT id, name, area FROM tabC WHERE month='202001';
FROM tabC
INSERT TABLE tabC
PARTITION(month='202003')
SELECT id, name, area WHERE month='202002'
INSERT TABLE tabC
PARTITION(month='202004')
SELECT id, name, area WHERE month='202002'
;
创建表 并插入数据
CREATE TABLE IF NOT EXISTS tabD
AS SELECT * FROM tabC;
import 导入数据
IMPORT TABLE student2 PARTITION(month='202009')
FROM '/user/hive/warehouse/export/student';
4.2 数据导出
查询结果导出到本地
INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/data/tabC'
SELECT * FROM tabC;
查询结果格式化后 导出到本地
INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/data/tabC'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
SELECT * FROM tabC;
查询结果格式化后 导出到HDFS
去掉 LOCAL 关键字, 将输出路径改为 HDFS 文件路径
INSERT OVERWRITE DIRECTORY '/user/hadoop/data/course'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
SELECT * FROM course;
dfs 命令到处数据导本地,其实是 hdfs 文件拷贝到本地的 get 命令
dfs -get 'hdfs_path' 'local_path';
hive 命令,导出数据到本地, 查询结果重定向到文件
hive -e "SELECT * FROM course" > a.log
hive -e "SELECT * FROM mydb.course" > a.log
export 导出数据到 HDFS, 此方式还包含表的元数据信息
EXPORT TABLE tabC TO 'hdfs_path';
export 导出的数据,可以用 import 命令导入到 表中
CREATE TABLE tabE LIKE tabC;
IMPORT TABLE tabE 'hdfs_path';
清空表,仅限内部表
```sql
TRUNCATE TABLE table_name;
```
五、DQL
语法:
[WITH CommonTableExpression (, CommonTableExpression)*] (Note: Only available starting with Hive 0.13.0)
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list]
[ORDER BY col_list]
[CLUSTER BY col_list
| [DISTRIBUTE BY col_list] [SORT BY col_list]
]
[LIMIT [offset,] rows]
注意
-
WHERE 子句中不能使用列的别名 -
WHERE 子句紧随 FROM 子句 -
正则表达式用 RLIKE 或者 REGEXP -
WHERE 子句不能有分组函数, HAVING 子句可以有分组函数 -
HAVING 只能用于 GROUP BY 分组统计后
WHERE 子句针对表中的数据发挥作用
HAVING 针对查询结果(聚组以后的结果) 发挥作用
-
笛卡尔积 在以下条件会产生笛卡尔积
-
没有连接条件 -
连接条件无效 -
所有表中的所有行互相连接 -
ORDER BY - 全局排序,对最终的结果进行排序,所以出现在 SELECT 语句的结尾
只有一个 reduce
排序字段要出现在select子句中
-
SORT BY - 每个 MR 内部排序,
每一个 reduce 内部进行排序, 得到局部有序的结果
-
DISTRIBUTE BY - 分区, 根据字段,将特定的行发送到特定的 reduce 中,需要配合 SORT BY 实现分区排序 -
CLUSTER BY - 简化分区排序语法,当 DISTRIBUTE BY 与 SORT BY 是用一个字段时使用
只能升序, 不能指定排序规则
SELECT * FROM emp DISTRIBUTE BY deptno SORT BY deptno;
-- 等价于
SELECT * FROM emp CLUSTER BY deptno;
六、函数
6.1 Hive 内置函数
查看系统函数
SHOW FUNCTIONS;
DESC FUNCTION function_name;
DESC FUNCTION EXTENDED function_name;
日期函数
SELECT CURRENT_DATE;
SELECT UNIX_TIMESTAMP();
SELECT CURRENT_TIMESTAMP();
SELECT FROM_UNIXTIME(1632881444);
SELECT FROM_UNIXTIME(1632881444, 'yyyyMMdd');
SELECT FROM_UNIXTIME(1632881444, 'yyyyMMdd HH:mm:ss');
SELECT UNIX_TIMESTAMP('2021-09-29 12:20:05');
SELECT DATEDIFF('2020-04-18', '2020-05-17');
SELECT DATEDIFF('2020-05-17', '2020-04-18');
SELECT DAYOFMONTH(CURRENT_DATE);
SELECT DATE_SUB(CURRENT_DATE, DAYOFMONTH(CURRENT_DATE)-1);
SELECT DATE_SUB('2020-03-01', 2);
SELECT ADD_MONTHS(DATE_SUB(CURRENT_DATE, DAYOFMONTH(CURRENT_DATE)-1), 1);
SELECT TO_DATE('2020-01-01');
SELECT TO_DATE('2020-01-01 12:20:05');
SELECT DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss');
SELECT DATE_FORMAT(CURRENT_DATE(), 'yyyy-MM-dd HH:mm:ss');
SELECT DATE_FORMAT('2020-06-20', 'yyyy-MM-dd HH:mm:ss');
SELECT *, ROUND(DATEDIFF(CURRENT_DATE, hiredate) / 365, 1) working_years FROM emp;
字符串函数
SELECT LOWER("HELLO WORLD");
SELECT UPPER(LOWER("HELLO WORLD"));
SELECT LENGTH(ename), ename FROM emp;
SELECT CONCAT(empno, " ", ename) idname from emp;
SELECT CONCAT_WS('.', 'www', array('lagou', 'com'));
SELECT CONCAT_WS(" ", ename, job) FROM emp;
SELECT SUBSTR('www.lagou.com', 5);
SELECT SUBSTR('www.lagou.com', -5);
SELECT SUBSTR('www.lagou.com', 5, 5);
SELECT SPLIT('www.lagou.com', '\\.');
数学函数
SELECT ROUND(3.1415926);
SELECT ROUND(3.1415926, 2);
SELECT ROUND(3.1415926, -2);
SELECT CEIL(3.1415926);
SELECT FLOOR(3.1415926);
SELECT
ABS(-10.5) abs,
POWER(-10.5, 2) power,
SQRT(-10.5) sqrt,
LOG2(-10.5) log2,
LOG10(-10.5) log10
;
SELECT
ABS(-10.5) abs,
POWER(-10.5, 2) power,
SQRT(100) sqrt,
LOG2(100) log2,
LOG10(100) log10
;
条件函数
if, case when, coalesce, isnull/isnotnull, nvl, nullif
SELECT sal, IF (sal<=1500, 1, if (sal<3000, 2, 3)) FROM emp;
SELECT sal,
CASE WHEN sal<=1500 THEN 1
WHEN sal<=3000 THEN 2
ELSE 3 END sal_level
FROM
emp
;
SELECT ename, deptno,
CASE deptno WHEN 10 THEN 'accounting'
WHEN 20 THEN 'research'
WHEN 30 THEN 'salse'
ELSE 'unknown' END dept_name
FROM
emp
;
SELECT sal, COALESCE(comm, 0) FROM emp;
SELECT * FROM emp WHERE ISNULL(comm);
SELECT * FROM emp WHERE ISNOTNULL(comm);
SELECT sal, NVL(comm, 0) FROM emp;
SELECT NULLIF(1, 2), NULLIF(2, 1), NULLIF(1, 1);
UDTD 函数, User Defined Table-Generating Functions
用户自定义 表生成函数
一行输入,多行输出
SELECT EXPLODE(ARRAY(1, 2, 3, 4, 5));
SELECT EXPLODE(MAP('a', 1, 'b', 2, 'c', 3));
LATERAL VIEW udtf(expression) tableAlias AS columnAlias (',' columnAlias)*
fromClause: FROM baseTable (lateralView)*
WITH t1 AS (
SELECT 'OK' cola, SPLIT('www.lagou.com', '\\.') colb
)
SELECT cola, colc
FROM t1
LATERAL VIEW EXPLODE(colb) t2 AS colc
;
UDTF 案例
id tags
1 1,2,3
2 2,3
3 1,2
id e_tags
1 1
1 2
1 3
2 2
2 3
3 1
3 2
WITH t1 AS (
SELECT
id,
SPLIT(tags, '\\,') s_tags
FROM
tab1
)
SELECT id, e_tags
FROM t1
LATERAL VIEW EXPLODE(s_tags) t2 AS e_tags
;
lisi|Chinese:90,Math:80,English:70
wangwu|Chinese:88,Math:90,English:96
maliu|Chinese:99,Math:65,English:60
SELECT EXPLODE(score) AS (subject, mark) FROM stu_score;
SELECT name, subject, mark
FROM stu_score
LATERAL VIEW EXPLODE(score) t1 AS subject, mark
;
WITH tmp AS
(SELECT name, subject, mark
FROM stu_score
LATERAL VIEW EXPLODE(score) t1 AS subject, mark
)
SELECT name, MAX(mark) max_score
FROM tmp
GROUP BY name;
SELECT
name, mark
FROM
(SELECT name, subject, mark
FROM stu_score
LATERAL VIEW EXPLODE(score) t1 AS subject, mark
) t1
GROUP BY name
ORDER BY mark DESC
;
6.2 窗口函数
分析函数的一种
用于计算基于组的某种聚合值,但是聚合函数只能对每个组返回一行
窗口函数对于每个组可以返回多行
窗口函数指定了分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变化而变化。
OVER 关键字
OVER() 没有参数的话,默认是全部结果集
SELECT ename, sal, SUM(sal) OVER() AS total_sal FROM emp;
SELECT ename, sal, SUM(sal) OVER() AS total_sal,
CONCAT(ROUND(sal / SUM(sal) OVER()*100, 1), "%") ratio_sal
FROM emp;
PARTITION BY 子句
在 OVER 窗口中进行分区,对某一列进行分区统计,窗口大小是分区大小
SELECT ename, sal, deptno, SUM(sal) OVER(PARTITION BY deptno) sal_sum_dep
FROM emp;
ORDER BY 子句
SELECT ename, sal, deptno, SUM(sal) OVER(PARTITION BY deptno ORDER BY sal) sal_sum_dep
FROM emp;
ename sal deptno sal_sum_dep
MILLER 1300 1 1300
CLARK 2450 10 2450
KING 5000 10 7450
SMITH 800 20 800
ADAMS 1100 20 1900
JONES 2975 20 4875
SCOTT 3000 20 10875
FORD 3000 20 10875
JAMES 950 30 950
MARTIN 1250 30 3450
WARD 1250 30 3450
TURNER 1500 30 4950
ALLEN 1600 30 6550
BLAKE 2850 30 9400
window 子句
ROWS BETWEENT ... AND ...
对窗口的结果做更细粒度的划分
-
UNDOUNDED PRECEDING 组内第一行数据 -
n PRECEDING 组内当前行的 前 n 行数据 -
CURRENT ROW 当前行数据 -
n FOLLOWING 组内当前行的 后 n 行数据 -
UNBOUNDED FOLLOWING 组内最后一行数据
SELECT
ename,
sal,
deptno,
SUM(sal) OVER(PARTITION BY deptno ORDER BY ename ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) sum_sal
FROM
emp
;
SELECT
ename,
sal,
deptno,
SUM(sal) OVER(PARTITION BY deptno ORDER BY ename ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) sum_sal
FROM
emp
;
SELECT
ename,
sal,
deptno,
SUM(sal) OVER(PARTITION BY deptno ORDER BY ename ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING ) sum_sal
FROM
emp
;
排名函数
-
row_number, 行号连续,无并列排名 -
rank, 行号不连续,有并列排名 -
dense_rank, 行好连续,有并列排名
SELECT
ename, sal, deptno,
ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) row_number,
RANK() OVER(PARTITION BY deptno ORDER BY sal DESC) rank,
DENSE_RANK() OVER(PARTITION BY deptno ORDER BY sal DESC) dense_rank
FROM
emp
;
ename sal deptno row_number rank dense_rank
MILLER 1300 1 1 1 1
KING 5000 10 1 1 1
CLARK 2450 10 2 2 2
SCOTT 3000 20 1 1 1
FORD 3000 20 2 1 1
JONES 2975 20 3 3 2
ADAMS 1100 20 4 4 3
SMITH 800 20 5 5 4
BLAKE 2850 30 1 1 1
ALLEN 1600 30 2 2 2
TURNER 1500 30 3 3 3
MARTIN 1250 30 4 4 4
WARD 1250 30 5 4 4
JAMES 950 30 6 6 5
SELECT
ename, deptno, sal, sal_rank
FROM
(SELECT
ename, deptno, sal,
DENSE_RANK() OVER(PARTITION BY deptno ORDER BY sal DESC) sal_rank
FROM
emp) tmp
WHERE sal_rank <= 3
;
序列函数
-
lag, 返回当前行的前一行数据 -
lead, 返回当前行的下一行数据 -
first_value, 返回分组排序后,分组内的第一个值 -
last_value, 返回分组排序后,分组内的最后一个值 -
ntile, 将分组的数据按照顺序切分成 n 片,返回当前的切片序号/值
SELECT
uid, log_date,
LAG(log_date) OVER (PARTITION BY uid) lag
FROM
logging
;
SELECT
uid, log_date,
LEAD(log_date) OVER (PARTITION BY uid ORDER BY log_date) lag
FROM
logging
;
SELECT
uid, log_date,
FIRST_VALUE(log_date) OVER (PARTITION BY uid ORDER BY log_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_logdate
FROM
logging
;
SELECT
uid, log_date,
LAST_VALUE(log_date) OVER (PARTITION BY uid ORDER BY log_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_logdate
FROM
logging
;
SELECT
uid, log_date,
NTILE(3) OVER (PARTITION BY uid ORDER BY log_date) log_ntiles
FROM
logging
;
SQL 题目
数据
ab141 2021-03-09 1
SQL
SELECT * FROM logging WHERE status = 1;
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY uid) rank
FROM
logging
WHERE status = 1
;
SELECT
uid,
count(log_tag) log_cnt
FROM
(
SELECT
*,
DATE_SUB(log_date, ROW_NUMBER() OVER (PARTITION BY uid ORDER BY log_date)) log_tag
FROM
logging
WHERE status = 1
) tmp
GROUP BY uid, log_tag
HAVING log_cnt >= 3
;
SELECT
uid, log_tag,
count(log_tag) log_cnt
FROM
(
SELECT
*,
DATE_SUB(dt, ROW_NUMBER() OVER (PARTITION BY uid ORDER BY dt)) log_tag
FROM
ulogin
WHERE status = 1
) tmp
GROUP BY uid, log_tag
HAVING log_cnt >= 3
;
select
uid,
count(log_tag) log_cnt
from (
select
*,
date_sub(dt, row_number() over (partition by uid order by dt)) log_tag
from (
select
*
from (
select
*,
row_number() over (partition by uid, dt) dt_sort
from ulogin
where status = 1
) tmp where dt_sort = 1
) t2
) t3
group by uid, log_tag
;
1 1901 90
2 1901 90
3 1901 83
4 1901 60
5 1902 66
6 1902 23
7 1902 99
8 1902 67
9 1902 87
class score rank lagscore
1901 90 1 0
1901 90 1 0
1901 83 2 -7
1901 60 3 -23
1902 99 1 0
1902 87 2 -12
1902 67 3 -20
SELECT
*, DENSE_RANK() OVER (PARTITION BY class ORDER BY score DESC) score_sort
FROM
st
;
WITH tmp AS (
SELECT
*, DENSE_RANK() OVER (PARTITION BY class ORDER BY score DESC) score_sort
FROM
stu
)
SELECT
class, score, score_sort,
NVL(score - LAG(score) OVER (PARTITION BY class ORDER BY score DESC), 0) score_diff
FROM
tmp
WHERE score_sort <= 3
;
1 java
1 hadoop
1 hive
1 hbase
2 java
2 hive
2 spark
2 flink
3 java
3 hadoop
3 hive
3 kafka
id java hadoop hive hbase spark flink kafka
1 1 1 1 0 0 0
1 0 1 0 1 1 0
1 1 1 0 0 0 1
SELECT
id,
SUM(CASE WHEN course='java' THEN 1 ELSE 0 END) AS java,
SUM(CASE WHEN course='hadoop' THEN 1 ELSE 0 END) AS hadoop,
SUM(CASE WHEN course='hive' THEN 1 ELSE 0 END) AS hive,
SUM(CASE WHEN course='hbase' THEN 1 ELSE 0 END) AS hbase,
SUM(CASE WHEN course='spark' THEN 1 ELSE 0 END) AS spark,
SUM(CASE WHEN course='flink' THEN 1 ELSE 0 END) AS flink,
SUM(CASE WHEN course='kafka' THEN 1 ELSE 0 END) AS kafka
FROM
rowline1
GROUP BY id
;
a b 2
a b 1
a b 3
c d 6
c d 8
c d 8
id1 id2 flag
a b 2|1|3
c d 6|8
SELECT
id1, id2,
COLLECT_SET(flag) flag,
COLLECT_LIST(flag) flag1
FROM
rowline2
GROUP BY id1, id2
;
SELECT
id1, id2,
CONCAT_WS('|', COLLECT_SET(CAST (flag AS STRING))) flag_str
FROM
rowline2
GROUP BY id1, id2
;
SELECT
id1, id2,
SPLIT(flag_str, '\\|') flag_split
FROM
rowline3
;
SELECT
id1, id2, flag
FROM
rowline3
LATERAL VIEW EXPLODE(SPLIT(flag_str, '\\|')) tmp AS flag
;
6.3 自定义函数
三类:
- UDF(User Defined Function), 一进一出, 如 power()
- UDAF(User Defined Aggregation Function), 聚集函数,多进一出, 如 sum(), count()
- UDTF(User Defined Table-Generating Functions), 表生成函数,如 explode()
UDF 开发要点:
- 继承 org.apache.hadoop.hive.ql.exec.UDF
- 需要实现 evaluate 函数
- UDF 必须有返回类型,可以返回 null,但是不能为 void
UDF 开放步骤:
- 创建maven java 工程,添加依赖
- 开发java类继承UDF,实现evaluate 方法
- 将项目打包上传服务器
- 添加开发的jar包
- 设置函数与自定义函数关联
- 使用自定义函数
案例:
扩展 NVL 函数的功能
-
创建 maven java 工程,添加依赖 -
开发 java 类继承 UDF,实现 evaluate 方法 -
将项目打包上传服务器 -
添加开发的 jar 包 ADD JAR /home/hadoop/hiveudf.jar;
-
设置函数与自定义函数关联 创建临时函数 CREATE TEMPORARY FUNCTION mynvl AS "com.lagou.hive.udf.nvl";
创建永久函数
-
使用自定义函数
七、元数据管理与存储
7.1 元数据
在 Hive 中需要描述清楚表跟文件之间的映 射关系、列和字段之间的关系等等信息
Metadata 即元数据。元数据包含用 Hive 创建的 database、table、表的字段等元信 息。元数据存储在关系型数据库中。如 Hive 内置的 Derby、第三方如 MySQL 等
Metastore 即元数据服务,是 Hive 用来管理库表元数据的一个服务。有了它上层的 服务不用再跟裸的文件数据打交道,而是可以基于结构化的库表信息构建计算框架。
通过 metastore 服务将 Hive 的元数据暴露出去,而不是需要通过对 Hive 元数据库 mysql的访问才能拿到Hive的元数据信息; metastore 服务实际上就是一种 thrift 服 务,通过它用户可以获取到 Hive 元数据,并且通过 thrift 获取元数据的方式,屏蔽了数据库访问需要驱动,url,用户名,密码等细节
Metastore 三种配置方式
HiveServer 2
HiveServer 2 是一个服务端接口,使远程客户端可以执行对 Hive 的查询并返回结果。
基于 Thrift RPC 的实现是 HiveServer 的改进版本,并支持多客户端并发和身份验 证,启动 HiveServer 2 服务后,就可以使用 jdbc、odbc、thrift 的方式连接
HiveServer 2 的作用:
- 为 Hive 提供了一种允许客户端远程访问的服务
- 基于 thrift 协议,支持跨平台,跨编程语言对 Hive 访问
- 允许远程访问 Hive
HCatalog
Hive 目录 服务
HCatalog 提供了一个统一的元数据服务,允许不同的工具如 Pig、MapReduce 等通 过 HCatalog 直接访问存储在 HDFS 上的底层文件
HCatalog 提供了一个称为 hcat 的命令行工具。这个工具和 Hive 的命令行工具类 似,两者最大的不同就是 hcat 只接受不会产生 MapReduce 任务的命令。
7.2 数据存储
八、Hive 调优
影响 Hive 效率的不仅仅是数据量过大; 数据倾斜、数据冗余、job 或 I/O 过多、 MapReduce 分配不合理等因素都对 Hive 的效率有影响。
对 Hive 的调优既包含对 HiveQL 语句本身的优化,也包含 Hive 配置项 和 MR 方面的调 整
8.1 架构优化
执行引擎
Hive 支持多种执行引擎,分别是 MapReduce、Tez、Spark、Flink。可以通过 hive-site.xml 文件中的 hive.execution.engine 属性控制
优化器
与关系型数据库类似,Hive 会在真正执行计算之前,生成和优化逻辑执行计划与物理 执行计划。Hive 有两种优化器: Vectorize (向量化优化器) 与 Cost-Based Optimization (CBO 成本优化器)
-
矢量化查询执行 矢量化查询(要求执行引擎为 Tez )执行通过一次批量执行 1024 行而不是每行一行来提 高扫描,聚合,过滤器和连接等操作的性能,这个功能一显着缩短查询执行时间 set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
矢量化查询,必须用 ORC 格式存储数据
-
成本优化器 Hive 的 CBO 是基于 apache Calcite 的,Hive 的 CBO 通过查询成本(有analyze 收集的统 计信息)会生成有效率的执行计划,最终会减少执行的时间和资源的利用,使用 CBO 的配置如下 : SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
定期执行表的分析,分析后的数据放在元数据库中
分区表
对于一张比较大的表,将其设计成分区表可以提升查询的性能,对于一个特定分区的 查询,只会加载对应分区路径的文件数据,所以执行速度会比较快
分区字段的选择是影响查询性能的重要因素,尽量避免层级较深的分区,这样会造成 太多的子文件夹
常见的分区字段:
- 日期或时间。如year、month、day或者hour,当表中存在时间或者日期字段时
- 地理位置。如国家、省份、城市等
- 业务逻辑。如部门、销售区域、客户等等
分桶表
与分区表类似,分桶表的组织方式是将HDFS上的文件分割成多个文件。
分桶可以加快数据采样,也可以提升 join 的性能(join 的字段是分桶字段),因为分桶可以确保某个 key 对应的数据在一个特定的桶内(文件),巧妙地选择分桶字段可以大幅 度提升 join 的性能。
文件格式
在 HiveQL 的 create table 语句中,可以使用 stored as … 指定表的存储格式。
Hive 表支持的存储格式有 TextFile、SequenceFile、RCFile、ORC、Parquet 等。
生产环境中绝大多数表都采用 TextFile、 ORC、Parquet 存储格式之一。
TextFile 是最简单的存储格式,它是纯文本记录,也是 Hive 的默认格式。其磁盘开销大,查询效率低,更多的是作为跳板来使用。RCFile、ORC、Parquet 等格式的表都不能由文件直接导入数据,必须由 TextFile 来做中转。
Parquet 和 ORC 都是 Apache 旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量 OLAP 查询,并且也支持更好的压缩和编码。选择 Parquet 的原因主要是它支持 Impala 查询引擎,并且对 update、delete 和事务性操作需求很低。
数据压缩
压缩技术可以减少 map 与 reduce 之间的数据传输,从而可以提升查询性能,关于压 缩的配置可以在 hive 的命令行中或者 hive-site.xml 文件中进行配置。
压缩的编码器可以配置在 mapred-site.xml, hive-site.xml 中:
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compre ss.SnappyCodec ;
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodc
8.2 参数优化
- 本地模式
- 严格模式
- JVM 重用
- 并行执行
- 推测还行
- 合并小文件
- Fetch模式
本地模式
当 Hive 处理的数据量小时,启动分布式去处理数据会浪费资源,可以启用本地模式
SET hive.exec.mode.local.auto=true;
SET hive.exec.mode.local.auto.inputbytes.max=50000000; SET hive.exec.mode.local.auto.input.files.max=5;
严格模式
强制不允许用户执行 3 种有风险的 HiveQL 语句
- 查询分区表时,不限定分区列的语句
- 两表 join 产生笛卡尔积的语句
- 用 order by 排序,但是没有指定 limit 语句
开启严格模式的参数设置
SET sethive.mapred.mode=nostrict;
JVM 重用
默认情况下,Hadoop 会为一个 map 或者 reduce 启动一个 JVM。这种情况下,会并行执行 map 和 reduce
当遇到 map 或者 reduce 是几秒钟的轻量级的 job 时,JVM 启动进程所耗费的时间会比执行的时间要长,通过重用 JVM,共享 JVM 以串行的方式运行 map 或者 reduce
开启 JVM 重用,需要配置最大的 task 数量,其默认值为 1, 如果设置为 -1 则表示不限制 重用 JVM 的 task 的数量
SET mapreduce.job.jvm.numtasks=5;
并行执行
Hive 的查询通常会被转换成一系列的 stage,这些 stage 之间并不是一直相互依赖的, 可以并行执行这些 stage
配置:
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=16;
并行执行可以增加集群资源的利用率,如果集群的资源使用率已经很高了,那么并行执行的效果不会很明显。
推测执行
在分布式集群环境下,因为程序 Bug、负载不均衡、资源分布不均等原因,会造成同 一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任 务(比如一个作业的某个任务进度只有 50%,而其他所有任务已经运行完毕),则这 些任务会拖慢作业的整体执行进度。
为了避免这种情况发生,Hadoop 采用了推测执行机制,它根据一定的规则推测出 “拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理 同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
配置:
set mapreduce.map.speculative=true
set mapreduce.reduce.speculative=true
set hive.mapred.reduce.tasks.speculative.execution=true
合并小文件
- 在 map 执行前 合并小文件,减少 map 数量
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInput Format;
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 268435456;
SET hive.merge.smallfiles.avgsize = 16777216;
Fetch 模式
Fetch 模式是指 Hive 中对某些情况的查询可以不必使用 MapReduce 计算。select col1, col2 from tab; 可以简单地读取表对应的存储目录下的文件,然后输出查询结果到控制台。在开启 fetch 模式之后,在全局查找、字段查找、limit 查找等都不启动 MapReduce。
hive.fetch.task.conversion = more
8.3 SQL 优化
列裁剪 和 分区裁剪
- 列裁剪,查询时,只读取需要的列
- 分区裁剪,查询时,只读取需要的分区
sort by 代替 order by
order by 是某字段的全局排序,会导致所有的 map 端数据都进入同一个 reducer 中
sort by 会根据实际情况,启动多个 reducer 进行排序,每个 reducer 内部有序
为了控制 map 端数据分配到 reducer 的 key, 需要配合 distribute by 一同使用,否则 map 端数据会随机分配到 不同的 reducer
group by 代替 count(distinct)
count(distinct) 的逻辑会导致很少的 reducer 处理
select count(distinct uid)
from tab1;
select count(1)
from (
select uid
from tab1
group by uid
) tmp
;
外层的 count(1) 和 group by 会启动两个 MR job, 因此确保数据量大到启动两个 job 的耗时远小于 计算耗时才划算
group by 配置
group by 时,如果先起一个 combiner 在 map 端做部分预聚合,可以有效减少shuffle 数据量。
set hive.map.aggr = true;
Map端进行聚合操作的条目数
set hive.groupby.mapaggr.checkinterval = 100000;
当 map 预先聚合的行数超过该值时,会拆分 job
group by 时如果某些key对应的数据量过大,就会发生数据倾斜。Hive 自带了一个均 衡数据倾斜的配置项 hive.groupby.skewindata ,默认值 false
其实现方法是在 group by 时启动两个 MR job。第一个 job 会将 map 端数据随机输入 reducer,每个 reducer 做部分聚合,相同的 key 就会分布在不同的 reducer 中。第二 个 job 再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果
join 基础优化
Hive join 三种方式
-
common join
操作简单,但是性能较差(需要将数据分区,有 shuffle 阶段)
-
map join
大表与小表连接,小表数据量(25 M) 可以完全加载到内存 不会有 reduce 阶段,连接在 map 端完成
-
bucket map join 通过两个表分桶在执行连接时会将小表的每个分桶映射成 hash 表,每个 task 节点都需要这个小表的所有 hash 表,但是在执行时只需要加载该 task 所持有大表分 桶对应的小表部分的 hash 表就可以,所以对内存的要求是能够加载小表中最大的 hash 块即可
小表与大表的分桶数量需要是倍数关系,这个是因为分桶策略决定的,分桶时会根据分桶字段对桶数取余后决定哪个桶的,所以要保证成倍数关系。
-
利用 map join 特性 -
利用 分桶表 map join
处理空值 或者 无意义值
这些值的数据量大的时候,会导致集中到同一个分区里,需要用 where 语句过滤掉
需要保留数据的话,将这些数据 添加随机数 打散到不同的分区
key 数据倾斜
添加随机数
调整 map 数
根据
调整 map 数量
对于 小文件采用策略是 合并
对于 复杂文件 采用的策略是 增加 map 数
computeSliteSize(max(minSize, min(maxSize, blocksize))) = blocksize
调整 reduce 数
参数 hive.exec.reducers.bytes.per.reducer 用来设定每个reducer能够处 理的最大数据量,默认值 256 M
参数 用来设定每个 job 的最大 reducer 数量,默认值 999(1.2 版本之前)或 1009(1.2版本之后)
reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)
reducer 数量太多,会产生大量小文件;太少,每个 reducer 需要处理更多的数据,会耗费更多的时间
8.4 优化总结
深入理解 Hadoop 的核心能力,对Hive优化很有帮助。Hadoop/Hive 处理数据过 程,有几个显著特征:
- 不怕数据多,就怕数据倾斜
- 对 job 数比较多的作业运行效率相对比较低,比如即使有几百行的表,多次关联 多次汇总,产生十几个jobs,执行也需要较长的时间。MapReduce 作业初始化 的时间是比较长的
- 对sum、count等聚合操作而言,不存在数据倾斜问题
- count(distinct) 效率较低,数据量大容易出问题
从大的方面来说,优化可以从几个方面着手:
- 好的模型设计,事半功倍
- 解决数据倾斜问题。仅仅依靠参数解决数据倾斜,是通用的优化手段,收获有 限。开发人员应该熟悉业务,了解数据规律,通过业务逻辑解决数据倾斜往往更 可靠
- 减少 job 数
- 设置合理的map、reduce task 数
- 对小文件进行合并,是行之有效的提高Hive效率的方法
- 优化把握整体,单一作业的优化不如整体最优
8.5 优化案例
建表语句
create table if not exists tuning.student_txt(
s_no string comment '学号',
s_name string comment '姓名',
s_birth string comment '出生日期',
s_age int comment '年龄',
s_sex string comment '性别',
s_score int comment '综合得分',
s_desc string comment '自我介绍'
)
row format delimited
fields terminated by '\t';
SQL 案例
查询 student_txt 表中,每个年龄最晚出生的人的出生日期,将结果存入到 student_stat 表中
CREATE TABLE student_stat (
age INT,
brith STRING
)
PARTITIONED BY (tp STRING)
;
set hive.exec.dynamic.partition.mode=true;
INSERT OVERWRITE TABLE student_stat PARTITION(tp)
SELECT s_age, MAX(s_birth) stat, 'max' tp
FROM student_txt
GROUP BY s_age
UNION ALL
SELECT s_age, MIN(s_birth) stat, 'min' tp
FROM student_txt
GROUP BY s_age
;
查看执行计划
EXPLAIN
SELECT * FROM student_txt;
EXPLAIN
SELECT s_score, MAX(s_birth)
FROM student_txt
WHERE s_age > 22
GROUP BY s_score
LIMIT 88
;
EXPLAIN
SELECT s_age, MAX(s_birth) stat, 'max' tp
FROM student_txt
GROUP BY s_age
UNION ALL
SELECT s_age, MIN(s_birth) stat, 'min' tp
FROM student_txt
GROUP BY s_age
;
一条 Hive SQL 语句会包含一个或多个Stage,不同的 Stage 间会存在着依赖关系
一个Stage可以是:
- Mapreduce 任务(最耗费资源)
- Move Operator (数据移动)
- Stats-Aggr Operator (搜集统计数据)
- Fetch Operator (读取数据)等
默认情况下,Hive一次只执行一个stage
执行过程中的要点
- 问题1: SQL 执行过程中有多少个 Stage(job)
- 问题2: 为什么在 Stage-1、Stage-9 中有 9 个 Map Task、9 个 Reduce Task
- 问题3: SQL 语句是否能优化,如何优化
EXPLAIN
INSERT OVERWRITE TABLE student_stat PARTITION(tp)
SELECT s_age, MAX(s_birth) stat, 'max' tp
FROM student_txt
GROUP BY s_age
UNION ALL
SELECT s_age, MIN(s_birth) stat, 'min' tp
FROM student_txt
GROUP BY s_age
;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1, Stage-9
Stage-8 depends on stages: Stage-2 , consists of Stage-5, Stage-4, Stage-6
Stage-5
Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
Stage-3 depends on stages: Stage-0
Stage-4
Stage-6
Stage-7 depends on stages: Stage-6
Stage-9 is a root stage
stage 结构图
问题 2
hive (tuning)> set mapred.max.split.size;
mapred.max.split.size=256000000
hive (tuning)> set hive.exec.reducers.bytes.per.reducer;
hive.exec.reducers.bytes.per.reducer=256000000
Data size: 2193190912
2193190912 / 256000000 = 8.567152 = 9
SQL 优化
-
减少 Map Reduce task 的数量
即 增大 map 和 reduce 的大小,但是得注意选择合适的 map reduce 数量
-
减少 stage 使用Hive多表插入语句。可以在同一个查询中使用多个 insert 子句,这样的好处是 只需要扫描一次源表就可以生成多个不相交的输出。 from tab1
insert overwrite table tab2 partition (age)
select name, address, school,age
insert overwrite table tab3
select name, address
where age>24;
从同一张表选取数据,可以将选取的数据插入其他不同的表中(也可以是相同的表)
将 “from 表名”,放在SQL语句的头部
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
from student_txt
insert overwrite table student_stat partition(tp)
select s_age, max(s_birth) stat, 'max' tp
group by s_age
insert overwrite table student_stat partition(tp)
select s_age, min(s_birth) stat, 'min' tp
group by s_age;
善用文件格式,优化 SQL
create table student_parquet
stored as parquet
as
select * from student_txt;
select count(1) from student_parquet;
create table student_stat_parquet
stored as parquet
as
select * from student_stat where 1>2;
drop table student_stat_parquet;
create table student_stat_parquet
(age int,
b string)
partitioned by (tp string)
stored as parquet;
SQL 优化总结:
九、Hive 案例
数据:
建表语句
CREATE DATABASE IF NOT EXISTS sale;
CREATE TABLE sale.dimdate_ori (
dt date,
yearmonth int,
year smallint,
month tinyint,
day tinyint,
week tinyint,
weeks tinyint,
quat tinyint,
tendays tinyint,
halfmonth tinyint
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ","
;
CREATE TABLE sale.sale_ori (
orderid string,
locationid string,
dt date
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ","
;
CREATE TABLE sale.saledetail_ori (
orderid string,
rownum int,
goods string,
num int,
price double,
amount double
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ","
;
CREATE TABLE sale.dimdate (
dt date,
yearmonth int,
year smallint,
month tinyint,
day tinyint,
week tinyint,
weeks tinyint,
quat tinyint,
tendays tinyint,
halfmonth tinyint
)
STORED AS ORC
;
CREATE TABLE sale.sale (
orderid string,
locationid string,
dt date
)
STORED AS ORC
;
CREATE TABLE sale.saledetail (
orderid string,
rownum int,
goods string,
num int,
price double,
amount double
)
STORED AS ORC
;
加载数据
USE sale;
LOAD DATA LOCAL INPATH '/home/hadoop/data/tbDate.dat' OVERWRITE INTO TABLE dimdate_ori;
LOAD DATA LOCAL INPATH '/home/hadoop/data/tbSale.dat' OVERWRITE INTO TABLE sale_ori;
LOAD DATA LOCAL INPATH '/home/hadoop/data/tbSaleDetail.dat' OVERWRITE INTO TABLE saledetail_ori;
INSERT INTO TABLE dimdate SELECT * FROM dimdate_ori;
INSERT INTO TABLE sale SELECT * FROM sale_ori;
INSERT INTO TABLE saledetail SELECT * FROM saledetail_ori;
9.1 SQL
- 按年统计销售额
SELECT
year(s.dt) year, round(sum(sd.amount)/10000, 2) amount
FROM
saledetail AS sd
JOIN sale AS s
ON sd.orderid = s.orderid
GROUP BY year(s.dt)
;
- 销售金额在 10W 以上的订单
SELECT
orderid, round(sum(amount), 2) amount_cal
FROM
saledetail
GROUP BY orderid
HAVING sum(amount) > 100000
;
- 每年销售额的差值
SELECT
year, amount,
lag(amount) OVER (ORDER BY year) last_year_amount,
round(amount - lag(amount) OVER (ORDER BY year), 2) diff
FROM (
SELECT
year(s.dt) year, round(sum(sd.amount)/10000, 2) amount
FROM
saledetail AS sd
JOIN sale AS s
ON sd.orderid = s.orderid
GROUP BY year(s.dt)
) tmp
;
- 年度订单金额前10位(年度、订单号、订单金额、排名)
SELECT
year, orderid, amount_id amount, amount_rank
FROM (
SELECT
year, orderid, amount_id,
row_number() OVER (PARTITION BY year ORDER BY amount_id DESC) amount_rank
FROM (
SELECT
year(s.dt) year, sd.orderid, round(sum(sd.amount), 2) amount_id
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
GROUP BY s.dt, sd.orderid
) tmp
) tmp2
WHERE amount_rank <= 10
WITH tmp AS (
SELECT
year(s.dt) year, sd.orderid, round(sum(sd.amount), 2) amount_id
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
GROUP BY s.dt, sd.orderid
)
SELECT
year, orderid, amount_id amount, amount_rank
FROM (
SELECT
year, orderid, amount_id,
row_number() OVER (PARTITION BY year ORDER BY amount_id DESC) amount_rank
FROM tmp
) tmp2
WHERE amount_rank <= 10
;
- 季度订单金额前10位(年度、季度、订单id、订单金额、排名)
SELECT
year, quat, orderid, round(amount_order, 2) amount, amount_rank
FROM (
SELECT
year,
quat,
orderid,
amount_order,
row_number() OVER (PARTITION BY year, quat ORDER BY amount_order DESC) amount_rank
FROM (
SELECT
year(tmp.dt) year, d.quat, orderid, sum(amount) amount_order
FROM (
SELECT
s.dt, sd.orderid, sd.amount
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
) tmp
JOIN dimdate d
ON tmp.dt = d.dt
GROUP BY year(tmp.dt), d.quat, tmp.orderid
) tmp2
) tmp3
WHERE amount_rank <= 10
;
WITH tmp1 AS (
SELECT
year(tmp.dt) year, d.quat, orderid, sum(amount) amount_order
FROM (
SELECT
s.dt, sd.orderid, sd.amount
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
) tmp
JOIN dimdate d
ON tmp.dt = d.dt
GROUP BY year(tmp.dt), d.quat, tmp.orderid
)
SELECT
year, quat, orderid, round(amount_order, 2) amount, amount_rank
FROM (
SELECT
year,
quat,
orderid,
amount_order,
row_number() OVER (PARTITION BY year, quat ORDER BY amount_order DESC) amount_rank
FROM
tmp1
) tmp3
WHERE amount_rank <= 10
;
- 求所有交易日中订单金额最高的前10位
WITH tmp AS (
SELECT
s.dt, sd.orderid, sum(sd.amount) amount
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
GROUP BY s.dt, sd.orderid
)
SELECT
dt, orderid, round(amount, 2) amount, amount_rank
FROM (
SELECT
dt, orderid, amount, row_number() OVER (PARTITION BY dt ORDER BY amount DESC) amount_rank
FROM
tmp
) tmp2
WHERE amount_rank <= 10
;
- 每年度销售额最大的交易日
WITH tmp AS (
SELECT
s.dt, sum(amount) amount
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
GROUP BY s.dt
)
SELECT
year(dt) year,
round(max(amount), 2) max_amount_day
FROM
tmp
GROUP BY year(dt)
;
WITH tmp1 AS (
SELECT
dt, amount,
row_number() OVER (PARTITION BY year(dt) ORDER BY amount DESC) amount_rank
FROM (
SELECT
s.dt, sum(amount) amount
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
GROUP BY s.dt
) tmp
)
SELECT
year(dt) year,
dt,
round(amount, 2) max_amount_day
FROM
tmp1
WHERE amount_rank = 1
;
- 年度最畅销的商品(即每年销售金额最大的商品)
WITH tmp AS (
SELECT
year(s.dt) year, sd.goods, round(sum(amount), 2) amount_goods
FROM
saledetail sd
JOIN sale s
ON sd.orderid = s.orderid
GROUP BY year(s.dt), sd.goods
)
SELECT
year, goods, amount_goods
FROM (
SELECT
year, goods, amount_goods,
row_number() OVER (PARTITION BY year ORDER BY amount_goods DESC) rank
FROM
tmp
) tmp2
WHERE rank = 1
;
Hue 数据交互工具
1. 安装、编译
-
解压 hue -
安装依赖
yum install ant asciidoc cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-plain gcc gcc-c++ krb5-devel libffi-devel libxml2-devel libxslt-devel make mysql mysql-devel openldap-devel python-devel sqlite-devel gmp-devel
- 安装 maven
解压 maven 压缩包,添加环境变量
- 编译
cd /opt/software/hue-release-4.3.0
PREFIX=/opt/lagou/servers/ make install
- 修改集群配置
<property> <name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property> <name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.groups</name>
<value>*</value>
</property>
增加 httpfs-site.xml 配置文件
<configuration>
<property>
<name>httpfs.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>httpfs.proxyuser.hue.groups</name>
<value>*</value>
</property>
</configuration>
- 修改 Hue 配置
cd /opt/lagou.servers/hue
cd desktop/conf
cp pseudo-distributed.ini.tmpl pseudo-distributed.ini
vi vi pseudo-distributed.ini
http_host=linux122
http_port=8000
is_hue_4=true
time_zone=Asia/Shanghai
dev=true
server_user=hue
server_group=hue
default_user=hue
app_blacklist=search
engine=mysql
host=linux123
port=3306
user=hive
password=12345678
name=hue
hadoop_conf_dir=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop
mysql> create database hue;
build/env/bin/hue syncdb
build/env/bin/hue migrate
groupadd hue
useradd -g hue hue
./supervisor
2. 整合 Hadoop, Hive
app_blacklist=search
fs_defaultfs=hdfs://linux121:9000
webhdfs_url=http://linux121:50070/webhdfs/v1
hadoop_conf_dir=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop
resourcemanager_host=linux123
resourcemanager_port=8032
submit_to=True
resourcemanager_api_url=http://linux123:8088
proxy_api_url=http://linux123:8088
history_server_api_url=http://linux123:19888
开启 hue 的 WebUI(linux122:8000) 前,需要启动集群,以及 hiveserver2
|