一 调度数据准备
将首日数据导入(6-14),其他日期的数据不要导入,如果导入,关闭除HDFS的所有集群,删除hdfs上的warehouse,origin_data,将表重新建立一遍。
gmall中有75张表。
lg.sh 2020-06-14
mysql_to_hdfs_init.sh all 2020-06-14
二 Azkaban部署
1 上传tar包
将azkaban-db-3.84.4.tar.gz,azkaban-exec-server-3.84.4.tar.gz,azkaban-web-server-3.84.4.tar.gz上传到hadoop101的/opt/software路径
其中101为主机,102,103两台从机。
- azkaban-db:azkaban数据库初始化的脚本
- azkaban-exec-server:从机
- azkaban-web-server:主机
新建/opt/module/azkaban目录,并将所有tar包解压到这个目录下
mkdir /opt/module/azkaban
解压azkaban-db-3.84.4.tar.gz、 azkaban-exec-server-3.84.4.tar.gz和azkaban-web-server-3.84.4.tar.gz到/opt/module/azkaban目录下
在101执行
tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
mv azkaban-web-server-3.84.4/ web
tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
在102,103执行
tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
2 配置MySQL
启动MySQL
mysql -uroot -p000000
登陆MySQL,创建Azkaban数据库
create database azkaban;
创建azkaban用户并赋予权限
设置密码有效长度4位及以上
mysql> set global validate_password_length=4;
设置密码策略最低级别
mysql> set global validate_password_policy=0;
创建Azkaban用户,任何主机都可以访问Azkaban,密码是000000
mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY '000000';
赋予Azkaban用户增删改查权限
mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
创建Azkaban表,完成后退出MySQL
mysql> use azkaban;
mysql> source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql
mysql> quit;
更改MySQL包大小;防止Azkaban连接MySQL阻塞
sudo vim /etc/my.cnf
在[mysqld]下面加一行max_allowed_packet=1024M
[mysqld]
max_allowed_packet=1024M
重启MySQL
sudo systemctl restart mysqld
sudo systemctl status mysqld
3 配置Executor Server
从机,在102,103执行
编辑azkaban.properties
vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties
修改如下属性
# 时区
default.timezone.id=Asia/Shanghai
# 主机的地址
azkaban.webserver.url=http://hadoop101:8081
# 手动添加
executor.port=12321
#...
database.type=mysql
mysql.port=3306
mysql.host=hadoop101
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
同步azkaban-exec到103
rsync -av /opt/module/azkaban/azkaban-exec hadoop103:/opt/module/azkaban/
必须进入到/opt/module/azkaban/azkaban-exec路径,分别在两台机器上,启动executor server
bin/start-exec.sh
注意:如果在/opt/module/azkaban/azkaban-exec目录下出现executor.port文件,说明启动成功
4)下面激活executor,需要
curl -G "hadoop102:12321/executor?action=activate" && echo
curl -G "hadoop103:12321/executor?action=activate" && echo
如果两台机器都出现如下提示,则表示激活成功
{"status":"success"}
或者登录数据库
mysql -uroot -p000000
use azkaban;
show tables;
select * from executors;
会看到有两个executor且active为1,每次重启azkaban都需要手动激活
4 配置Web Server
Azkaban Web Server处理项目管理,身份验证,计划和执行触发。
在101执行
编辑azkaban.properties
vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
修改如下属性
...
default.timezone.id=Asia/Shanghai
...
database.type=mysql
mysql.port=3306
mysql.host=hadoop101
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
...
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
说明:
修改azkaban-users.xml文件,添加hzy用户
vim /opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
<azkaban-users>
<user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="123456" roles="metrics,admin" username="hzy"/>
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
必须进入到hadoop101的/opt/module/azkaban/azkaban-web路径,启动web server
bin/start-web.sh
访问http://hadoop101:8081,并用hzy用户登陆
三 创建MySQL数据库和表
1 创建数据库
创建gmall_report数据库

