-- 1. MySQL: create the source table that the CDC pipeline reads from.
-- The COMMENT '...' literals are column metadata stored by MySQL and are kept verbatim.
-- `user` is backtick-quoted because it is a keyword in several of the engines used in this file.
CREATE TABLE IF NOT EXISTS itcast_nev.web_chat_ems (
    -- Surrogate key and row bookkeeping
    id int auto_increment primary key comment '主键',
    create_date_time timestamp null comment '数据创建时间',
    -- Session / visitor identity
    session_id varchar(48) default '' not null comment 'sessionId',
    sid varchar(48) collate utf8_bin default '' not null comment '访客id',
    create_time datetime null comment '会话创建时间',
    -- Traffic source
    seo_source varchar(255) collate utf8_bin default '' null comment '搜索来源',
    seo_keywords varchar(512) collate utf8_bin default '' null comment '关键字',
    -- Visitor location
    ip varchar(48) collate utf8_bin default '' null comment 'IP地址',
    area varchar(255) collate utf8_bin default '' null comment '地域',
    country varchar(16) collate utf8_bin default '' null comment '所在国家',
    province varchar(16) collate utf8_bin default '' null comment '省',
    city varchar(255) collate utf8_bin default '' null comment '城市',
    origin_channel varchar(32) collate utf8_bin default '' null comment '投放渠道',
    `user` varchar(255) collate utf8_bin default '' null comment '所属坐席',
    -- Conversation timeline
    manual_time datetime null comment '人工开始时间',
    begin_time datetime null comment '坐席领取时间 ',
    end_time datetime null comment '会话结束时间',
    last_customer_msg_time_stamp datetime null comment '客户最后一条消息的时间',
    last_agent_msg_time_stamp datetime null comment '坐席最后一下回复的时间',
    -- Message counters
    reply_msg_count int(12) default 0 null comment '客服回复消息数',
    msg_count int(12) default 0 null comment '客户发送消息数',
    -- Client environment
    browser_name varchar(255) collate utf8_bin default '' null comment '浏览器名称',
    os_info varchar(255) collate utf8_bin default '' null comment '系统名称'
);
-- 2. Flink SQL: MySQL CDC source table over itcast_nev.web_chat_ems.
-- Every column is declared as STRING, including the MySQL INT / TIMESTAMP / DATETIME columns.
-- NOTE(review): later queries in this file do CAST(create_time AS BIGINT) / 1000, so the
-- date-time columns presumably arrive as epoch-millisecond text -- confirm against the
-- connector's type mapping before relying on it.
-- NOTE(review): credentials are hard-coded below; acceptable for a lab setup only.
CREATE TABLE tbl_web_chat_ems_mysql (
    id STRING PRIMARY KEY NOT ENFORCED,
    create_date_time STRING,
    session_id STRING,
    sid STRING,
    create_time STRING,
    seo_source STRING,
    seo_keywords STRING,
    ip STRING,
    area STRING,
    country STRING,
    province STRING,
    city STRING,
    origin_channel STRING,
    `user` STRING,
    manual_time STRING,
    begin_time STRING,
    end_time STRING,
    last_customer_msg_time_stamp STRING,
    last_agent_msg_time_stamp STRING,
    reply_msg_count STRING,
    msg_count STRING,
    browser_name STRING,
    os_info STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'node1.itcast.cn',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'server-time-zone' = 'Asia/Shanghai',
    -- Snapshot mode 'initial' is passed through to Debezium.
    'debezium.snapshot.mode' = 'initial',
    'database-name' = 'itcast_nev',
    'table-name' = 'web_chat_ems'
);
-- 3. Flink SQL (Hudi connector): MERGE_ON_READ sink table, partitioned by `part`.
-- Same columns as the CDC source table plus the partition column `part`.
CREATE TABLE edu_web_chat_ems_hudi (
    id STRING PRIMARY KEY NOT ENFORCED,
    create_date_time STRING,
    session_id STRING,
    sid STRING,
    create_time STRING,
    seo_source STRING,
    seo_keywords STRING,
    ip STRING,
    area STRING,
    country STRING,
    province STRING,
    city STRING,
    origin_channel STRING,
    `user` STRING,
    manual_time STRING,
    begin_time STRING,
    end_time STRING,
    last_customer_msg_time_stamp STRING,
    last_agent_msg_time_stamp STRING,
    reply_msg_count STRING,
    msg_count STRING,
    browser_name STRING,
    os_info STRING,
    part STRING
)
PARTITIONED BY (part)
WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_web_chat_ems_hudi',
    'table.type' = 'MERGE_ON_READ',
    -- Record key and pre-combine (ordering) field.
    'hoodie.datasource.write.recordkey.field' = 'id',
    'write.precombine.field' = 'create_date_time',
    'write.tasks' = '1',
    'write.rate.limit' = '2000',
    -- Async compaction, triggered by commit count with a threshold of 1 delta commit.
    'compaction.tasks' = '1',
    'compaction.async.enabled' = 'true',
    'compaction.trigger.strategy' = 'num_commits',
    'compaction.delta_commits' = '1',
    'changelog.enabled' = 'true'
);
-- Load data: copy the CDC source rows into the Hudi table.
-- Columns are listed explicitly (instead of SELECT *) so that a change to the source
-- schema cannot silently shift values into the wrong sink columns.
-- `part` is derived from CURRENT_DATE at write time, not from a column of the row.
INSERT INTO edu_web_chat_ems_hudi
SELECT
    id,
    create_date_time,
    session_id,
    sid,
    create_time,
    seo_source,
    seo_keywords,
    ip,
    area,
    country,
    province,
    city,
    origin_channel,
    `user`,
    manual_time,
    begin_time,
    end_time,
    last_customer_msg_time_stamp,
    last_agent_msg_time_stamp,
    reply_msg_count,
    msg_count,
    browser_name,
    os_info,
    CAST(CURRENT_DATE AS STRING) AS part
