项目需求
自2019年底,湖北省武汉市监测发现不明原因肺炎病例,中国第一时间报告疫情,迅速采取行动,开展病因学和流行病学调查,阻断疫情蔓延。 SARS-CoV-2是一种有着高扩散能力的病毒,通过飞沫、直接接触和被感染的物体传播,其潜伏时间为1到14天,并且也由无症状感染者传播。大多数感染者仅表现出轻度至中度的呼吸道症状,或根本不表现任何症状。只有5-10%的感染者显示出完全的严重呼吸综合征,称为冠状病毒病(COVID)-19,能够人传人,进而所引发的全球大流行疫情,是全球自第二次世界大战以来面临的最严峻危机。截至目前,全球已有200多个国家和地区累计报告超过2.1375亿确诊病例,导致超过445万名患者死亡。 在此基础上,大数据技术应用发挥出极大作用。通过城市监测,接触者追踪,疫苗接种等,将我们的疫情信息进行传达。为政府正确决策、精准施策提供了科学依据。强化了政府对疫情物资生产、筹集、投放的科学管控手段。为医疗救治、“群防群控”,防止疫情蔓延采取有效措施提供了科学数据和手段。科学分析预测疫情现状、趋势,适时准确地根据疫情变化把握防疫重点。 本项目为新型冠状病毒(COVID-19)疫情状况的时间序列数据仓库,选手可以通过对疫情历史数据的分析研究,以更好的了解疫情与疫情的发展态势,为抗击疫情之决策提供数据支持。
step1:创建ods层数据表
ods层是数据原始层,只需将原始数据拉去过来即可,ods层可以采用内部表(EXTERNAL 修饰)保证数据安全性。
数据库:covid_ods 原始数据表:covid_ods.covid
字段 | 解释 |
---|
continerName | 大洲 | countryName | 国家 | provinceName | 省份 | province_confirm | 省份确诊人数 | province_suspect | 省份感染人数 | province_cured | 省份治愈人数 | province_dead | 省份死亡人数 | cityName | 城市/地区 | city_confirm | 城市确诊人数 | city_suspect | 城市感染人数 | city_cured | 城市治愈人数 | city_dead | 城市死亡人数 | updateTime | 数据更新时间 |
数据加工表:covid_ods.covid_time
要求:保留干净数据,去重(去空值、脏数据处理),提取特征数据,只保留每天最后更新的数据;
\1. 特征数据:包括省份,城市/地区,城市确诊,城市感染,城市治愈,城市死亡,数据更新时间;
\2. 过滤重复值,数据中有同一天获取的多次疫情信息,根据时间只保留每天最后更新的数据;
\3. 同时要求国家为中国,省份不为中国,过滤地区空值。
字段 | 解释 |
---|
provinceName | 省份 | cityName | 城市/地区 | city_confirm | 城市确诊人数 | city_suspect | 城市感染人数 | city_cured | 城市治愈人数 | city_dead | 城市死亡人数 | updateTime | 数据更新时间 |
# 重复数据,同一天多次更新的数据,取每天最后更新的数据(按照时间进行条件判断)
亚洲,中国,福建省,1169,15,777,1,莆田,246,0,56,0,2021-09-20 19:14:20
亚洲,中国,福建省,1169,15,777,1,莆田,246,0,56,0,2021-09-20 18:54:20
亚洲,中国,福建省,1169,15,777,1,莆田,246,0,56,0,2021-09-20 09:50:20
# 地区空值数据
亚洲,中国,台湾,40,0,12,1,,,,,,2020-03-02 09:10:02
亚洲,中国,香港,10,0,0,0,,,,,,2020-01-29 19:12:29
注
sed` `-n ``'1,10'``p ``/etc/test1` `>>test2
#将file1的1-10行追加到file2
step2:创建dwd层数据表
在dwd层采用分区表将数据按照年/月维度进行分区存放,以便在获取某月数据时可快速获取,提高获取效率。在本层将获取想要的字段数据**,**对不规则数据做简单整理。
数据库:covid_dwd
添加昨天时间列表:province
指标:城市累计确诊,城市累计疑似,城市累计治愈,城市累计死亡。 维度:省份,城市,时间(更新时间updateTime,以及时间的前一天yesterday)。 分区:年、月 思路:将数据中的时间切割,获取年月日,并使用date_sub()函数获取昨天更新时间。
字段 | 解释 |
---|
provinceName | 省份 | cityName | 城市/地区 | city_confirm | 城市确诊人数 | city_suspect | 城市感染人数 | city_cured | 城市治愈人数 | city_dead | 城市死亡人数 | updateTime | 数据更新时间 | yesterday | 昨天更新时间 | yearinfo | 年(分区) | monthinfo | 月(分区) |
step3:创建dwm数据处理分析
统计每天各个省份中指标的增长量,因此需要去获取前一天或者后一天的数据,在本层将当天数据和前一天的数据进行汇总,通过join方式将数据合并为一条数据。对四个指标数据进行类型转换,转换为int类型(在dws层将参与运算)。
数据库:covid_dwd
创建当日数据和后一天数据汇总数据表:two_day
字段 | 解释 |
---|
provinceName | 省份 | cityName | 城市/地区 | city_confirm | 城市确诊人数 | city_suspect | 城市感染人数 | city_cured | 城市治愈人数 | city_dead | 城市死亡人数 | updateTime | 更新时间 | city_confirm_before | 一天前城市确诊人数 | city_suspect_before | 一天前城市感染人数 | city_cured_before | 一天前城市治愈人数 | city_dead_before | 一天前城市死亡人数 | yesterday | 昨天更新时间 | yearinfo | 年(分区) | monthinfo | 月(分区) |
合并数据注意考虑时间问题。
step4:创建dws层
在dwd层已经拿到前一天的数据,在本层计算各个地区的指标增量,计算方式为:
每日指标增量=前一天指标数据-今日指标数据
数据库:covid_dws
单日指标正常量表:covid_dws.day
字段 | 解释 |
---|
provinceName | 省份 | cityName | 城市/地区 | new_city_confirm | 日确诊增长人数 | new_city_suspect | 日疑似增长人数 | new_city_cured | 日治愈增长人数 | new_city_dead | 日死亡增长人数 | updateTime | 更新时间 | yearinfo | 年(分区) | monthinfo | 月(分区) |
step5:创建app层
针对疫情数据,在app层再对维度进行上卷,分析维度为各个省份每日的指标增量情况统计。
数据库:covid_app
app层业务表:covid_app.day_app
字段 | 解释 |
---|
provinceName | 省份 | new_city_confirm | 日确诊增长人数 | new_city_suspect | 日疑似增长人数 | new_city_cured | 日治愈增长人数 | new_city_dead | 日死亡增长人数 | updateTime | 更新时间 | yearinfo | 年(分区) | monthinfo | 月(分区) |
其他参考设置:
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.dynamic.partitions=100000;set hive.exec.max.created.files=100000;
set hive.exec.compress.output=true;
set mapreduce.reduce.memory.mb=1025;
set hive.exec.mode.local.auto.input.files.max=25;
题目
前置准备
修改主机名
hostnamectl set-hostname master
格式化HDFS,开启集群
cd /root/machine/hadoop-2.7.7
hadoop namenode -format
sbin/start-all.sh
开启mysql服务
systemctl start mysqld
环境中已经安装Hive2.3.4,需要开启mysql服务,初始化数据库,即可开启Hive客户端。
cd /usr/hive/apache-hive-2.3.4-bin
schematool -dbType mysql -initSchema
covid_ods库
进入hive客户端,创建名为covid_ods的数据库用于存储原始数据
create database covid_ods;
use covid_ods;
covid表
数据库covid_ods下创建covid表,导入去除表头后的原始数据/root/covid/covid_area.csv(文件名不变)
create table covid(
continerName STRING,
countryName STRING,
provinceName STRING,
province_confirm INT,
province_suspect INT,
province_cured INT,
province_dead INT,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP
)row format delimited fields terminated by ',';
导入
load data local inpath '/root/covid/covid_area.csv' into table covid;
covid_time表
用于提取有用数据,过滤重复值,只保留每天最后更新的数据,具体参考步骤说明
创建表
create table covid_time(
provinceName STRING,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP
)row format delimited fields terminated by ',';
按照要求向covid_ods.covid_time插入过滤后的数据
insert overwrite table covid_time
select
provinceName,
cityName,
city_confirm,
city_suspect,
city_cured,
city_dead,
updateTime
from (
select *
from (
select
row_number() over(partition by cityName,day order by minute desc) as rmp,*
from (
select
*,
date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd') as day,
date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd-HH-mm') as minute
from covid
where countryName ="中国" and provinceName !="中国" and cityName !=""
) a
) b
where rmp=1
) c
;
结果
重庆市 万州区 118 0 114 4 2021-08-19 09:52:19
重庆市 万州区 118 0 114 4 2021-08-23 10:36:23
重庆市 万州区 118 0 114 4 2021-09-03 19:55:03
重庆市 万盛经开区 1 0 1 0 2020-04-24 08:43:24
重庆市 万盛经开区 1 0 1 0 2020-06-14 09:10:14
重庆市 万盛经开区 1 0 1 0 2020-06-15 09:08:15
重庆市 万盛经开区 1 0 1 0 2020-07-03 09:40:03
重庆市 万盛经开区 1 0 1 0 2020-07-09 08:14:09
covid_dwd库
创建名为covid_dwd的数据库,此层将数据进行分区,便于数据的快速获取
create database covid_dwd;
use covid_dwd;
province表
数据库covid_dwd下创建province表,按照年、月分区,要求根据当天时间获取昨天对应时间列,并插入对应数据,具体要求查看步骤说明
create table province(
provinceName STRING,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP,
yesterday TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';
两种的区别在于部分数据找不到前一天的日期,第一种是将查不到前一天日期设置为NULL,第二种是直接跳过此类数据
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dwd.province partition(yearinfo, monthinfo)
select a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,b.updateTime,year(a.updateTime) as yearinfo,month(a.updateTime) as monthinfo
from
(select
provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime,
year(updateTime) as yearinfo,
month(updateTime) as monthinfo,date_sub(updateTime,1) as yes
from covid_ods.covid_time t1
) a
left join
(select cityName,updateTime,date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd') as day
from covid_ods.covid_time t2
) b
on b.day= a.yes and b.cityName=a.cityName;
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dwd.province partition(yearinfo, monthinfo)
select a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,b.updateTime
from
(select
provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime,
year(updateTime) as yearinfo,
month(updateTime) as monthinfo,date_sub(updateTime,1) as yes
from covid_ods.covid_time t1
) a
join
(select cityName,updateTime,date_format(regexp_replace(updateTime,'/','-'),'yyyy-MM-dd') as day
from covid_ods.covid_time t2
) b
on b.day= a.yes and b.cityName=a.cityName;
结果
黑龙江省 七台河 17 0 17 0 2020-10-27 08:42:27 NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-02 08:49:02 NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-16 09:26:16 NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-20 08:45:20 NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-21 08:44:21 2020-10-20 08:45:20 2020 10
重庆市 万州区 118 0 114 4 2020-10-23 08:20:23 NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-02 08:49:02 NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-16 09:26:16 NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-20 08:45:20 NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-21 08:44:21 2020-10-20 08:45:20 2020 10
covid_dwm库
创建名为covid_dwm的数据库,用于统计每个省份的各指标增长量。
create database covid_dwm;
use covid_dwm;
two_day表
数据库covid_dwm下创建two_day表
create table two_day(
provinceName STRING,
cityName STRING,
city_confirm INT,
city_suspect INT,
city_cured INT,
city_dead INT,
updateTime TIMESTAMP,
city_confirm_before INT,
city_suspect_before INT,
city_cured_before INT,
city_dead_before INT,
yesterday TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';
将province中当天数据和前一天的数据进行汇总,通过join方式将数据合并为一条数据,具体查看步骤说明
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dwm.two_day partition(yearinfo, monthinfo)
select
a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,
b.city_confirm as city_confirm_before,b.city_suspect as city_suspect_before,
b.city_cured as city_cured_before,b.city_dead as city_dead_before,a.yesterday,
a.yearinfo,a.monthinfo from covid_dwd.province as a
left join covid_dwd.province as b on a.yesterday=b.updateTime and a.cityName=b.cityName and a.provinceName=b.provinceName;
将表two_day中所有内容保存至云主机/root/covid/two_day.csv
hive -e 'select
a.provinceName,a.cityName,a.city_confirm,a.city_suspect,
a.city_cured,a.city_dead,a.updateTime,
b.city_confirm as city_confirm_before,b.city_suspect as city_suspect_before,
b.city_cured as city_cured_before,b.city_dead as city_dead_before,a.yesterday,
a.yearinfo,a.monthinfo from covid_dwd.province as a
left join covid_dwd.province as b on a.yesterday=b.updateTime and a.cityName=b.cityName and a.provinceName=b.provinceName;' | sed 's/[[:space:]]\+/,/g' > /root/covid/two_day.csv
结果
黑龙江省 七台河 17 0 17 0 2020-10-27 08:42:27 NULL NULL NULL NULL NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-02 08:49:02 NULL NULL NULL NULL NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-16 09:26:16 NULL NULL NULL NULL NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-20 08:45:20 NULL NULL NULL NULL NULL 2020 10
重庆市 万州区 118 0 114 4 2020-10-21 08:44:21 118 0 114 4 2020-10-20 08:45:20 2020 10
重庆市 万州区 118 0 114 4 2020-10-23 08:20:23 NULL NULL NULL NULL NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-02 08:49:02 NULL NULL NULL NULL NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-16 09:26:16 NULL NULL NULL NULL NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-20 08:45:20 NULL NULL NULL NULL NULL 2020 10
重庆市 万盛经开区 1 0 1 0 2020-10-21 08:44:21 1 0 1 0 2020-10-20 08:45:20 202010
covid_dws库
创建名为covid_dws的数据库,用于计算各个地区的指标增量。
create database covid_dws;
use covid_dws;
day表
数据库covid_dws下创建day表
create table day(
provinceName STRING,
cityName STRING,
new_city_confirm INT,
new_city_suspect INT,
new_city_cured INT,
new_city_dead INT,
updateTime TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';
计算地区每日指标增量,具体字段查看步骤说明
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_dws.day partition(yearinfo, monthinfo)
select
provinceName,cityName,
(city_confirm-city_confirm_before) as new_city_confirm,
(city_suspect-city_suspect_before) as new_city_suspect,
(city_cured-city_cured_before) as new_city_cured,
(city_dead-city_dead_before) as new_city_dead,
a.updateTime,yearinfo,monthinfo from covid_dwm.two_day;
将表day中所有内容保存至云主机/root/covid/day.csv
hive -e '
select
provinceName,cityName,
(city_confirm-city_confirm_before) as new_city_confirm,
(city_suspect-city_suspect_before) as new_city_suspect,
(city_cured-city_cured_before) as new_city_cured,
(city_dead-city_dead_before) as new_city_dead,
a.updateTime,yearinfo,monthinfo from covid_dwm.two_day;
' | sed 's/[[:space:]]\+/,/g' > /root/covid/day.csv
结果
黑龙江省 七台河 NULL NULL NULL NULL 2020-10-27 08:42:27 2020 10
重庆市 万州区 NULL NULL NULL NULL 2020-10-02 08:49:02 2020 10
重庆市 万州区 NULL NULL NULL NULL 2020-10-16 09:26:16 2020 10
重庆市 万州区 NULL NULL NULL NULL 2020-10-20 08:45:20 2020 10
重庆市 万州区 0 0 0 0 2020-10-21 08:44:21 2020 10
重庆市 万州区 NULL NULL NULL NULL 2020-10-23 08:20:23 2020 10
重庆市 万盛经开区 NULL NULL NULL NULL 2020-10-02 08:49:02 2020 10
重庆市 万盛经开区 NULL NULL NULL NULL 2020-10-16 09:26:16 2020 10
重庆市 万盛经开区 NULL NULL NULL NULL 2020-10-20 08:45:20 2020 10
重庆市 万盛经开区 0 0 0 0 2020-10-21 08:44:21 2020 10
covid_app库
创建名为covid_app的数据库,此层用于各个省份每日的指标增量情况统计
create database covid_app;
use covid_app;
day_app表
数据库covid_app下创建app层业务表,进行各个省份每日的指标增量情况统计
create table day_app(
provinceName STRING,
new_city_confirm INT,
new_city_suspect INT,
new_city_cured INT,
new_city_dead INT,
updateTime TIMESTAMP)
partitioned by(yearinfo int,monthinfo int)
row format delimited fields terminated by ',';
将表day_app中所有内容保存至云主机/root/covid/day_app.csv
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.mode.local.auto=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.created.files=100000;
INSERT OVERWRITE TABLE covid_app.day_app partition(yearinfo, monthinfo)
select distinct provinceName,new_city_confirm1,new_city_suspect1,new_city_cured1,new_city_dead1 ,updateTime,yearinfo,monthinfo from
(select provinceName,cityName,new_city_confirm,new_city_suspect,new_city_cured,new_city_dead,
sum(if(new_city_confirm is not null, new_city_confirm, 0)) over(partition by provinceName,updateTime) as new_city_confirm1,
sum(if(new_city_suspect is not null, new_city_suspect, 0)) over(partition by provinceName,updateTime ) as new_city_suspect1,
sum(if(new_city_cured is not null, new_city_cured, 0)) over(partition by provinceName,updateTime) as new_city_cured1,
sum(if(new_city_dead is not null, new_city_dead,0)) over(partition by provinceName,updateTime) as new_city_dead1,
updateTime,yearinfo,monthinfo from covid_dws.day
) t1;
hive -e '
select
* from covid_app.day_app;
' | sed 's/[[:space:]]\+/,/g' > /root/covid/day_app.csv
结果(多数数据显示为0,这里选取了中间连续有新增的数据)
福建省 21 0 1 0 2021-09-12 18:58:12 2021 9
福建省 22 0 1 0 2021-09-13 19:10:13 2021 9
福建省 29 0 0 0 2021-09-20 19:14:20 2021 9
福建省 31 0 1 0 2021-09-18 18:12:18 2021 9
福建省 43 0 0 0 2021-09-19 10:02:19 2021 9
福建省 48 0 2 0 2021-09-16 09:58:16 2021 9
福建省 51 0 1 0 2021-09-15 18:29:15 2021 9
福建省 60 0 2 0 2021-09-14 18:31:14 2021 9
福建省 61 0 1 0 2021-09-17 19:46:17 2021 9
|