SQL语句
CREATE DATABASE `gmall_report` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
2 创建表
create database gmall_report character set utf8mb4 collate utf8mb4_general_ci;
use gmall_report;
DROP TABLE IF EXISTS ads_visit_stats;
CREATE TABLE `ads_visit_stats`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`is_new` VARCHAR(255) NOT NULL COMMENT '新老标识,1:新,0:老',
`channel` VARCHAR(255) NOT NULL COMMENT '渠道',
`uv_count` BIGINT(20) DEFAULT NULL COMMENT '日活(访问人数)',
`duration_sec` BIGINT(20) DEFAULT NULL COMMENT '页面停留总时长',
`avg_duration_sec` BIGINT(20) DEFAULT NULL COMMENT '一次会话,页面停留平均时长',
`page_count` BIGINT(20) DEFAULT NULL COMMENT '页面总浏览数',
`avg_page_count` BIGINT(20) DEFAULT NULL COMMENT '一次会话,页面平均浏览数',
`sv_count` BIGINT(20) DEFAULT NULL COMMENT '会话次数',
`bounce_count` BIGINT(20) DEFAULT NULL COMMENT '跳出数',
`bounce_rate` DECIMAL(16, 2) DEFAULT NULL COMMENT '跳出率',
PRIMARY KEY (`dt`, `is_new`, `channel`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS ads_page_path;
CREATE TABLE `ads_page_path`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`source` VARCHAR(255) DEFAULT NULL COMMENT '跳转起始页面',
`target` VARCHAR(255) DEFAULT NULL COMMENT '跳转终到页面',
`path_count` BIGINT(255) DEFAULT NULL COMMENT '跳转次数',
UNIQUE KEY (`dt`, `source`, `target`) USING BTREE
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_user_total;
CREATE TABLE `ads_user_total`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`new_user_count` BIGINT(20) DEFAULT NULL COMMENT '新注册用户数',
`new_order_user_count` BIGINT(20) DEFAULT NULL COMMENT '新增下单用户数',
`order_final_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '下单总金额',
`order_user_count` BIGINT(20) DEFAULT NULL COMMENT '下单用户数',
`no_order_user_count` BIGINT(20) DEFAULT NULL COMMENT '未下单用户数(具体指活跃用户中未下单用户)',
PRIMARY KEY (`dt`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS ads_user_change;
CREATE TABLE `ads_user_change`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`user_churn_count` BIGINT(20) DEFAULT NULL COMMENT '流失用户数',
`user_back_count` BIGINT(20) DEFAULT NULL COMMENT '回流用户数',
PRIMARY KEY (`dt`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS ads_user_action;
CREATE TABLE `ads_user_action`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`home_count` BIGINT(20) DEFAULT NULL COMMENT '浏览首页人数',
`good_detail_count` BIGINT(20) DEFAULT NULL COMMENT '浏览商品详情页人数',
`cart_count` BIGINT(20) DEFAULT NULL COMMENT '加入购物车人数',
`order_count` BIGINT(20) DEFAULT NULL COMMENT '下单人数',
`payment_count` BIGINT(20) DEFAULT NULL COMMENT '支付人数',
PRIMARY KEY (`dt`) USING BTREE
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_user_retention;
CREATE TABLE `ads_user_retention`
(
`dt` DATE DEFAULT NULL COMMENT '统计日期',
`create_date` VARCHAR(255) NOT NULL COMMENT '用户新增日期',
`retention_day` BIGINT(20) NOT NULL COMMENT '截至当前日期留存天数',
`retention_count` BIGINT(20) DEFAULT NULL COMMENT '留存用户数量',
`new_user_count` BIGINT(20) DEFAULT NULL COMMENT '新增用户数量',
`retention_rate` DECIMAL(16, 2) DEFAULT NULL COMMENT '留存率',
PRIMARY KEY (`create_date`) USING BTREE
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_user_continuity;
CREATE TABLE `ads_user_continuity`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`user_count` BIGINT(20) DEFAULT NULL COMMENT '最近七天内连续三天活跃用户数',
PRIMARY KEY (`dt`) USING BTREE
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_user_online_count_min;
CREATE TABLE ads_user_online_count_min
(
`dt` DATE NOT NULL COMMENT '统计日期',
`mins` VARCHAR(255) NOT NULL COMMENT '分钟,要求格式为yyyy-MM-dd HH:mm',
`user_online_count` BIGINT(20) DEFAULT NULL COMMENT '在线用户数',
PRIMARY KEY (`dt`, `mins`) USING BTREE
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_order_total;
CREATE TABLE `ads_order_total`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`order_count` BIGINT(255) DEFAULT NULL COMMENT '订单数',
`order_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '订单金额',
`order_user_count` BIGINT(255) DEFAULT NULL COMMENT '下单人数',
PRIMARY KEY (`dt`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_order_by_province;
CREATE TABLE `ads_order_by_province`
(
`dt` DATE NOT NULL,
`province_id` VARCHAR(255) NOT NULL COMMENT '统计日期',
`province_name` VARCHAR(255) DEFAULT NULL COMMENT '省份名称',
`area_code` VARCHAR(255) DEFAULT NULL COMMENT '地区编码',
`iso_code` VARCHAR(255) DEFAULT NULL COMMENT '国际标准地区编码',
`iso_code_3166_2` VARCHAR(255) DEFAULT NULL COMMENT '国际标准地区编码',
`order_count` BIGINT(20) DEFAULT NULL COMMENT '订单数',
`order_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '订单金额',
PRIMARY KEY (`dt`, `province_id`) USING BTREE
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_repeat_purchase;
CREATE TABLE `ads_repeat_purchase`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`tm_id` VARCHAR(255) NOT NULL COMMENT '品牌ID',
`tm_name` VARCHAR(255) DEFAULT NULL COMMENT '品牌名称',
`order_repeat_rate` DECIMAL(16, 2) DEFAULT NULL COMMENT '复购率',
PRIMARY KEY (`dt`, `tm_id`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_order_spu_stats;
CREATE TABLE `ads_order_spu_stats`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`spu_id` VARCHAR(255) NOT NULL COMMENT '商品ID',
`spu_name` VARCHAR(255) DEFAULT NULL COMMENT '商品名称',
`tm_id` VARCHAR(255) NOT NULL COMMENT '品牌ID',
`tm_name` VARCHAR(255) DEFAULT NULL COMMENT '品牌名称',
`category3_id` VARCHAR(255) NOT NULL COMMENT '三级品类ID',
`category3_name` VARCHAR(255) DEFAULT NULL COMMENT '三级品类名称',
`category2_id` VARCHAR(255) NOT NULL COMMENT '二级品类ID',
`category2_name` VARCHAR(255) DEFAULT NULL COMMENT '二级品类名称',
`category1_id` VARCHAR(255) NOT NULL COMMENT '一级品类ID',
`category1_name` VARCHAR(255) NOT NULL COMMENT '一级品类名称',
`order_count` BIGINT(20) DEFAULT NULL COMMENT '订单数',
`order_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '订单金额',
PRIMARY KEY (`dt`, `spu_id`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS ads_activity_stats;
CREATE TABLE `ads_activity_stats`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`activity_id` VARCHAR(255) NOT NULL COMMENT '活动ID',
`activity_name` VARCHAR(255) DEFAULT NULL COMMENT '活动名称',
`start_date` DATE DEFAULT NULL COMMENT '开始日期',
`order_count` BIGINT(11) DEFAULT NULL COMMENT '参与活动订单数',
`order_original_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '参与活动订单原始金额',
`order_final_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '参与活动订单最终金额',
`reduce_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '优惠金额',
`reduce_rate` DECIMAL(16, 2) DEFAULT NULL COMMENT '补贴率',
PRIMARY KEY (`dt`, `activity_id`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
DROP TABLE IF EXISTS ads_coupon_stats;
CREATE TABLE `ads_coupon_stats`
(
`dt` DATE NOT NULL COMMENT '统计日期',
`coupon_id` VARCHAR(255) NOT NULL COMMENT '优惠券ID',
`coupon_name` VARCHAR(255) DEFAULT NULL COMMENT '优惠券名称',
`start_date` DATE DEFAULT NULL COMMENT '开始日期',
`rule_name` VARCHAR(200) DEFAULT NULL COMMENT '优惠规则',
`get_count` BIGINT(20) DEFAULT NULL COMMENT '领取次数',
`order_count` BIGINT(20) DEFAULT NULL COMMENT '使用(下单)次数',
`expire_count` BIGINT(20) DEFAULT NULL COMMENT '过期次数',
`order_original_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '使用优惠券订单原始金额',
`order_final_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '使用优惠券订单最终金额',
`reduce_amount` DECIMAL(16, 2) DEFAULT NULL COMMENT '优惠金额',
`reduce_rate` DECIMAL(16, 2) DEFAULT NULL COMMENT '补贴率',
PRIMARY KEY (`dt`, `coupon_id`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4
ROW_FORMAT = DYNAMIC;
四 Sqoop导出脚本
1 编写Sqoop导出脚本
在/home/hzy/bin目录下创建脚本hdfs_to_mysql.sh
vim hdfs_to_mysql.sh
2 在脚本中填写如下内容
hive_db_name='gmall'
mysql_db_name='gmall_report'
mysql_host='hadoop101'
mysql_user='root'
mysql_password='000000'
SQOOP_HOME='/opt/module/sqoop'
export_data() {
$SQOOP_HOME/bin/sqoop export \
}
case $1 in
"ads_activity_stats" )
export_data "ads_activity_stats" "dt,activity_id"
;;
"ads_coupon_stats" )
export_data "ads_coupon_stats" "dt,coupon_id"
;;
"ads_order_by_province" )
export_data "ads_order_by_province" "dt,province_id"
;;
"ads_order_spu_stats" )
export_data "ads_order_spu_stats" "dt,spu_id"
;;
"ads_order_total" )
export_data "ads_order_total" "dts"
;;
"ads_page_path" )
export_data "ads_page_path" "dt,source,target"
;;
"ads_repeat_purchase" )
export_data "ads_repeat_purchase" "dt,tm_id"
;;
"ads_user_action" )
export_data "ads_user_action" "dt"
;;
"ads_user_change" )
export_data "ads_user_change" "dt"
;;
"ads_user_continuity" )
export_data "ads_user_continuity" "dt"
;;
"ads_user_online_count_min" )
export_data "ads_user_online_count_min" "dt,mins"
;;
"ads_user_retention" )
export_data "ads_user_retention" "create_date"
;;
"ads_user_total" )
export_data "ads_user_total" "dt"
;;
"ads_visit_stats" )
export_data "ads_visit_stats" "dt,is_new,channel"
;;
"all" )
export_data "ads_activity_stats" "dt,activity_id"
export_data "ads_coupon_stats" "dt,coupon_id"
export_data "ads_order_by_province" "dt,province_id"
export_data "ads_order_spu_stats" "dt,spu_id"
export_data "ads_order_total" "dts"
export_data "ads_page_path" "dt,source,target"
export_data "ads_repeat_purchase" "dt,tm_id"
export_data "ads_user_action" "dt"
export_data "ads_user_change" "dt"
export_data "ads_user_continuity" "dt"
export_data "ads_user_online_count_min" "dt,mins"
export_data "ads_user_retention" "create_date"
export_data "ads_user_total" "dt"
export_data "ads_visit_stats" "dt,is_new,channel"
;;
esac
3 关于导出update还是insert的问题
分别表示,将字符串列和非字符串列的空串和“null”转义。
官网地址:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html
Sqoop will by default import NULL values as string null. Hive is however using string \N to denote NULL values and therefore predicates dealing with NULL(like IS NULL) will not work correctly. You should append parameters --null-string and --null-non-string in case of import job or --input-null-string and --input-null-non-string in case of an export job if you wish to properly preserve NULL values. Because sqoop is using those parameters in generated code, you need to properly escape value \N to [\\N](file://N):
Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用–input-null-string和–input-null-non-string两个参数。导入数据时采用–null-string和–null-non-string。
4 使用脚本
chmod 777 hdfs_to_mysql.sh
hdfs_to_mysql.sh all
五 数据准备
用户行为数据准备
lg.sh 2020-06-15
mock.sh 2020-06-15
六 编写Azkaban工作流程配置文件
工作流文件:由一系列工作组成的整体,整体之间相互依赖
azkaban-flow-version: 2.0
nodes:
- name: mysql_to_hdfs
type: command
config:
command: /home/hzy/bin/mysql_to_hdfs.sh all ${dt}
- name: hdfs_to_ods_log
type: command
config:
command: /home/hzy/bin/hdfs_to_ods_log.sh ${dt}
- name: hdfs_to_ods_db
type: command
dependsOn:
- mysql_to_hdfs
config:
command: /home/hzy/bin/hdfs_to_ods_db.sh all ${dt}
- name: ods_to_dim_db
type: command
dependsOn:
- hdfs_to_ods_db
config:
command: /home/hzy/bin/ods_to_dim_db.sh all ${dt}
- name: ods_to_dwd_log
type: command
dependsOn:
- hdfs_to_ods_log
config:
command: /home/hzy/bin/ods_to_dwd_log.sh all ${dt}
- name: ods_to_dwd_db
type: command
dependsOn:
- hdfs_to_ods_db
config:
command: /home/hzy/bin/ods_to_dwd_db.sh all ${dt}
- name: dwd_to_dws
type: command
dependsOn:
- ods_to_dim_db
- ods_to_dwd_log
- ods_to_dwd_db
config:
command: /home/hzy/bin/dwd_to_dws.sh all ${dt}
- name: dws_to_dwt
type: command
dependsOn:
- dwd_to_dws
config:
command: /home/hzy/bin/dws_to_dwt.sh all ${dt}
- name: dwt_to_ads
type: command
dependsOn:
- dws_to_dwt
config:
command: /home/hzy/bin/dwt_to_ads.sh all ${dt}
- name: hdfs_to_mysql
type: command
dependsOn:
- dwt_to_ads
config:
command: /home/hzy/bin/hdfs_to_mysql.sh all
将azkaban.project、gmall.flow文件压缩到一个zip文件,文件名称必须是英文。
在WebServer新建项目:http://hadoop101:8081/index、给项目名称命名和添加项目描述、上传gmall.zip文件、选择上传的文件、查看任务流、详细任务流展示 
配置输入dt时间参数

脚本需要执行hive -e ,只在101上面有,所以可以将任务限制在101上执行,或者将hive -e同步到所有机器
限制在101执行:需要知道101的id是什么 – 执行
mysql -uroot -p000000
use azkaban;
select * from executors;
之后再添加一个属性 name:useExecutor value:1
执行,蓝色表示正在执行,绿色表示执行完成,红色表示执行失败,浅红色表示kill或者放弃执行
执行成功标志

查看导入结果
七 Azkaban多Executor模式下注意事项
Azkaban多Executor模式是指,在集群中多个节点部署Executor。在这种模式下, Azkaban web Server会根据策略,选取其中一个Executor去执行任务。
由于我们需要交给Azkaban调度的脚本,以及脚本需要的Hive,Sqoop等应用只在hadoop101部署了,为保证任务顺利执行,我们须在以下两种方案任选其一,推荐使用方案二。
方案一
指定特定的Executor(hadoop101)去执行任务。
1)在MySQL中azkaban数据库executors表中,查询hadoop101上的Executor的id。
mysql> use azkaban;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from executors;
+----+-----------+-------+--------+
| id | host | port | active |
+----+-----------+-------+--------+
| 1 | hadoop101 | 35985 | 1 |
| 2 | hadoop102 | 36363 | 1 |
| 3 | hadoop103 | 12321 | 1 |
+----+-----------+-------+--------+
3 rows in set (0.00 sec)
2)在执行工作流程时加入useExecutor属性
方案二
在Executor所在所有节点部署任务所需脚本和应用。
分发脚本、sqoop、spark、my_env.sh
xsync /home/hzy/bin/
xsync /opt/module/hive
xsync /opt/module/sqoop
xsync /opt/module/spark
sudo /home/hzy/bin/xsync /etc/profile.d/my_env.sh
|