FROM tbl_web_chat_ems_mysql;
-- 4. Hive: external table mapped onto the Hudi table directory.
-- The Hive partition column is named day_str; partitions are attached manually with
-- ALTER TABLE ... ADD PARTITION, each pointing at a sub-directory of LOCATION.
-- NOTE(review): the table is read through HoodieParquetInputFormat; for a MERGE_ON_READ
-- table this presumably exposes only compacted parquet base files -- confirm.
CREATE EXTERNAL TABLE edu_hudi.tbl_web_chat_ems (
    id STRING,
    create_date_time STRING,
    session_id STRING,
    sid STRING,
    create_time STRING,
    seo_source STRING,
    seo_keywords STRING,
    ip STRING,
    area STRING,
    country STRING,
    province STRING,
    city STRING,
    origin_channel STRING,
    `user` STRING,
    manual_time STRING,
    begin_time STRING,
    end_time STRING,
    last_customer_msg_time_stamp STRING,
    last_agent_msg_time_stamp STRING,
    reply_msg_count STRING,
    msg_count STRING,
    browser_name STRING,
    os_info STRING
)
PARTITIONED BY (day_str STRING)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
    'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
    '/hudi-warehouse/edu_web_chat_ems_hudi';
-- Attach the 2021-11-29 partition directory to the Hive table (idempotent).
ALTER TABLE edu_hudi.tbl_web_chat_ems
    ADD IF NOT EXISTS PARTITION (day_str = '2021-11-29')
    LOCATION '/hudi-warehouse/edu_web_chat_ems_hudi/2021-11-29';

-- Sanity checks: row count and a small sample from that partition.
SELECT COUNT(1) AS total
FROM edu_hudi.tbl_web_chat_ems
WHERE day_str = '2021-11-29';

