什么是宽表
宽表顾名思义,就是有很多很多字段的一张表,那么我们为什么会用到宽表呢
为什么需要用到宽表
(为了减少关联,每一次关联,分组聚合都会产生shuffle,会很消耗时间,但是宽表也会有数据冗余,用空间换来时间) 比如我们有一些指标需要统计,这里面每一个SQL都需要使用到分组聚合,会非常消耗时间,每一个SQL都需要关联到很多表,每一次用到了join,速度就会变慢,我们要减少关联 假如我们这里有一堆一堆的数据需要做成报表,这里面的数据需要关联到很多张表,一开始我们只有一条一条查询,一条作为一个SQL进行查询,但这样会非常麻烦,那么我们就可以提前将这些表做成一张宽表,之后再进行查询输出
这里我们使用hive来计算每一个指标会很耗时间 比如一条SQL会运行很久 这里我们可以直接使用sparksql来运行,效率会高一些 spark-sql --master yarn-client 这里还可以设置它的shuffle个数 set spark.sql.shuffle.partitions=10;
做一张宽表
比如我们的宽表需要用到这些字段:
mdn string comment ‘手机号’ ,d_province_name string comment ‘旅游目的地市名’ ,o_city_name string comment ‘旅游来源地地市名’ ,o_province_name string comment ‘旅游来源地市名’ ,number_attr string comment ‘号码归属地’ ,d_distance_section string comment ‘出游距离’ ,d_stay_time string comment ‘停留时间按小时’ ,gender string comment ‘性别’ ,trmnl_brand string comment ‘终端品牌’ ,pckg_price int comment ‘套餐’ ,conpot int comment ‘消费潜力’ ,age int comment ‘年龄’
package com.shujia.ads
import com.shujia.common.{Constant, SparkTool}
import org.apache.spark.sql._
/**
* 省游客宽表
* 从
* 省游客表
* 用户画像表
* 行政区配置表
* 中提取出所有需要的字段构建成宽表
*
* 需要的字段
* mdn string comment '手机号' 省游客表
* ,d_province_name string comment '旅游目的地市名' admincode获取
* ,o_city_name string comment '旅游来源地地市名' admincode获取
* ,o_province_name string comment '旅游来源地省名' admincode获取
* ,number_attr string comment '号码归属地'
* ,d_distance_section string comment '出游距离'
* ,d_stay_time string comment '停留时间按小时'
* ,gender string comment '性别'
* ,trmnl_brand string comment '终端品牌'
* ,pckg_price int comment '套餐'
* ,conpot int comment '消费潜力'
* ,age int comment '年龄'
*
*/
object AdsProvinceTouristMskWideD extends SparkTool with Constant {
override def run(spark: SparkSession): Unit = {
import org.apache.spark.sql.functions._
import spark.implicits._
//1、读取省游客表
val province: DataFrame = spark.table(ADS_PROVINCE_TOURIST_TABLE_NAME)
.where($"day_id" === day_id)
//2、用户画像表
val usertag: DataFrame = spark.table(DIM_USERTAG_TABLE_NAME)
.where($"month_id" === month_id)
//3、行政区配置表
val admincode: DataFrame = spark.table(DIM_ADMINCODE_TABLE_NAME)
//年龄段
val ages: Column = when($"age" > 0 and $"age" < 20, "(0,20)")
.when($"age" >= 20 and $"age" < 25, "[20,25)")
.when($"age" >= 25 and $"age" < 30, "[25,30)")
.when($"age" >= 30 and $"age" < 35, "[30,35)")
.when($"age" >= 35 and $"age" < 40, "[35,40)")
.when($"age" >= 40 and $"age" < 45, "[40,45)")
.when($"age" >= 45 and $"age" < 50, "[45,50)")
.when($"age" >= 50 and $"age" < 55, "[50,55)")
.when($"age" >= 55 and $"age" < 60, "[55,60)")
.otherwise("[60,~)")
//距离分段
val d_distance_section: Column =
when($"d_max_distance" >= 10 and $"d_max_distance" < 50, "[10,50)")
.when($"d_max_distance" >= 50 and $"d_max_distance" < 80, "[50,80)")
.when($"d_max_distance" >= 80 and $"d_max_distance" < 120, "[80,120)")
.when($"d_max_distance" >= 120 and $"d_max_distance" < 200, "[120,200)")
.when($"d_max_distance" >= 200 and $"d_max_distance" < 400, "[200,400)")
.when($"d_max_distance" >= 400 and $"d_max_distance" < 800, "[400,800)")
.otherwise("[800,~)")
//停留时间分段
val d_stay_time: Column = when($"d_stay_time" >= 3 and $"d_stay_time" < 6, "[3,6)")
.when($"d_stay_time" >= 6 and $"d_stay_time" < 9, "[6,9)")
.when($"d_stay_time" >= 9 and $"d_stay_time" < 12, "[6,12)")
.when($"d_stay_time" >= 12 and $"d_stay_time" < 15, "[12,15)")
.when($"d_stay_time" >= 15 and $"d_stay_time" < 18, "[15,18)")
.when($"d_stay_time" >= 18 and $"d_stay_time" < 24, "[18,24)")
.otherwise("[24,~)")
//对多次使用的rdd进行缓存
admincode.cache()
//取出省编号和省名称,去重
val proIdAndName: Dataset[Row] = admincode
.select($"prov_id" as "d_province_id", $"prov_name" as "d_province_name")
.distinct()
//关联用户画像表
province
.join(usertag.hint("broadcast"), "mdn")
//对年龄分段
.withColumn("age", ages)
//d对出游距离分段
.withColumn("d_distance_section", d_distance_section)
//停留时间分段
.withColumn("d_stay_time", d_stay_time)
//关联行政区配置表获取省名
.join(proIdAndName.hint("broadcast"), "d_province_id")
//通过来源地区县关联行政区配置表获取来源的省和市
.join(admincode.hint("broadcast"), $"source_county_id" === $"county_id")
//整理数据
.select(
$"mdn",
$"d_province_name",
$"city_name" as "o_city_name",
$"prov_name" as "o_province_name",
$"number_attr",
$"d_distance_section",
$"d_stay_time" ,
$"gender",
$"trmnl_brand",
$"packg" as "pckg_price",
$"conpot",
$"age"
)
//保存数据
.write
.format("csv")
.option("sep", "\t")
.mode(SaveMode.Overwrite)
.save(s"${ADS_PROVINCE_WIDE_PATH}day_id=$day_id")
//增加分区
spark.sql(s"alter table $ADS_PROVINCE_WIDE_TABLE_NAME add if not exists partition(day_id='$day_id')")
}
}
宽表做完之后,再进行各类的查询和检索就方便很多,在宽表上面做检索就会方便一些,速度会快一些 这里只有一个job任务
而之前的任务 会产生四个job任务 时间也会延长很多
使用kylin操作项目中的宽表
为什么要用到kylin
我们对于宽表要实现随意查询(因为我们有很多的指标需要操作)(数据量大,延迟要低)(OLAP)
mr hive存储: 慢 spark sql: 快, 但是当数据量不断累积会出问题 mysql: 快,数据量只能支撑百万级别 hbase: 不能做聚合,只能做简单的查询 impala 和hive类似的一个工具,基于内存计算 kylin 预计算(提前计算好,需要时间),提交将所有的可能都计算出来
kylin的搭建安装
1、上传解压配置环境变量 tar -xvf apache-kylin-2.5.0-bin-hbase1x.tar.gz mv apache-kylin-2.5.0-bin-hbase1x kylin-2.5.0
在/etc/profile中增加 export HIVE_CONF=/usr/local/soft/hive-1.2.1/conf
删除kafka环境变量 unset KAFKA_HOME
在master启动jobhistory mr-jobhistory-daemon.sh start historyserver
启动zookeeper zkServer.sh start
启动hbase start-hbase.sh
2、验证环境是否可行 check-env.sh
3、启动kylin kylin.sh start
4、访问kylin http://master:7070/kylin
初始用户名和密码是 ADMIN/KYLIN
这里就进入了kylin
使用kylin
首先我们要对kylin进行剪枝,因为kylin首先会对你的表做一个预计算,也就是说,你给出的表中有多少个字段,会对这些字段先做一些预计算,提前算出你可能适配的所有结果,假如我们给出的表有四个纬度,就会产生十六个可能的结果,而我们的宽表的字段有一二十个,如果不做剪枝,最后会产生很多的计算,浪费资源和空间(留出四个纬度,最后我们所进行的查询就只能最多保留四个结果)
开始使用kylin
新建一个项目 点击数据源,将我们的表同步进来
构建module
在这里new 一个module出来 和宽表名称保持一致 选择需要的纬度 选择度量,没有不选 选择分区的字段
构建cube
增加纬度,全选 默认度量给了一个count计数,有需要还可以继续加 数据自动合并可以不用管 选择剪枝,三四个即可,下面选层级维度,省和市即可 下面的引擎选择mapreduce,比较稳定 配置完成
进行预计算
点击build进行预计算 选择时间 这样就开始进行预计算
这里可以看到进度 运行完成 回到module,可以看到数据量相比较之前有了一个提升,因为做了预计算
在这里就可以做各种自己做需要的查询
感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。
|