项目概要
之前我们已经通过动态分流把数据分到了我们想要的位置,为了方便后续内容的讲解方便,所以接下来我们可以把配置表的信息进行导入了,然后通过动态分流的方法,把数据发往对应的kafka主题或者是hbase的维度表中:
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
先清空原来配置表table_process中的数据,建表成功之后,开启业务数据和对应的IDEA程序,生成对应的表:
分流 Sink 之保存业务数据到 Kafka 主题
dwd_cart_info
dwd_comment_info
dwd_coupon_use
dwd_display_log
dwd_favor_info
dwd_order_detail
dwd_order_detail_activity
dwd_order_detail_coupon
dwd_order_info
dwd_order_info_update
dwd_order_refund_info
dwd_page_log
dwd_payment_info
dwd_refund_payment
dwd_start_log
分流Sink之保存维度数据到Hbase主题:
GMALL0709_REALTIME | DIM_ACTIVITY_INFO | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_ACTIVITY_RULE | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_ACTIVITY_SKU | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_BASE_CATEGORY1 | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_BASE_CATEGORY2 | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_BASE_CATEGORY3 | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_BASE_DIC | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_BASE_PROVINCE | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_BASE_REGION | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_BASE_TRADEMARK | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_COUPON_INFO | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_COUPON_RANGE | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_FINANCIAL_SKU_COST | TABLE | | | | | | false | null |
| | GMALL0709_REALTIME | DIM_SKU_INFO | TABLE | | | | | | false | 4 |
| | GMALL0709_REALTIME | DIM_SPU_INFO | TABLE | | | | | | false | 3 |
| | GMALL0709_REALTIME | DIM_USER_INFO | TABLE | | | | | | false | 3 |
+------------+---------------------+-------------------------+---------------+----------+------------+----------------------------+-----------
我们应该能在kafka和Phoenix中看到这些表信息,尽管这些表并不是现在使用,但生成之后能更好的看出来动态分流之后的效果,这些主题或者维度表的内容对应的就是Sql数据库中gmall0709中的各种业务表的数据内容;(其中各个表的数据已经根据配置表中的字段名和信息进行了过滤,也即这些表的信息和MySql数据库中的业务数据是保持一致的)
DWM层业务设计
得到了日志分离数据、维度数据和事实数据后,基本上就完成了ODS、DWD、DIM层的设计,接下来就是DWM层的设计了,那么为什么需要DWM这么一个中间层呢?  以上是DWS层的主题宽表的一个设计思路,那么首先我们需要把所需要的维度表和事实表都制作完毕,然后再开始维度宽表的制作(这里涉及到维度建模理论,简单来讲这是OLAP不同于OLTP的一个明显特征,OLTP通过范式建模,而OLAP则通过维度建模来形成主题宽表,从而完成后续的数据分析工作)。DWM层的作用就是建立在DWD、DWS层之间的一个过渡层,所以这一层不是必要的,只是为了更进一步地区分业务数据,如果业务不是很复杂,也可以直接从dwd层把数据发往DWS层;结构如下所示: 
DWM层业务设计实现–UV计算
 需求分析与思路 //UV,全称是 Unique Visitor,即独立访客,对于实时计算中,也可以称为 DAU(Daily Active User),即每日活跃用户,因为实时计算中的 uv 通常是指当日的访客数。 那么如何从用户行为日志中识别出当日的访客,那么有两点: ? 其一,是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用 ? 其二,由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重