SELECT id, session_id, ip, province
FROM edu_hudi.tbl_web_chat_ems
WHERE day_str = '2021-11-29'
LIMIT 10;
-- 5.6 Presto: save the aggregated result into MySQL.
-- Presto can work across data sources: this reads from the hive catalog and writes to
-- the mysql catalog in a single statement.
-- Run from the Presto CLI (the original showed the prompt `presto:edu_hudi>`, which is
-- not part of the SQL and has been removed so the statement parses).
INSERT INTO mysql.itcast_rpt.web_pv (report_date, report_total)
SELECT
    day_value,
    COUNT(id) AS total
FROM (
    SELECT
        id,
        -- create_time is cast to bigint and divided by 1000 before from_unixtime.
        format_datetime(from_unixtime(cast(create_time AS bigint) / 1000), 'yyyy-MM-dd') AS day_value
    FROM hive.edu_hudi.tbl_web_chat_ems
    WHERE day_str = '2021-11-29'
)
GROUP BY day_value;
-- Real-time (streaming) =================================================================================================

-- 1-1. MySQL: result table for the streaming page-view report, one row per report_date.
-- IF NOT EXISTS makes the script safe to re-run.
CREATE TABLE IF NOT EXISTS `itcast_rpt`.`realtime_web_pv` (
    `report_date` varchar(255) NOT NULL,
    `report_total` bigint(20) NOT NULL,
    PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 1-2. Flink SQL (Hudi connector): streaming-read definition of the Hudi table.
-- NOTE(review): this reuses the table name edu_web_chat_ems_hudi that the batch section of
-- this file also creates (with write/compaction options). Run it in a separate Flink SQL
-- session, or drop the earlier definition first, otherwise CREATE TABLE will conflict.
CREATE TABLE edu_web_chat_ems_hudi (
    id STRING PRIMARY KEY NOT ENFORCED,
    create_date_time STRING,
    session_id STRING,
    sid STRING,
    create_time STRING,
    seo_source STRING,
    seo_keywords STRING,
    ip STRING,
    area STRING,
    country STRING,
    province STRING,
    city STRING,
    origin_channel STRING,
    `user` STRING,
    manual_time STRING,
    begin_time STRING,
    end_time STRING,
    last_customer_msg_time_stamp STRING,
    last_agent_msg_time_stamp STRING,
    reply_msg_count STRING,
    msg_count STRING,
    browser_name STRING,
    os_info STRING,
    part STRING
)
PARTITIONED BY (part)
WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_web_chat_ems_hudi',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'id',
    'write.precombine.field' = 'create_date_time',
    -- Streaming read with a check interval of 5 and a single read task.
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '5',
    'read.tasks' = '1'
);
-- 1-3. Flink SQL: save results to MySQL.
-- JDBC connector table mapped onto itcast_rpt.realtime_web_pv.
-- NOTE(review): the primary key on report_date is presumably declared so the sink
-- upserts one row per day -- confirm against the Flink JDBC connector documentation.
-- NOTE(review): credentials are hard-coded below; acceptable for a lab setup only.
CREATE TABLE realtime_web_pv_mysql (
    report_date STRING,
    report_total BIGINT,
    PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://node1.itcast.cn:3306/itcast_rpt',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '123456',
    'table-name' = 'realtime_web_pv'
);
-- 1-4. Flink SQL metric calculation: count rows per day and write the result into
-- realtime_web_pv_mysql (the JDBC table backed by MySQL).
-- The day is derived from create_time, cast to BIGINT and divided by 1000 before
-- FROM_UNIXTIME. Only the hard-coded partition part = '2021-12-28' is read.
INSERT INTO realtime_web_pv_mysql
SELECT
    day_value,
    COUNT(id) AS total
FROM (
    SELECT
        FROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value,
        id
    FROM edu_web_chat_ems_hudi
    WHERE part = '2021-12-28'
)
GROUP BY day_value;