【尚硅谷】电商数仓V4.0丨大数据数据仓库项目实战【学习记录】
思考问题?
1. 为什么用hive on spark来处理数据?
2. 什么是零点漂移问题,怎么解决?
数据传输的过程中,是有延迟的。所以会存在将近零点的数据从采取日志的flume经过kafka缓冲,消费kafka数据的flume,再到HDFS中。HDFS会按照linux时间可能会判断出这是第二天的数据。
flume时间拦截器:能够按照日志的时间写入到HDFS中。
3. 事实表有哪几类?
事务型事实表:适用于不会发生变化的业务,同步策略:增量同步 周期型快照事实表:适用于不关心明细操作,只关心结果的业务。全量同步 累计型快照事实表: 适用于会发生周期性变化的业务。新增及变化同步
4. 维度建模的四个过程?
- 选择业务过程:确定有哪些事实表
- 声明粒度: 最小粒度,确认每张事实表中的每行数据所指代的内容
- 确认维度: 确定维度表
- 确认事实: 确定度量值
5.hive在装载dim后,会出现为null值的一行,为什么?
因为hive在insert数据时,会对map的小文件自动合并导致的。 insert会解析成计算任务,MR或者Spark计算任务,会去读取ODS层与商品相关的业务数据。 而这些业务数据是以lzo压缩+其索引存储的 所以hive在从这张表读取数据的时候,会误把这两个文件当成小文件进行合并。 也就是把索引文件误当成数据文件进行合并了。 从而会导致,索引文件没了,就不能切片了。 可以关闭map端的小文件合并选项。
6.为什么要做拉链表
能够更加高效的存储历史状态
1. 数据仓库概念
业务数据,用户行为数据,爬出数据
如何处理数据? Flume采集用户行为数据(开源免费,使用广泛,简单效率高)
Sqoop(底层是hadoop,4个map)处理业务数据
- mysql > Hive
- 可以通过JDBC来实现,
- Hadoop也可以在重写Inputformat接口是使用JDBC的方式来实现
1.1 原始数据备份到ODS中
1.2 DWD完成数据的清洗
1.3 join形成大的分表(DWS,DWT)
join一次去查数据 DWS:按天聚合数据,按照主题聚合 DWT: 累加数据进行存储
1.4 ADS 结果报表
存储在Mysql中
1.5 输出
1.5.1 报表系统
便于展示,柱状图,饼状图,折线图
1.5.2 用户画像
统计类标签 规则类标签 机器学习类标签
1.5.3 推荐系统
构建推荐模型
1.5.4 机器学习
语言,图像,
2. 项目需求及架构设计
2.1 项目需求
产品经理提出的项目需求: 老板,客户,设计等人提出
项目技术如何选型? 框架版本如何选项(Apache,CDH,HDP)? 服务器使用物理机还是云主机? 如何确实集群规模?
- 用户行为数据采集平台搭建,输入是什么样?怎样处理?输出是什么样?需要用kafka做数据缓存吗?
- 业务数据采集平台搭建
- 数据仓库的维度建模 核心
- 分析,设备,会员,商品,地区,活动等电商核心主题,统计的报表指标近100个
- 采用即席查询工具,随时进行指标分析。(临时的指标,快速查询到结果)
- 对集群性能进行监控,发生异常需要报警。
- 元数据管理 hive的元数据, 当前任务挂掉后,会影响哪些后续的任务,任务的影响范围。
- 质量监控(指标差异?是否正常)
- 权限管理(表的级别,字段的级别)
2.2 项目框架
2.2.1 技术选型
数据量大小: 大的 hive, 小的数据量 sql 业务需求:实时? 技术成熟度: 数仓1.0 — 数据中台(大厂) — 数据湖 开发维护成本:物理机(专业运维,对应风扇,地方)和阿里云 总成本预算:
数据采集传输:Flume(文件日志),sqoop(业务数据),Kafka(数据量大的时候,先缓冲到kafka),logstash(也能处理文件日志,ELK框架,数据量小,负责程度不高),DataX(市场占有率和Sqoop55开)
数据存储:MySql(存储的是小量的数据,ADS中的数据,几M的数据),HDFS(能存储海量的数据),Hbase(存储的是KyLin,多维分析,快速查询,结果数据存到Hbase),Redis(实时计算会用到),MongoDB(尤其爬虫数据)
数据计算:Hive(底层走mr,计算慢),Tez(消耗内存大,查询速度快),Spark(部分数据在内存,部分数据在磁盘),Flink,Storm(用来处理实时运算)
数据查询 :Presto(快速查询,Apache用这个),Kylin,Impala(CDH版用这个),Druid和ClickHouse(快速实时查询,实时数仓的查询)
数据可视化: Echarts,Superset,前面两个开源免费,QuickBI(离线数据展示),DataV(快速实时数据展示)
任务调度: Azkaban(简单实用,上手快),Oozie(功能多),DolphinScheduler(国人开发),Airflow(python所写)
集群监控: Zabbix, Prometheus
元数据管理: Atlas(几千个,几万个任务)
权限管理: Ranger, Sentry
2.2.2 系统数据流程设计
Nginx 起负载均衡的作用,均匀分配到每台服务器中。 日志文件保存30天。 kafka 消峰
2.2.3 框架发行版本选型
1)如何选择Apache/CDH/HDP版本? (1)Apache:运维麻烦,组件间兼容性需要自己调研。(一般大厂使用,技术实力雄厚,有专业的运维人员)(建议使用) (2)CDH:国内使用最多的版本,但CM不开源,今年开始收费,一个节点1万美金/年。 (3)HDP:开源,可以进行二次开发,但是没有CDH稳定,国内使用较少 2)云服务选择 (1)阿里云的EMR、MaxCompute、Data Works (2)亚马逊云EMR (3)腾讯云EMR (4)华为云EMR
2.2.4 服务器选型
服务器选择物理机还是云主机?
1)物理机: 以128G内存,20核物理CPU,40线程,8THDD和2TSSD硬盘,戴尔品牌单台报价4W出头。一般物理机寿命5年左右。 需要有专业的运维人员,平均一个月1万。电费也是不少的开销。
2)云主机: 云主机:以阿里云为例,差不多相同配置,每年5W。 很多运维工作都由阿里云完成,运维相对较轻松
3)企业选择 金融有钱公司和阿里没有直接冲突的公司选择阿里云 中小公司、为了融资上市,选择阿里云,拉倒融资后买物理机。 有长期打算,资金比较足,选择物理机。
2.2.5 集群规模
1)如何确认集群规模?(假设:每台服务器8T磁盘,128G内存) (1)每天日活跃用户100万,每人一天平均100条:100万*100条=1亿条 (2)每条日志1K左右,每天1亿条:100000000/1024/1024=约100G (3)半年内不扩容服务器来算:100G*180天=约18T (4)保存3副本:18T*3=54T (5)预留20%~30%Buf=54T/0.7=77T (6)算到这:约8T*10台服务器
2)如果考虑数仓分层?数据采用压缩?需要重新再计算
2.2.6 集群资源规划设计
在企业中通常会搭建一套生产集群和一套测试集群。生产集群运行生产任务,测试集群用于上线前代码编写和测试。
1)生产集群 (1)消耗内存的分开(nn,rm) (2)数据传输数据比较紧密的放在一起(Kafka 、Zookeeper) (3)客户端(hive客户端,spark客户端)尽量放在一到两台服务器上,方便外部访问(风险高一点) (4)有依赖关系的尽量放到同一台服务器(例如:Hive和Azkaban Executor)
2)测试集群服务器规划
3. 用户行为采集平台
3.2.2 埋点数据上报时机
3.3.6 环境变量配置说明
4.5 采集日志Flume
log->Flume->kafka
a1.sources=r1
a1.channels=c1
a1.sources.r1.type =TRILDIR
a1.sources.r1.filegroups =f1
a1.sources.r1.filegroups.f1=opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
a1.sources.r1.channels=c1
Flume拦截器
- 继承Interceptor接口
- 实现initialize,intercept,close方法
- 继承Builder 接口
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
public class ETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
if (JSONUtils.isJSONValidate(log)) {
return event;
} else {
return null;
}
}
@Override
public List<Event> intercept(List<Event> list) {
Iterator<Event> iterator = list.iterator();
while (iterator.hasNext()){
Event next = iterator.next();
if(intercept(next)==null){
iterator.remove();
}
}
return list;
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new ETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
@Override
public void close() {
}
}
JSONUtils 类
package com.atguigu.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
public class JSONUtils {
public static boolean isJSONValidate(String log){
try {
JSON.parse(log);
return true;
}catch (JSONException e){
return false;
}
}
}
结束进程
ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9
4.6 消费kafka数据flume
可以 kafka channel + hdfs sink,效率高。
这次用filechannel
FileChannel底层原理
编写Flume时间戳拦截器
package com.atguigu.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TimeStampInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
JSONObject jsonObject = JSONObject.parseObject(log);
String ts = jsonObject.getString("ts");
headers.put("timestamp",ts);
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimeStampInterceptor();
}
@Override
public void configure(Context context) {
}
}
@Override
public void close() {
}
}
配置conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
启动命令
/opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1
2.电商业务数据采集平台
熟悉业务
SKU(库存量基本单位), SPU商品信息聚合的最小单位 平台属性, 销售属性
流程:
- 查看各个表名,有啥关系
- 针对每个表,查看表名和行数据
- 了解表结构之后,熟悉了业务流程,其数据是如何发生变化的
- 在开发过程中通过sqoop导入数据时,不可以写*,因为*代表着hive中的表顺序要完全和mysql保持一致,如果mysql字段顺序改了,hive中的数据代表的意思就不一样了。
sql
启动sql
[wanghaha@hadoop102 software]$ sudo systemctl start mysqld
[wanghaha@hadoop102 software]$ sudo systemctl status mysqld
● mysqld.service - MySQL Server
Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
Active: active (running) since 六 2022-04-23 13:12:52 CST; 1 day 4h ago
Docs: man:mysqld(8)
http://dev.mysql.com/doc/refman/en/using-systemd.html
Process: 1477 ExecStart=/usr/sbin/mysqld --daemonize --pid-file=/var/run/mysqld/mysqld.pid $MYSQLD_OPTS (code=exited, status=0/SUCCESS)
Process: 1037 ExecStartPre=/usr/bin/mysqld_pre_systemd (code=exited, status=0/SUCCESS)
Main PID: 1480 (mysqld)
Tasks: 27
CGroup: /system.slice/mysqld.service
└─1480 /usr/sbin/mysqld --daemonize --pid-file=/var/run/mysqld/mysqld.pid
4月 23 13:12:50 hadoop102 systemd[1]: Starting MySQL Server...
4月 23 13:12:52 hadoop102 systemd[1]: Started MySQL Server.
同步策略
数据同步策略的类型包括:全量同步、增量同步、新增及变化同步、特殊情况 ? 全量表:存储完整的数据。 ? 增量表:存储新增加的数据。 ? 新增及变化表:存储新增加的数据和变化的数据。 ? 特殊表:只需要存储一次。
采用不同的同步策略
编写sqoop同步命令 将mysql中order_info 表数据导入到HDFS的/test路径
全量同步
bin/sqoop import \
--connect jdbc:mysql://hadoop102:3306/gmall \
--username root \
--password 000000 \
--query 'select * from order_info where $CONDITIONS '\
--target-dir /order_info/2020-06-15 \
--delete-target-dir \
--fields-terminated-by '\t' \
--num-mappers 2 \
--split-by id
增量同步
bin/sqoop import \
--connect jdbc:mysql://hadoop102:3306/gmall \
--username root \
--password 000000 \
--query 'select * from order_info where (date_format(create_time,'%Y-%m-%d')='2020-06-15' or date_format(operate_time,'%Y-%m-%d')='2020-06-15') and $CONDITIONS '\
--target-dir /order_info/2020-06-15 \
--delete-target-dir \
--fields-terminated-by '\t' \
--num-mappers 2 \
--split-by id
2.5.2 业务数据每日同步脚本
(1)在/home/atguigu/bin目录下创建
[atguigu@hadoop102 bin]$ vim mysql_to_hdfs_init.sh
添加如下内容: 注意点:
- sql存储的null值和hive中存储的null不一样,hive中存储的形式是/n。Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用–input-null-string和–input-null-non-string两个参数。导入数据时采用–null-string和–null-non-string
- 要采用lzo压缩且支持压缩,必须提前创建索引
- 需要传入两个参数,第一个是表名,第二个是同步数据的第一天日期
- 在开发过程中通过sqoop导入数据时,不可以写*,因为*代表着hive中的表顺序要完全和mysql保持一致,如果mysql字段顺序改了,hive中的数据代表的意思就不一样了。
#! /bin/bash
APP=gmall
sqoop=/opt/module/sqoop/bin/sqoop
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
import_data(){
$sqoop import \
--connect jdbc:mysql://hadoop102:3306/$APP \
--username root \
--password 000000 \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 where \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}
import_order_info(){
import_data order_info "select
id,
total_amount,
order_status,
user_id,
payment_way,
delivery_address,
out_trade_no,
create_time,
operate_time,
expire_time,
tracking_no,
province_id,
activity_reduce_amount,
coupon_reduce_amount,
original_total_amount,
feight_fee,
feight_fee_reduce
from order_info"
}
import_coupon_use(){
import_data coupon_use "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time,
expire_time
from coupon_use"
}
import_order_status_log(){
import_data order_status_log "select
id,
order_id,
order_status,
operate_time
from order_status_log"
}
import_user_info(){
import_data "user_info" "select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time
from user_info"
}
import_order_detail(){
import_data order_detail "select
id,
order_id,
sku_id,
sku_name,
order_price,
sku_num,
create_time,
source_type,
source_id,
split_total_amount,
split_activity_amount,
split_coupon_amount
from order_detail"
}
import_payment_info(){
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
payment_type,
trade_no,
total_amount,
subject,
payment_status,
create_time,
callback_time
from payment_info"
}
import_comment_info(){
import_data comment_info "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from comment_info"
}
import_order_refund_info(){
import_data order_refund_info "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
refund_status,
create_time
from order_refund_info"
}
import_sku_info(){
import_data sku_info "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
is_sale,
create_time
from sku_info"
}
import_base_category1(){
import_data "base_category1" "select
id,
name
from base_category1"
}
import_base_category2(){
import_data "base_category2" "select
id,
name,
category1_id
from base_category2"
}
import_base_category3(){
import_data "base_category3" "select
id,
name,
category2_id
from base_category3"
}
import_base_province(){
import_data base_province "select
id,
name,
region_id,
area_code,
iso_code,
iso_3166_2
from base_province"
}
import_base_region(){
import_data base_region "select
id,
region_name
from base_region"
}
import_base_trademark(){
import_data base_trademark "select
id,
tm_name
from base_trademark"
}
import_spu_info(){
import_data spu_info "select
id,
spu_name,
category3_id,
tm_id
from spu_info"
}
import_favor_info(){
import_data favor_info "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info"
}
import_cart_info(){
import_data cart_info "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from cart_info"
}
import_coupon_info(){
import_data coupon_info "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from coupon_info"
}
import_activity_info(){
import_data activity_info "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info"
}
import_activity_rule(){
import_data activity_rule "select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule"
}
import_base_dic(){
import_data base_dic "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic"
}
import_order_detail_activity(){
import_data order_detail_activity "select
id,
order_id,
order_detail_id,
activity_id,
activity_rule_id,
sku_id,
create_time
from order_detail_activity"
}
import_order_detail_coupon(){
import_data order_detail_coupon "select
id,
order_id,
order_detail_id,
coupon_id,
coupon_use_id,
sku_id,
create_time
from order_detail_coupon"
}
import_refund_payment(){
import_data refund_payment "select
id,
out_trade_no,
order_id,
sku_id,
payment_type,
trade_no,
total_amount,
subject,
refund_status,
create_time,
callback_time
from refund_payment"
}
import_sku_attr_value(){
import_data sku_attr_value "select
id,
attr_id,
value_id,
sku_id,
attr_name,
value_name
from sku_attr_value"
}
import_sku_sale_attr_value(){
import_data sku_sale_attr_value "select
id,
sku_id,
spu_id,
sale_attr_value_id,
sale_attr_id,
sale_attr_name,
sale_attr_value_name
from sku_sale_attr_value"
}
case $1 in
"order_info")
import_order_info
;;
"base_category1")
import_base_category1
;;
"base_category2")
import_base_category2
;;
"base_category3")
import_base_category3
;;
"order_detail")
import_order_detail
;;
"sku_info")
import_sku_info
;;
"user_info")
import_user_info
;;
"payment_info")
import_payment_info
;;
"base_province")
import_base_province
;;
"base_region")
import_base_region
;;
"base_trademark")
import_base_trademark
;;
"activity_info")
import_activity_info
;;
"cart_info")
import_cart_info
;;
"comment_info")
import_comment_info
;;
"coupon_info")
import_coupon_info
;;
"coupon_use")
import_coupon_use
;;
"favor_info")
import_favor_info
;;
"order_refund_info")
import_order_refund_info
;;
"order_status_log")
import_order_status_log
;;
"spu_info")
import_spu_info
;;
"activity_rule")
import_activity_rule
;;
"base_dic")
import_base_dic
;;
"order_detail_activity")
import_order_detail_activity
;;
"order_detail_coupon")
import_order_detail_coupon
;;
"refund_payment")
import_refund_payment
;;
"sku_attr_value")
import_sku_attr_value
;;
"sku_sale_attr_value")
import_sku_sale_attr_value
;;
"all")
import_base_category1
import_base_category2
import_base_category3
import_order_info
import_order_detail
import_sku_info
import_user_info
import_payment_info
import_base_region
import_base_province
import_base_trademark
import_activity_info
import_cart_info
import_comment_info
import_coupon_use
import_coupon_info
import_favor_info
import_order_refund_info
import_order_status_log
import_spu_info
import_activity_rule
import_base_dic
import_order_detail_activity
import_order_detail_coupon
import_refund_payment
import_sku_attr_value
import_sku_sale_attr_value
;;
esac
说明1:
[ -n 变量值 ] 判断变量的值,是否为空
-- 变量的值,非空,返回true
-- 变量的值,为空,返回false
说明2: 查看date命令的使用,[atguigu@hadoop102 ~]$ date --help (2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x mysql_to_hdfs_init.sh
2)脚本使用
[atguigu@hadoop102 bin]$ mysql_to_hdfs_init.sh all 2020-06-14
2.5.3 业务数据每日同步脚本
1)脚本编写 (1)在/home/atguigu/bin目录下创建
[atguigu@hadoop102 bin]$ vim mysql_to_hdfs.sh
添加如下内容:
#! /bin/bash
APP=gmall
sqoop=/opt/module/sqoop/bin/sqoop
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d '-1 day' +%F`
fi
import_data(){
$sqoop import \
--connect jdbc:mysql://hadoop102:3306/$APP \
--username root \
--password 000000 \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}
import_order_info(){
import_data order_info "select
id,
total_amount,
order_status,
user_id,
payment_way,
delivery_address,
out_trade_no,
create_time,
operate_time,
expire_time,
tracking_no,
province_id,
activity_reduce_amount,
coupon_reduce_amount,
original_total_amount,
feight_fee,
feight_fee_reduce
from order_info
where (date_format(create_time,'%Y-%m-%d')='$do_date'
or date_format(operate_time,'%Y-%m-%d')='$do_date')"
}
import_coupon_use(){
import_data coupon_use "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time,
expire_time
from coupon_use
where (date_format(get_time,'%Y-%m-%d')='$do_date'
or date_format(using_time,'%Y-%m-%d')='$do_date'
or date_format(used_time,'%Y-%m-%d')='$do_date'
or date_format(expire_time,'%Y-%m-%d')='$do_date')"
}
import_order_status_log(){
import_data order_status_log "select
id,
order_id,
order_status,
operate_time
from order_status_log
where date_format(operate_time,'%Y-%m-%d')='$do_date'"
}
import_user_info(){
import_data "user_info" "select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time
from user_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'
or DATE_FORMAT(operate_time,'%Y-%m-%d')='$do_date')"
}
import_order_detail(){
import_data order_detail "select
id,
order_id,
sku_id,
sku_name,
order_price,
sku_num,
create_time,
source_type,
source_id,
split_total_amount,
split_activity_amount,
split_coupon_amount
from order_detail
where DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'"
}
import_payment_info(){
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
payment_type,
trade_no,
total_amount,
subject,
payment_status,
create_time,
callback_time
from payment_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'
or DATE_FORMAT(callback_time,'%Y-%m-%d')='$do_date')"
}
import_comment_info(){
import_data comment_info "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from comment_info
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_order_refund_info(){
import_data order_refund_info "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
refund_status,
create_time
from order_refund_info
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_sku_info(){
import_data sku_info "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
is_sale,
create_time
from sku_info where 1=1"
}
import_base_category1(){
import_data "base_category1" "select
id,
name
from base_category1 where 1=1"
}
import_base_category2(){
import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"
}
import_base_category3(){
import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"
}
import_base_province(){
import_data base_province "select
id,
name,
region_id,
area_code,
iso_code,
iso_3166_2
from base_province
where 1=1"
}
import_base_region(){
import_data base_region "select
id,
region_name
from base_region
where 1=1"
}
import_base_trademark(){
import_data base_trademark "select
id,
tm_name
from base_trademark
where 1=1"
}
import_spu_info(){
import_data spu_info "select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"
}
import_favor_info(){
import_data favor_info "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"
}
import_cart_info(){
import_data cart_info "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from cart_info
where 1=1"
}
import_coupon_info(){
import_data coupon_info "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from coupon_info
where 1=1"
}
import_activity_info(){
import_data activity_info "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info
where 1=1"
}
import_activity_rule(){
import_data activity_rule "select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"
}
import_base_dic(){
import_data base_dic "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"
}
import_order_detail_activity(){
import_data order_detail_activity "select
id,
order_id,
order_detail_id,
activity_id,
activity_rule_id,
sku_id,
create_time
from order_detail_activity
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_order_detail_coupon(){
import_data order_detail_coupon "select
id,
order_id,
order_detail_id,
coupon_id,
coupon_use_id,
sku_id,
create_time
from order_detail_coupon
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_refund_payment(){
import_data refund_payment "select
id,
out_trade_no,
order_id,
sku_id,
payment_type,
trade_no,
total_amount,
subject,
refund_status,
create_time,
callback_time
from refund_payment
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'
or DATE_FORMAT(callback_time,'%Y-%m-%d')='$do_date')"
}
import_sku_attr_value(){
import_data sku_attr_value "select
id,
attr_id,
value_id,
sku_id,
attr_name,
value_name
from sku_attr_value
where 1=1"
}
import_sku_sale_attr_value(){
import_data sku_sale_attr_value "select
id,
sku_id,
spu_id,
sale_attr_value_id,
sale_attr_id,
sale_attr_name,
sale_attr_value_name
from sku_sale_attr_value
where 1=1"
}
case $1 in
"order_info")
import_order_info
;;
"base_category1")
import_base_category1
;;
"base_category2")
import_base_category2
;;
"base_category3")
import_base_category3
;;
"order_detail")
import_order_detail
;;
"sku_info")
import_sku_info
;;
"user_info")
import_user_info
;;
"payment_info")
import_payment_info
;;
"base_province")
import_base_province
;;
"activity_info")
import_activity_info
;;
"cart_info")
import_cart_info
;;
"comment_info")
import_comment_info
;;
"coupon_info")
import_coupon_info
;;
"coupon_use")
import_coupon_use
;;
"favor_info")
import_favor_info
;;
"order_refund_info")
import_order_refund_info
;;
"order_status_log")
import_order_status_log
;;
"spu_info")
import_spu_info
;;
"activity_rule")
import_activity_rule
;;
"base_dic")
import_base_dic
;;
"order_detail_activity")
import_order_detail_activity
;;
"order_detail_coupon")
import_order_detail_coupon
;;
"refund_payment")
import_refund_payment
;;
"sku_attr_value")
import_sku_attr_value
;;
"sku_sale_attr_value")
import_sku_sale_attr_value
;;
"all")
import_base_category1
import_base_category2
import_base_category3
import_order_info
import_order_detail
import_sku_info
import_user_info
import_payment_info
import_base_trademark
import_activity_info
import_cart_info
import_comment_info
import_coupon_use
import_coupon_info
import_favor_info
import_order_refund_info
import_order_status_log
import_spu_info
import_activity_rule
import_base_dic
import_order_detail_activity
import_order_detail_coupon
import_refund_payment
import_sku_attr_value
import_sku_sale_attr_value
;;
esac (2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x mysql_to_hdfs.sh
2)脚本使用
[atguigu@hadoop102 bin]$ mysql_to_hdfs.sh all 2020-06-15
3. 数据仓库系统
1 数仓分层
2 数仓理论
2.1 范式理论
2.1.1 范式概念
1)定义 数据建模必须遵循一定的规则,在关系建模中,这种规则就是范式。
2)目的 采用范式,可以降低数据的冗余性。 为什么要降低数据冗余性? (1)十几年前,磁盘很贵,为了减少磁盘存储。 (2)以前没有分布式系统,都是单机,只能增加磁盘,磁盘个数也是有限的 (3)一次修改,需要修改多个表,很难保证数据一致性
3)缺点 范式的缺点是获取数据时,需要通过Join拼接出最后的数据。
4)分类 目前业界范式有:第一范式(1NF)、第二范式(2NF)、第三范式(3NF)、巴斯-科德范式(BCNF)、第四范式(4NF)、第五范式(5NF)。
2.1.2 函数依赖
2.1.3 三范式区分
第一范式: 属性不可切割 第二范式: 不能存在 非主键字段部分函数依赖主键字段的现象 第三范式: 不能存在 非主键字段传递函数依赖主键字段的现象
2.2 关系建模与维度建模
关系建模和维度建模是两种数据仓库的建模技术。关系建模由Bill Inmon所倡导,维度建模由Ralph Kimball所倡导。
2.2.1 关系建模
关系建模将复杂的数据抽象为两个概念——实体和关系,并使用规范化的方式表示出来。关系模型如图所示,从图中可以看出,较为松散、零碎,物理表数量多。 关系模型严格遵循第三范式(3NF),数据冗余程度低,数据的一致性容易得到保证。由于数据分布于众多的表中,查询会相对复杂,在大数据的场景下,查询效率相对较低。
2.2.2 维度建模
维度模型如图所示,从图中可以看出,模型相对清晰、简洁。
图 维度模型示意图 维度模型以数据分析作为出发点,不遵循三范式,故数据存在一定的冗余。维度模型面向业务,将业务用事实表和维度表呈现出来。表结构简单,故查询简单,查询效率较高。
2.3 维度表和事实表(重点)
2.3.1 维度表
维度表:一般是对事实的描述信息。每一张维表对应现实世界中的一个对象或者概念。 例如:用户、商品、日期、地区等。 维表的特征: ? 维表的范围很宽(具有多个属性、列比较多) ? 跟事实表相比,行数相对较小:通常< 10万条 ? 内容相对固定:编码表
时间维度表:
2.3.2 事实表
事实表中的每行数据代表一个业务事件(下单、支付、退款、评价等)。“事实”这个术语表示的是业务事件的度量值(可统计次数、个数、金额等),例如,2020年5月21日,宋宋老师在京东花了250块钱买了一瓶海狗人参丸。维度表:时间、用户、商品、商家。事实表:250块钱、一瓶 每一个事实表的行包括:具有可加性的数值型的度量值、与维表相连接的外键,通常具有两个和两个以上的外键。 事实表的特征: ? 非常的大 ? 内容相对的窄:列数较少(主要是外键id和度量值) ? 经常发生变化,每天会新增加很多。
1)事务型事实表 以每个事务或事件为单位,例如一个销售订单记录,一笔支付记录等,作为事实表里的一行数据。一旦事务被提交,事实表数据被插入,数据就不再进行更改,其更新方式为增量更新。
2)周期型快照事实表 周期型快照事实表中不会保留所有数据,只保留固定时间间隔的数据,例如每天或者每月的销售额,或每月的账户余额等。 例如购物车,有加减商品,随时都有可能变化,但是我们更关心每天结束时这里面有多少商品,方便我们后期统计分析。
3)累积型快照事实表 累计快照事实表用于跟踪业务事实的变化。例如,数据仓库中可能需要累积或者存储订单从下订单开始,到订单商品被打包、运输、和签收的各个业务阶段的时间点数据来跟踪订单声明周期的进展情况。当这个业务过程进行时,事实表的记录也要不断更新。
2.4 维度模型分类
在维度建模的基础上又分为三种模型:星型模型、雪花模型、星座模型。
2.5 数据仓库建模(绝对重点)
2.5.1 ODS层
1)HDFS用户行为数据 一张表,一个字段string,按天分区
2)HDFS业务数据 同步哪些表,建那些表。 类型 字段 分区:按天分区
3)针对HDFS上的用户行为数据和业务数据,我们如何规划处理? (1)保持数据原貌不做任何修改,起到备份数据的作用。 (2)数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右) (3)创建分区表,防止后续的全表扫描
2.5.2 DIM层和DWD层
DIM层DWD层需构建维度模型,一般采用星型模型,呈现的状态一般为星座模型。 维度建模一般按照以下四个步骤: 选择业务过程→声明粒度→确认维度→确认事实
(1)选择业务过程 在业务系统中,挑选我们感兴趣的业务线,比如下单业务,支付业务,退款业务,物流业务,一条业务线对应一张事实表。
确定有哪些事实表
(2)声明粒度 数据粒度指数据仓库的数据中保存数据的细化程度或综合程度的级别。 声明粒度意味着精确定义事实表中的一行数据表示什么,应该尽可能选择最小粒度,以此来应各种各样的需求。 典型的粒度声明如下: 订单事实表中一行数据表示的是一个订单中的一个商品项。 支付事实表中一行数据表示的是一个支付记录。
(3)确定维度 维度的主要作用是描述业务是事实,主要表示的是“谁,何处,何时”等信息。 确定维度的原则是:后续需求中是否要分析相关维度的指标。例如,需要统计,什么时间下的订单多,哪个地区下的订单多,哪个用户下的订单多。需要确定的维度就包括:时间维度、地区维度、用户维度。
确定每张事实表中的维度外键有哪些?
(4)确定事实 此处的“事实”一词,指的是业务中的度量值(次数、个数、件数、金额,可以进行累加),例如订单金额、下单次数等。
确定每张事实表中的度量值
在DWD层,以业务过程为建模驱动,基于每个具体业务过程的特点,构建最细粒度的明细层事实表。事实表可做适当的宽表化处理。 事实表和维度表的关联比较灵活,但是为了应对更复杂的业务需求,可以将能关联上的表尽量关联上。
至此,数据仓库的维度建模已经完毕,DWD层是以业务过程为驱动。 DWS层、DWT层和ADS层都是以需求为驱动,和维度建模已经没有关系了。 DWS和DWT都是建宽表,按照主题去建表。主题相当于观察问题的角度。对应着维度表。
2.5.3 DWS层与DWT层
DWS层和DWT层统称宽表层,这两层的设计思想大致相同,通过以下案例进行阐述。 1)问题引出:两个需求,统计每个省份订单的个数、统计每个省份订单的总金额
2)处理办法:都是将省份表和订单表进行join,group by省份,然后计算。同样数据被计算了两次,实际上类似的场景还会更多。 那怎么设计能避免重复计算呢? 针对上述场景,可以设计一张地区宽表,其主键为地区ID,字段包含为:下单次数、下单金额、支付次数、支付金额等。上述所有指标都统一进行计算,并将结果保存在该宽表中,这样就能有效避免数据的重复计算。
3)总结: (1)需要建哪些宽表:以维度为基准。 (2)宽表里面的字段:是站在不同维度的角度去看事实表,重点关注事实表聚合后的度量值。 (3)DWS和DWT层的区别:DWS层存放的所有主题对象当天的汇总行为,例如每个地区当天的下单次数,下单金额等,DWT层存放的是所有主题对象的累积行为,例如每个地区最近7天(15天、30天、60天)的下单次数、下单金额等。
2.5.4 ADS层
对电商系统各大主题指标分别进行分析。
第4章 数仓搭建-ODS层
1)保持数据原貌不做任何修改,起到备份数据的作用。 2)数据采用LZO压缩,减少磁盘存储空间。100G数据可以压缩到10G以内。 3)创建分区表,防止后续的全表扫描,在企业开发中大量使用分区表。 4)创建外部表。在企业开发中,除了自己用的临时表,创建内部表外,绝大多数场景都是创建外部表。
4.1 ODS层(用户行为数据)
4.1.1 创建日志表ods_log
1)创建支持lzo压缩的分区表 (1)建表语句
drop table if exists ods_log;
CREATE EXTERNAL TABLE ods_log (`line` string)
PARTITIONED BY (`dt` string)
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_log'
;
2)加载数据
hive (gmall)>
load data inpath '/origin_data/gmall/log/topic_log/2020-06-14' into table ods_log partition(dt='2020-06-14');
注意:时间格式都配置成YYYY-MM-DD格式,这是Hive默认支持的时间格式
3)为lzo压缩文件创建索引
[atguigu@hadoop102 bin]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_log/dt=2020-06-14
4.1.2 Shell中单引号和双引号区别
1)在/home/atguigu/bin创建一个test.sh文件
[atguigu@hadoop102 bin]$ vim test.sh
在文件中添加如下内容
#!/bin/bash
do_date=$1
echo '$do_date'
echo "$do_date"
echo "'$do_date'"
echo '"$do_date"'
echo `date`
2)查看执行结果
[atguigu@hadoop102 bin]$ test.sh 2020-06-14
$do_date
2020-06-14
'2020-06-14'
"$do_date"
2020年 06月 18日 星期四 21:02:08 CST
3)总结: (1)单引号不取变量值 (2)双引号取变量值 (3)反引号`,执行引号中命令 (4)双引号内部嵌套单引号,取出变量值 (5)单引号内部嵌套双引号,不取出变量值
4.1.3 ODS层日志表加载数据脚本
1)编写脚本 (1)在hadoop102的/home/atguigu/bin目录下创建脚本
[atguigu@hadoop102 bin]$ vim hdfs_to_ods_log.sh
在脚本中编写如下内容
#!/bin/bash
APP=gmall
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
echo ================== 日志日期为 $do_date ==================
sql="
load data inpath '/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log partition(dt='$do_date');
"
hive -e "$sql"
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/$APP/ods/ods_log/dt=$do_date
(2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 hdfs_to_ods_log.sh
2)脚本使用 (1)执行脚本
[atguigu@hadoop102 module]$ hdfs_to_ods_log.sh 2020-06-14
(2)查看导入数据
4.2.28 ODS层业务表首日数据装载脚本
#!/bin/bash
APP=gmall
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
ods_order_info="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');"
ods_order_detail="
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');"
ods_sku_info="
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');"
ods_user_info="
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');"
ods_payment_info="
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');"
ods_base_category1="
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');"
ods_base_category2="
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');"
ods_base_category3="
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date'); "
ods_base_trademark="
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date'); "
ods_activity_info="
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date'); "
ods_cart_info="
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date'); "
ods_comment_info="
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date'); "
ods_coupon_info="
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date'); "
ods_coupon_use="
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date'); "
ods_favor_info="
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date'); "
ods_order_refund_info="
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date'); "
ods_order_status_log="
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date'); "
ods_spu_info="
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date'); "
ods_activity_rule="
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');"
ods_base_dic="
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date'); "
ods_order_detail_activity="
load data inpath '/origin_data/$APP/db/order_detail_activity/$do_date' OVERWRITE into table ${APP}.ods_order_detail_activity partition(dt='$do_date'); "
ods_order_detail_coupon="
load data inpath '/origin_data/$APP/db/order_detail_coupon/$do_date' OVERWRITE into table ${APP}.ods_order_detail_coupon partition(dt='$do_date'); "
ods_refund_payment="
load data inpath '/origin_data/$APP/db/refund_payment/$do_date' OVERWRITE into table ${APP}.ods_refund_payment partition(dt='$do_date'); "
ods_sku_attr_value="
load data inpath '/origin_data/$APP/db/sku_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_attr_value partition(dt='$do_date'); "
ods_sku_sale_attr_value="
load data inpath '/origin_data/$APP/db/sku_sale_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_sale_attr_value partition(dt='$do_date'); "
ods_base_province="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;"
ods_base_region="
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;"
case $1 in
"ods_order_info"){
hive -e "$ods_order_info"
};;
"ods_order_detail"){
hive -e "$ods_order_detail"
};;
"ods_sku_info"){
hive -e "$ods_sku_info"
};;
"ods_user_info"){
hive -e "$ods_user_info"
};;
"ods_payment_info"){
hive -e "$ods_payment_info"
};;
"ods_base_category1"){
hive -e "$ods_base_category1"
};;
"ods_base_category2"){
hive -e "$ods_base_category2"
};;
"ods_base_category3"){
hive -e "$ods_base_category3"
};;
"ods_base_trademark"){
hive -e "$ods_base_trademark"
};;
"ods_activity_info"){
hive -e "$ods_activity_info"
};;
"ods_cart_info"){
hive -e "$ods_cart_info"
};;
"ods_comment_info"){
hive -e "$ods_comment_info"
};;
"ods_coupon_info"){
hive -e "$ods_coupon_info"
};;
"ods_coupon_use"){
hive -e "$ods_coupon_use"
};;
"ods_favor_info"){
hive -e "$ods_favor_info"
};;
"ods_order_refund_info"){
hive -e "$ods_order_refund_info"
};;
"ods_order_status_log"){
hive -e "$ods_order_status_log"
};;
"ods_spu_info"){
hive -e "$ods_spu_info"
};;
"ods_activity_rule"){
hive -e "$ods_activity_rule"
};;
"ods_base_dic"){
hive -e "$ods_base_dic"
};;
"ods_order_detail_activity"){
hive -e "$ods_order_detail_activity"
};;
"ods_order_detail_coupon"){
hive -e "$ods_order_detail_coupon"
};;
"ods_refund_payment"){
hive -e "$ods_refund_payment"
};;
"ods_sku_attr_value"){
hive -e "$ods_sku_attr_value"
};;
"ods_sku_sale_attr_value"){
hive -e "$ods_sku_sale_attr_value"
};;
"ods_base_province"){
hive -e "$ods_base_province"
};;
"ods_base_region"){
hive -e "$ods_base_region"
};;
"all"){
hive -e "$ods_order_info$ods_order_detail$ods_sku_info$ods_user_info$ods_payment_info$ods_base_category1$ods_base_category2$ods_base_category3$ods_base_trademark$ods_activity_info$ods_cart_info$ods_comment_info$ods_coupon_info$ods_coupon_use$ods_favor_info$ods_order_refund_info$ods_order_status_log$ods_spu_info$ods_activity_rule$ods_base_dic$ods_order_detail_activity$ods_order_detail_coupon$ods_refund_payment$ods_sku_attr_value$ods_sku_sale_attr_value$ods_base_province$ods_base_region"
};;
esac
#!/bin/bash
APP=gmall
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
ods_order_info="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');"
ods_order_detail="
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');"
ods_sku_info="
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');"
ods_user_info="
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');"
ods_payment_info="
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');"
ods_base_category1="
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');"
ods_base_category2="
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');"
ods_base_category3="
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date'); "
ods_base_trademark="
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date'); "
ods_activity_info="
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date'); "
ods_cart_info="
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date'); "
ods_comment_info="
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date'); "
ods_coupon_info="
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date'); "
ods_coupon_use="
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date'); "
ods_favor_info="
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date'); "
ods_order_refund_info="
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date'); "
ods_order_status_log="
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date'); "
ods_spu_info="
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date'); "
ods_activity_rule="
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');"
ods_base_dic="
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date'); "
ods_order_detail_activity="
load data inpath '/origin_data/$APP/db/order_detail_activity/$do_date' OVERWRITE into table ${APP}.ods_order_detail_activity partition(dt='$do_date'); "
ods_order_detail_coupon="
load data inpath '/origin_data/$APP/db/order_detail_coupon/$do_date' OVERWRITE into table ${APP}.ods_order_detail_coupon partition(dt='$do_date'); "
ods_refund_payment="
load data inpath '/origin_data/$APP/db/refund_payment/$do_date' OVERWRITE into table ${APP}.ods_refund_payment partition(dt='$do_date'); "
ods_sku_attr_value="
load data inpath '/origin_data/$APP/db/sku_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_attr_value partition(dt='$do_date'); "
ods_sku_sale_attr_value="
load data inpath '/origin_data/$APP/db/sku_sale_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_sale_attr_value partition(dt='$do_date'); "
ods_base_province="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;"
ods_base_region="
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;"
case $1 in
"ods_order_info"){
hive -e "$ods_order_info"
};;
"ods_order_detail"){
hive -e "$ods_order_detail"
};;
"ods_sku_info"){
hive -e "$ods_sku_info"
};;
"ods_user_info"){
hive -e "$ods_user_info"
};;
"ods_payment_info"){
hive -e "$ods_payment_info"
};;
"ods_base_category1"){
hive -e "$ods_base_category1"
};;
"ods_base_category2"){
hive -e "$ods_base_category2"
};;
"ods_base_category3"){
hive -e "$ods_base_category3"
};;
"ods_base_trademark"){
hive -e "$ods_base_trademark"
};;
"ods_activity_info"){
hive -e "$ods_activity_info"
};;
"ods_cart_info"){
hive -e "$ods_cart_info"
};;
"ods_comment_info"){
hive -e "$ods_comment_info"
};;
"ods_coupon_info"){
hive -e "$ods_coupon_info"
};;
"ods_coupon_use"){
hive -e "$ods_coupon_use"
};;
"ods_favor_info"){
hive -e "$ods_favor_info"
};;
"ods_order_refund_info"){
hive -e "$ods_order_refund_info"
};;
"ods_order_status_log"){
hive -e "$ods_order_status_log"
};;
"ods_spu_info"){
hive -e "$ods_spu_info"
};;
"ods_activity_rule"){
hive -e "$ods_activity_rule"
};;
"ods_base_dic"){
hive -e "$ods_base_dic"
};;
"ods_order_detail_activity"){
hive -e "$ods_order_detail_activity"
};;
"ods_order_detail_coupon"){
hive -e "$ods_order_detail_coupon"
};;
"ods_refund_payment"){
hive -e "$ods_refund_payment"
};;
"ods_sku_attr_value"){
hive -e "$ods_sku_attr_value"
};;
"ods_sku_sale_attr_value"){
hive -e "$ods_sku_sale_attr_value"
};;
"all"){
hive -e "$ods_order_info$ods_order_detail$ods_sku_info$ods_user_info$ods_payment_info$ods_base_category1$ods_base_category2$ods_base_category3$ods_base_trademark$ods_activity_info$ods_cart_info$ods_comment_info$ods_coupon_info$ods_coupon_use$ods_favor_info$ods_order_refund_info$ods_order_status_log$ods_spu_info$ods_activity_rule$ods_base_dic$ods_order_detail_activity$ods_order_detail_coupon$ods_refund_payment$ods_sku_attr_value$ods_sku_sale_attr_value"
};;
esac
第5章 数仓搭建-DIM层
5.1 商品维度表(全量)
– 1.每行数据是什么 一行数据是sku – 2.字段有哪些 sku_info,spu_info,… – 3.分区规划 按天分区 存储格式: parquet列式存储+lao压缩。
1)Hive读取索引文件问题 (1)两种方式,分别查询数据有多少行
hive (gmall)> select * from ods_log;
Time taken: 0.706 seconds, Fetched: 2955 row(s)
hive (gmall)> select count(*) from ods_log;
2959
(2)两次查询结果不一致。 原因是select * from ods_log不执行MR操作,直接采用的是ods_log建表语句中指定的DeprecatedLzoTextInputFormat,能够识别lzo.index为索引文件。 select count(*) from ods_log执行MR操作,会先经过hive.input.format,其默认值为CombineHiveInputFormat,其会先将索引文件当成小文件合并,将其当做普通文件处理。更严重的是,这会导致LZO文件无法切片。
hive (gmall)>
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
解决办法:修改CombineHiveInputFormat为HiveInputFormat
hive (gmall)>
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
hive在装载dim后,会出现为null值的一行,为什么?
因为hive在insert数据时,会对map的小文件自动合并导致的。 insert会解析成计算任务,MR或者Spark计算任务,会去读取ODS层与商品相关的业务数据。 而这些业务数据是以lzo压缩+其索引存储的 所以hive在从这张表读取数据的时候,会误把这两个文件当成小文件进行合并。 也就是把索引文件误当成数据文件进行合并了。 从而会导致,索引文件没了,就不能切片了。
5.2 优惠券维度表(全量)
DROP TABLE IF EXISTS dim_coupon_info;
CREATE EXTERNAL TABLE dim_coupon_info(
`id` STRING COMMENT '购物券编号',
`coupon_name` STRING COMMENT '购物券名称',
`coupon_type` STRING COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` DECIMAL(16,2) COMMENT '满额数',
`condition_num` BIGINT COMMENT '满件数',
`activity_id` STRING COMMENT '活动编号',
`benefit_amount` DECIMAL(16,2) COMMENT '减金额',
`benefit_discount` DECIMAL(16,2) COMMENT '折扣',
`create_time` STRING COMMENT '创建时间',
`range_type` STRING COMMENT '范围类型 1、商品 2、品类 3、品牌',
`limit_num` BIGINT COMMENT '最多领取次数',
`taken_count` BIGINT COMMENT '已领取次数',
`start_time` STRING COMMENT '可以领取的开始日期',
`end_time` STRING COMMENT '可以领取的结束日期',
`operate_time` STRING COMMENT '修改时间',
`expire_time` STRING COMMENT '过期时间'
) COMMENT '优惠券维度表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dim/dim_coupon_info/'
TBLPROPERTIES ("parquet.compression"="lzo");
5.3 活动维度表(全量)
5.4 地区维度表(特殊)
地区维度表数据相对稳定,变化概率较低,故无需每日装载,也没有分区
5.5 时间维度表(特殊)
每行数据是一个日期 字段: 不分区
1.建表语句
DROP TABLE IF EXISTS dim_date_info;
CREATE EXTERNAL TABLE dim_date_info(
`date_id` STRING COMMENT '日',
`week_id` STRING COMMENT '周ID',
`week_day` STRING COMMENT '周几',
`day` STRING COMMENT '每月的第几天',
`month` STRING COMMENT '第几月',
`quarter` STRING COMMENT '第几季度',
`year` STRING COMMENT '年',
`is_workday` STRING COMMENT '是否是工作日',
`holiday_id` STRING COMMENT '节假日'
) COMMENT '时间维度表'
STORED AS PARQUET
LOCATION '/warehouse/gmall/dim/dim_date_info/'
TBLPROPERTIES ("parquet.compression"="lzo");
2.数据装载 通常情况下,时间维度表的数据并不是来自于业务系统,而是手动写入,并且由于时间维度表数据的可预见性,无须每日导入,一般可一次性导入一年的数据。 1)创建临时表
DROP TABLE IF EXISTS tmp_dim_date_info;
CREATE EXTERNAL TABLE tmp_dim_date_info (
`date_id` STRING COMMENT '日',
`week_id` STRING COMMENT '周ID',
`week_day` STRING COMMENT '周几',
`day` STRING COMMENT '每月的第几天',
`month` STRING COMMENT '第几月',
`quarter` STRING COMMENT '第几季度',
`year` STRING COMMENT '年',
`is_workday` STRING COMMENT '是否是工作日',
`holiday_id` STRING COMMENT '节假日'
) COMMENT '时间维度表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/warehouse/gmall/tmp/tmp_dim_date_info/';
2)将数据文件上传到HFDS上临时表指定路径/warehouse/gmall/tmp/tmp_dim_date_info/
2020-01-01 1 3 1 1 1 2020 0 元旦
2020-01-02 1 4 2 1 1 2020 1 \N
2020-01-03 1 5 3 1 1 2020 1 \N
2020-01-04 1 6 4 1 1 2020 0 \N
2020-01-05 1 7 5 1 1 2020 0 \N
2020-01-06 2 1 6 1 1 2020 1 \N
2020-01-07 2 2 7 1 1 2020 1 \N
2020-01-08 2 3 8 1 1 2020 1 \N
3)执行以下语句将其导入时间维度表
insert overwrite table dim_date_info select * from tmp_dim_date_info;
4)检查数据是否导入成功
select * from dim_date_info;
5.6 用户维度表(拉链表)
每行数据是一个用户的一个状态 字段: 用户基本信息,开始日期,结束日期 分区: 按天分区,9999-99-99为最新数据
5.6.1 拉链表概述
1)什么是拉链表
2)为什么要做拉链表
能够更加高效的存储历史状态
3)如何使用拉链表
4)拉链表形成过程
5.6.2 制作拉链表
1.建表语句
DROP TABLE IF EXISTS dim_user_info;
CREATE EXTERNAL TABLE dim_user_info(
`id` STRING COMMENT '用户id',
`login_name` STRING COMMENT '用户名称',
`nick_name` STRING COMMENT '用户昵称',
`name` STRING COMMENT '用户姓名',
`phone_num` STRING COMMENT '手机号码',
`email` STRING COMMENT '邮箱',
`user_level` STRING COMMENT '用户等级',
`birthday` STRING COMMENT '生日',
`gender` STRING COMMENT '性别',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '操作时间',
`start_date` STRING COMMENT '开始日期',
`end_date` STRING COMMENT '结束日期'
) COMMENT '用户表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dim/dim_user_info/'
TBLPROPERTIES ("parquet.compression"="lzo");
数据装载
每日装载:
with
tmp as
(
select
old.id old_id,
old.login_name old_login_name,
old.nick_name old_nick_name,
old.name old_name,
old.phone_num old_phone_num,
old.email old_email,
old.user_level old_user_level,
old.birthday old_birthday,
old.gender old_gender,
old.create_time old_create_time,
old.operate_time old_operate_time,
old.start_date old_start_date,
old.end_date old_end_date,
new.id new_id,
new.login_name new_login_name,
new.nick_name new_nick_name,
new.name new_name,
new.phone_num new_phone_num,
new.email new_email,
new.user_level new_user_level,
new.birthday new_birthday,
new.gender new_gender,
new.create_time new_create_time,
new.operate_time new_operate_time,
new.start_date new_start_date,
new.end_date new_end_date
from
(
select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time,
start_date,
end_date
from dim_user_info
where dt='9999-99-99'
)old
full outer join
(
select
id,
login_name,
nick_name,
md5(name) name,
md5(phone_num) phone_num,
md5(email) email,
user_level,
birthday,
gender,
create_time,
operate_time,
'2020-06-15' start_date,
'9999-99-99' end_date
from ods_user_info
where dt='2020-06-15'
)new
on old.id=new.id
)
insert overwrite table dim_user_info partition(dt)
select
nvl(new_id,old_id),
nvl(new_login_name,old_login_name),
nvl(new_nick_name,old_nick_name),
nvl(new_name,old_name),
nvl(new_phone_num,old_phone_num),
nvl(new_email,old_email),
nvl(new_user_level,old_user_level),
nvl(new_birthday,old_birthday),
nvl(new_gender,old_gender),
nvl(new_create_time,old_create_time),
nvl(new_operate_time,old_operate_time),
nvl(new_start_date,old_start_date),
nvl(new_end_date,old_end_date),
nvl(new_end_date,old_end_date) dt
from tmp
union all
select
old_id,
old_login_name,
old_nick_name,
old_name,
old_phone_num,
old_email,
old_user_level,
old_birthday,
old_gender,
old_create_time,
old_operate_time,
old_start_date,
cast(date_add('2020-06-15',-1) as string),
cast(date_add('2020-06-15',-1) as string) dt
from tmp
where new_id is not null and old_id is not null;
5.7 DIM层首日数据装载脚本
1)编写脚本 (1)在/home/atguigu/bin目录下创建脚本ods_to_dim_db_init.sh
[atguigu@hadoop102 bin]$ vim ods_to_dim_db_init.sh
在脚本中填写如下内容
#!/bin/bash
APP=gmall
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
dim_user_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_user_info partition(dt='9999-99-99')
select
id,
login_name,
nick_name,
md5(name),
md5(phone_num),
md5(email),
user_level,
birthday,
gender,
create_time,
operate_time,
'$do_date',
'9999-99-99'
from ${APP}.ods_user_info
where dt='$do_date';
"
dim_sku_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with
sku as
(
select
id,
price,
sku_name,
sku_desc,
weight,
is_sale,
spu_id,
category3_id,
tm_id,
create_time
from ${APP}.ods_sku_info
where dt='$do_date'
),
spu as
(
select
id,
spu_name
from ${APP}.ods_spu_info
where dt='$do_date'
),
c3 as
(
select
id,
name,
category2_id
from ${APP}.ods_base_category3
where dt='$do_date'
),
c2 as
(
select
id,
name,
category1_id
from ${APP}.ods_base_category2
where dt='$do_date'
),
c1 as
(
select
id,
name
from ${APP}.ods_base_category1
where dt='$do_date'
),
tm as
(
select
id,
tm_name
from ${APP}.ods_base_trademark
where dt='$do_date'
),
attr as
(
select
sku_id,
collect_set(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,'value_name',value_name)) attrs
from ${APP}.ods_sku_attr_value
where dt='$do_date'
group by sku_id
),
sale_attr as
(
select
sku_id,
collect_set(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sale_attrs
from ${APP}.ods_sku_sale_attr_value
where dt='$do_date'
group by sku_id
)
insert overwrite table ${APP}.dim_sku_info partition(dt='$do_date')
select
sku.id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.is_sale,
sku.spu_id,
spu.spu_name,
sku.category3_id,
c3.name,
c3.category2_id,
c2.name,
c2.category1_id,
c1.name,
sku.tm_id,
tm.tm_name,
attr.attrs,
sale_attr.sale_attrs,
sku.create_time
from sku
left join spu on sku.spu_id=spu.id
left join c3 on sku.category3_id=c3.id
left join c2 on c3.category2_id=c2.id
left join c1 on c2.category1_id=c1.id
left join tm on sku.tm_id=tm.id
left join attr on sku.id=attr.sku_id
left join sale_attr on sku.id=sale_attr.sku_id;
"
dim_base_province="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.iso_3166_2,
bp.region_id,
br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br on bp.region_id = br.id;
"
dim_coupon_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_coupon_info partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';
"
dim_activity_rule_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_activity_rule_info partition(dt='$do_date')
select
ar.id,
ar.activity_id,
ai.activity_name,
ar.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
ar.condition_amount,
ar.condition_num,
ar.benefit_amount,
ar.benefit_discount,
ar.benefit_level
from
(
select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from ${APP}.ods_activity_rule
where dt='$do_date'
)ar
left join
(
select
id,
activity_name,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt='$do_date'
)ai
on ar.activity_id=ai.id;
"
case $1 in
"dim_user_info"){
hive -e "$dim_user_info"
};;
"dim_sku_info"){
hive -e "$dim_sku_info"
};;
"dim_base_province"){
hive -e "$dim_base_province"
};;
"dim_coupon_info"){
hive -e "$dim_coupon_info"
};;
"dim_activity_rule_info"){
hive -e "$dim_activity_rule_info"
};;
"all"){
hive -e "$dim_user_info$dim_sku_info$dim_coupon_info$dim_activity_rule_info$dim_base_province"
};;
esac
(2)增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ods_to_dim_db_init.sh
2)脚本使用 (1)执行脚本
[atguigu@hadoop102 bin]$ ods_to_dim_db_init.sh all 2020-06-14
注意:该脚本不包含时间维度表的装载,时间维度表需手动装载数据,参考5.5节。 (2)查看数据是否导入成功
5.8 DIM层每日数据装载脚本
1)编写脚本 (1)在/home/atguigu/bin目录下创建脚本ods_to_dim_db.sh [atguigu@hadoop102 bin]$ vim ods_to_dim_db.sh 变化:将hive的动态分区的模式设为非严格模式。(动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
在脚本中填写如下内容
#!/bin/bash
APP=gmall
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
dim_user_info="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with
tmp as
(
select
old.id old_id,
old.login_name old_login_name,
old.nick_name old_nick_name,
old.name old_name,
old.phone_num old_phone_num,
old.email old_email,
old.user_level old_user_level,
old.birthday old_birthday,
old.gender old_gender,
old.create_time old_create_time,
old.operate_time old_operate_time,
old.start_date old_start_date,
old.end_date old_end_date,
new.id new_id,
new.login_name new_login_name,
new.nick_name new_nick_name,
new.name new_name,
new.phone_num new_phone_num,
new.email new_email,
new.user_level new_user_level,
new.birthday new_birthday,
new.gender new_gender,
new.create_time new_create_time,
new.operate_time new_operate_time,
new.start_date new_start_date,
new.end_date new_end_date
from
(
select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time,
start_date,
end_date
from ${APP}.dim_user_info
where dt='9999-99-99'
and start_date<'$do_date'
)old
full outer join
(
select
id,
login_name,
nick_name,
md5(name) name,
md5(phone_num) phone_num,
md5(email) email,
user_level,
birthday,
gender,
create_time,
operate_time,
'$do_date' start_date,
'9999-99-99' end_date
from ${APP}.ods_user_info
where dt='$do_date'
)new
on old.id=new.id
)
insert overwrite table ${APP}.dim_user_info partition(dt)
select
nvl(new_id,old_id),
nvl(new_login_name,old_login_name),
nvl(new_nick_name,old_nick_name),
nvl(new_name,old_name),
nvl(new_phone_num,old_phone_num),
nvl(new_email,old_email),
nvl(new_user_level,old_user_level),
nvl(new_birthday,old_birthday),
nvl(new_gender,old_gender),
nvl(new_create_time,old_create_time),
nvl(new_operate_time,old_operate_time),
nvl(new_start_date,old_start_date),
nvl(new_end_date,old_end_date),
nvl(new_end_date,old_end_date) dt
from tmp
union all
select
old_id,
old_login_name,
old_nick_name,
old_name,
old_phone_num,
old_email,
old_user_level,
old_birthday,
old_gender,
old_create_time,
old_operate_time,
old_start_date,
cast(date_add('$do_date',-1) as string),
cast(date_add('$do_date',-1) as string) dt
from tmp
where new_id is not null and old_id is not null;
"
dim_sku_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with
sku as
(
select
id,
price,
sku_name,
sku_desc,
weight,
is_sale,
spu_id,
category3_id,
tm_id,
create_time
from ${APP}.ods_sku_info
where dt='$do_date'
),
spu as
(
select
id,
spu_name
from ${APP}.ods_spu_info
where dt='$do_date'
),
c3 as
(
select
id,
name,
category2_id
from ${APP}.ods_base_category3
where dt='$do_date'
),
c2 as
(
select
id,
name,
category1_id
from ${APP}.ods_base_category2
where dt='$do_date'
),
c1 as
(
select
id,
name
from ${APP}.ods_base_category1
where dt='$do_date'
),
tm as
(
select
id,
tm_name
from ${APP}.ods_base_trademark
where dt='$do_date'
),
attr as
(
select
sku_id,
collect_set(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,'value_name',value_name)) attrs
from ${APP}.ods_sku_attr_value
where dt='$do_date'
group by sku_id
),
sale_attr as
(
select
sku_id,
collect_set(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sale_attrs
from ${APP}.ods_sku_sale_attr_value
where dt='$do_date'
group by sku_id
)
insert overwrite table ${APP}.dim_sku_info partition(dt='$do_date')
select
sku.id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.is_sale,
sku.spu_id,
spu.spu_name,
sku.category3_id,
c3.name,
c3.category2_id,
c2.name,
c2.category1_id,
c1.name,
sku.tm_id,
tm.tm_name,
attr.attrs,
sale_attr.sale_attrs,
sku.create_time
from sku
left join spu on sku.spu_id=spu.id
left join c3 on sku.category3_id=c3.id
left join c2 on c3.category2_id=c2.id
left join c1 on c2.category1_id=c1.id
left join tm on sku.tm_id=tm.id
left join attr on sku.id=attr.sku_id
left join sale_attr on sku.id=sale_attr.sku_id;
"
dim_base_province="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.iso_3166_2,
bp.region_id,
bp.name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br on bp.region_id = br.id;
"
dim_coupon_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_coupon_info partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';
"
dim_activity_rule_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_activity_rule_info partition(dt='$do_date')
select
ar.id,
ar.activity_id,
ai.activity_name,
ar.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
ar.condition_amount,
ar.condition_num,
ar.benefit_amount,
ar.benefit_discount,
ar.benefit_level
from
(
select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from ${APP}.ods_activity_rule
where dt='$do_date'
)ar
left join
(
select
id,
activity_name,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt='$do_date'
)ai
on ar.activity_id=ai.id;
"
case $1 in
"dim_user_info"){
hive -e "$dim_user_info"
};;
"dim_sku_info"){
hive -e "$dim_sku_info"
};;
"dim_base_province"){
hive -e "$dim_base_province"
};;
"dim_coupon_info"){
hive -e "$dim_coupon_info"
};;
"dim_activity_rule_info"){
hive -e "$dim_activity_rule_info"
};;
"all"){
hive -e "$dim_user_info$dim_sku_info$dim_coupon_info$dim_activity_rule_info"
};;
esac
(2)增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ods_to_dim_db.sh
2)脚本使用 (1)执行脚本
[atguigu@hadoop102 bin]$ ods_to_dim_db.sh all 2020-06-14
(2)查看数据是否导入成功
|