//核心过滤思路:
? 首先用 keyby 按照 mid 进行分组,每组表示当前设备的访问情况
? 分组后使用 keystate 状态,记录用户进入时间,实现 RichFilterFunction 完成过滤
? 重写 open 方法用来初始化状态
? 重写 filter 方法进行过滤
? 可以直接筛掉 last_page_id 不为空的字段,因为只要有上一页,说明这条不是这个用户进入的首个页面。
? 状态用来记录用户的进入时间,只要这个 lastVisitDate 是今天,就说明用户今天已经访问过了所以筛除掉。如果为空或者不是今天,说明今天还没访问过,则保留。
? 因为状态值主要用于筛选是否今天来过,所以这个记录过了今天基本上没有用了,这里 enableTimeToLive 设定了 1 天的过期时间,避免状态过大。
//从dwd_page_log中获取数据;
//Flink消费dwd_page_log主题中的数据并作uv计算;
//处理后的数据发送到dwm_unique_visit主题中;
//测试1:是否能读取到动态分流后的数据;
//? 启动 log.sh、zk、kafka
//? 运行 Idea 中的 BaseLogApp
//? 运行 Idea 中的 UniqueVisitApp
//? 查看控制台输出
//? 执行流程
//jsonObjDS.print("---->");
//测试2:
? 启动 log.sh、zk、kafka
? 运行 Idea 中的 BaseLogApp
? 运行 Idea 中的 UniqueVisitApp
? 查看控制台输出以及 kafka 的 dwm_unique_visit 主题
? 执行流程
//filteredDS.print(">>>>>");
以下是我在第一次运行时遇到的一个问题,如果大家看的不是我的代码,很可能会遇到这个问题,这里做一个提醒,这里实际上是脚本生成数据的一个问题,修改后的方案是:如果不是启动日志,那就直接输出到主流(页面日志),然后再在页面日志中去找曝光日志;
//此时的结果应该是 BaseLogApp控制台有数据输出,而UniqueVisitApp控制台没有数据输出;
//从前面可以直到dwd_page_log中是有数据的,所以数据出错是在dwd_page_log --> UniqueVisitApp的过程中;debug测试;
//发现是因为每条数据都有一个"last_page_id"属性,导致数据都被过滤掉了;
//是之前BaseLogApp的日志分流时的代码有问题,曝光日志中也包含了页面日志,不能区分开来;
//处理完之后,也可以顺带测试最后的kafka接收情况;
最后,把过滤后的数据输出到DWM层中,在这里我们数据保存到了dwm_unique_visit主题中,等待后续的使用。
DWM层业务设计实现–跳出明细计算
跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而跳出率就是用跳出次数除以访问次数。
关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果
首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么要抓住几个特征:
? 该页面是用户近期访问的第一个页面,这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。
? 首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。
如果对Flink CEP编程不是非常了解,可以参考我的另一篇博客,这里有一个简短的介绍: CEP编程 具体代码思路:首先设定好事件时间戳,按照mid(机器id)进行分组,然后设定好对应模式,即10秒内没有发生页面跳转则认为没有发生跳出行为,把超时部分的内容放入侧输出流中进行输出;(只满足第一个条件的很明显就是我们要的跳出数据,但是不满足后续条件)将这部分数据存到dwm_user_jump_detail主题中,这就是我们要的跳出数据内容
测试: //采用代码中的测试数据来做;并行度改为1; //开启zk、kk、dwm_user_jump_detail消费主题,应用程序,查看是否有输出; //jumpDS.print(">>>>>");
DWM层业务设计实现–订单宽表
订单是统计分析的重要的对象,围绕订单有很多的维度统计需求,比如用户、地区、商品、品类、品牌等等。 为了之后统计计算更加方便,减少大表之间的关联,所以在实时计算过程中将围绕订单的相关数据整合成为一张订单的宽表。 那究竟哪些数据需要和订单整合在一起? 具体实现过程如图,订单表和订单明细表关联起来后,与用户表、地区表、品牌表、品类表、SPU表、SKU表等关联起来;  维度表和事实表进行关联后,可以得到我们所需要的这张支付宽表;那么他们之间是如何进行关联的呢?首先我们可以来看这两张事实表–订单表和订单明细表;
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
`order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`operate_time` datetime DEFAULT NULL COMMENT '操作时间',
`expire_time` datetime DEFAULT NULL COMMENT '失效时间',
`process_status` varchar(20) DEFAULT NULL COMMENT '进度状态',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
`province_id` int(20) DEFAULT NULL COMMENT '地区',
`activity_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '促销金额',
`coupon_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '优惠券',
`original_total_amount` decimal(16,2) DEFAULT NULL COMMENT '原价金额',
`feight_fee` decimal(16,2) DEFAULT NULL COMMENT '运费',
`feight_fee_reduce` decimal(16,2) DEFAULT NULL COMMENT '运费减免',
`refundable_time` datetime DEFAULT NULL COMMENT '可退款日期(签收后30天)',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=29549 DEFAULT CHARSET=utf8 COMMENT='订单表 订单表'
CREATE TABLE `order_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`source_type` varchar(20) DEFAULT NULL COMMENT '来源类型',
`source_id` bigint(20) DEFAULT NULL COMMENT '来源编号',
`split_total_amount` decimal(16,2) DEFAULT NULL,
`split_activity_amount` decimal(16,2) DEFAULT NULL,
`split_coupon_amount` decimal(16,2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=88048 DEFAULT CHARSET=utf8 COMMENT='订单明细表'
以上内容可以从MySql的数据库中的对应表中获得;对应表中事实数据则是由动态分流后获得;
因此,我们可以从dwd_order_info和dwd_order_detail两个主题中去获取我们的事实数据,然后进行关联;所以这里还需要两个Bean Pojo类,来吧两个表中的数据对应封装,根据刚才提到的order_id进行分组,然后用inner Join的方法,把两个对象中的字段合并起来;
public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail){
mergeOrderInfo(orderInfo);
mergeOrderDetail(orderDetail);
}
public void mergeOrderInfo(OrderInfo orderInfo ) {
if (orderInfo != null) {
this.order_id = orderInfo.id;
this.order_status = orderInfo.order_status;
this.create_time = orderInfo.create_time;
this.create_date = orderInfo.create_date;
this.activity_reduce_amount = orderInfo.activity_reduce_amount;
this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
this.original_total_amount = orderInfo.original_total_amount;
this.feight_fee = orderInfo.feight_fee;
this.total_amount = orderInfo.total_amount;
this.province_id = orderInfo.province_id;
this.user_id = orderInfo.user_id;
}
}
public void mergeOrderDetail(OrderDetail orderDetail ) {
if (orderDetail != null) {
this.detail_id = orderDetail.id;
this.sku_id = orderDetail.sku_id;
this.sku_name = orderDetail.sku_name;
this.order_price = orderDetail.order_price;
this.sku_num = orderDetail.sku_num;
this.split_activity_amount=orderDetail.split_activity_amount;
this.split_coupon_amount=orderDetail.split_coupon_amount;
this.split_total_amount=orderDetail.split_total_amount;
}
}
注意,这里的AsyncDataStream.unorderedWait方法其实是一种优化方案,因为phoenix查询数据比较慢,所以在做维度关联的时候,考虑做优化;
其中这里我们选用的是unorderedWait,因为数据的顺序在这里并不重要;
? 无序等待(unorderedWait)
后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会有乱序出现。
? 有序等待(orderedWait)
严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会差一些
接下来我的代码中的关联顺序为: 关联用户维度 --》 关联省市维度 --》关联SKU商品维度 --》关联SPU维度 --》关联品牌维度 --》关联品类维度,只要按照之前关联图中的顺序来关联即可;全部关联起来之后,将数据写入到dwm_order_wide主题中,这样就得到了我们所需的订单宽表的数据了; 现在一个整体的思路都有了,那么回过头来,我们应该要思考一个问题,如何去获取这些维度表的数据呢?两个事实表的数据我们已经从kafka主题表中获取,那么这些维度表数据从Hbase中如何获得呢? 按照我们的分析过程,这里一共需要关联6个维度表,我们可以根据每个维度表单独写一个AsyncFunction,但是这样比较繁琐,但是直接封装也面临一个问题,我们每个关联表的数据内容、关联的主键、分组主键都是不同的,怎么去封装这些内容呢?这里用到一个技巧:模板方法设计模式;
首先,我们需要知道哪些内容是变动的:
1.关联的主键;
2.关联方法;(不同维度表的属性赋值过程)
那么我们可以这么去思考这个问题,既然不能直接封装,那么我们在用的时候再去实现这个过程不就好了;
简单来说就是不对这两个动态内容进行封装,而只对其他内容进行封装,把这两个内容设定为抽象方法,具体怎么实现,到要实现的时候重写即可;
这么一想这个问题,我们就可以在实现对应的维度表关联时来实现具体的关联过程;
String key = getKey(obj);
JSONObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, key);
if(dimInfoJsonObj != null){
join(obj,dimInfoJsonObj);
Phoenix查询
因为我们每次从Phoenix中查询的内容是不同的,所以是不能把查询的返回内容写死的,在PhoenixUtil类中: // 从Phoenix中查询数据 // select * from 表 where XXX=xxx //返回类型为class; public static List queryList(String sql,Class clazz){}的返回内容应该是不确定的,所以我们封装到List中的对象也应该是不确定的;写好了PhoenixUtil类之后,我们再来回过头看看 JSONObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, key);到底是如何实现的: 进入到DimUtil类中,观察getDimInfoNoCache方法,实际上DimUtil底层的查询逻辑就是我们刚才写好的PhoenixUtil类的queryList方法,可以看出具体的一个实现就是通过先拼接好对应的查询Sql语句,然后让Phoenix去对应执行查询;查询的语句类型类似: select * from dim_base_trademark where id=10 and name=zs; 但是我们要面临一个问题: 即Hbase的查询速度是比较慢的,这也是我们前面做异步调用的一个重要原因;所以这里可以考虑再做一个优化,即旁路缓存:
旁路缓存实现
public static JSONObject getDimInfo(String tableName, String id) {
return getDimInfo(tableName, Tuple2.of("id", id));
}
所以,相比未优化之前的getDimInfoNoCache方法,getDimInfo中多增加了一步判断过程,即Redis中是否有对应的缓存数据,如果有直接从redis中获取数据,如果没有,则从Hbase中获取数据并且把数据导入到redis中,保存的key就以dim+dim表名+查询字段名来命名;注意,由于这里我们只用了id来做关联,所以在后面做删除缓存操作的时候,也只写了id的情况,即这里的查询字段名都是id(只不过是各个表自己的id); 在这里还要注意一个点,从redis或者Phoenix中查询出来的数据,不止一个,所以我们最后返回的结果是一个List集合,其中List中的每一个元素就是查询结果的一行封装好的JsonObject对象;
测试
? 测试用户维度关联
? 将 table_process 表中的数据删除掉,执行table_process 初始配置.sql(上一篇博客中也给出了)
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
? 启动 Maxwell、ZK、Kafka、HDFS、Hbase、Redis
? 运行运行 Idea 中的 BaseDBApp
? 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop202
--database gmall2021 --table user_info --client_id maxwell_1
? 运行 Idea 中的 OrderWideApp
? 执行模拟生成业务数据的 jar 包 ? 查看控制台输出可以看到用户的年龄以及性别
DWM层业务设计实现–支付宽表
 支付宽表的目的,最主要的原因是支付表没有关联到订单明细,支付金额没有细分到商品上,没有办法统计商品级的支付状况。所以本次宽表的核心就是要把支付表的信息与订单明细(也即上面的订单宽表)关联上。(通过odder_id进行关联) 为了更方便理解支付宽表的结果,在最下方的测试过程中,我给出了支付表和刚才做出来的订单宽表的内容,相当于此处是对支付表的一个内容扩展。 解决方案有两个 ? 一个是把订单宽表输出到Hbase上,在支付宽表计算时查询hbase,这相当于把订单宽表作为一种维度进行管理。(订单宽表的关联思想) ? 一个是用流的方式接收订单宽表数据,然后用双流join方式进行合并。因为订单与支付产生有一定的时差。所以必须用intervalJoin来管理流的状态时间,保证当支付到达时订单明细还保存在状态中。(这里用第二种方式,用流的方式更符合实时处理); 具体的代码流程这里就不展开讲解了,和订单宽表的intervalJoin方法基本一致,具体测试结果如下所示:
|