一 DWM层-跳出明细计算
1 需求分析与思路
(1)什么是跳出
跳出: 用户成功访问了网站的一个页面后就退出,不再继续访问网站的其它页面。而跳出率就是用跳出次数除以访问次数。
关注跳出率,可以看出从某几个网站引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果。
(2)计算跳出行为的思路
首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么要抓住几个特征:
这第一个特征的识别很简单,保留last_page_id为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。更麻烦的它并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢?
最简单的办法就是Flink自带的复杂事件处理(CEP)技术。CEP非常适合通过多条数据组合来识别某个事件。
用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。
(3)实现思路
实现思路如下:
- 从kafka中读取数据。
- 使用CEP对数据进行过滤。
- 该页面是用户近期访问的第一个页面。
- 如果在指定时间内,有当前设备对网站其他页面的访问,说明发生了跳转。
- 反之,则发生了跳出。
- 可以使用within指定匹配的时间;涉及到了时间,flink1.12默认的时间语义就是事件时间语义,需要指定watermark以及提取事件时间字段。
- 使用CEP编程步骤
- 定义pattern
- 将pattern应用到流上
- 从流中按照指定的模式提取数据
2 读取数据
从kafka的dwd_page_log主题中读取页面日志。
(1)代码编写
public class UserJumpDetailAPP {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
String topic = "dwd_page_log";
String groupId = "user_jump_detail_app_group";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);
jsonObjDS.print(">>>");
env.execute();
}
}
(2)测试
启动相关进程,模拟日志生成,查看是否可以正常接收到数据。
3 通过Flink的CEP完成跳出判断
(1)确认添加了CEP的依赖包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
(2)设定时间语义为事件时间并指定数据中的ts字段为事件时间
由于这里涉及到时间的判断,所以必须设定数据流的EventTime和水位线。这里没有设置延迟时间,实际生产情况可以视乱序情况增加一些延迟。
增加延迟把forMonotonousTimestamps 换为forBoundedOutOfOrderness 即可。
注意:flink1.12默认的时间语义就是事件时间,所以不需要执行。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<JSONObject> jsonObjWithWatermark = jsonObjDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
return jsonObj.getLong("ts");
}
}
)
);
(3)根据日志数据的mid进行分组
因为用户的行为都是要基于相同的Mid的行为进行判断,所以要根据Mid进行分组。
KeyedStream<JSONObject, String> keyedDS = jsonObjWithWatermark.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
(4)配置CEP表达式
跳出 or 跳转的3个条件
- 必须是一个新会话,lastPageId为空
- 访问了其他页面
- 不能超过一定时间
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
if (lastPageId == null || lastPageId.length() == 0) {
return true;
}
return false;
}
}
).next("second").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String pageId = jsonObj.getJSONObject("page").getString("page_id");
if (pageId != null && pageId.length() > 0) {
return true;
}
return false;
}
}
)
.within(Time.seconds(10));
(5)根据表达式筛选流
PatternStream<JSONObject> patternDS = CEP.pattern(keyedDS, pattern);
(6)提取命中的数据
a 从模式中提取
在获得到一个PatternStream 之后,可以应用各种转换来发现事件序列。推荐使用PatternProcessFunction 。
PatternProcessFunction 有一个processMatch 的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List<IN>> 的格式接收一个匹配,映射的键是模式序列中的每个模式的名称,值是被接受的事件列表(IN 是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany() 和times() )时, 对一个模式会有不止一个事件被接受。
官网说明:从模式中提取。
b 处理超时的部分匹配
当一个模式上通过within 加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler 接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction 里另外实现这个接口。 TimedOutPartialMatchHandler 提供了另外的processTimedOutMatch 方法,这个方法对每个超时的部分匹配都会调用。
c 便捷的API
前面提到的PatternProcessFunction 是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select /flatSelect 这样旧格式的API,它们会在内部被转换为PatternProcessFunction 。
使用方法:
- 设定超时时间标识 timeoutTag。
- flatSelect方法中,实现PatternFlatTimeoutFunction中的timeout方法。
- 所有out.collect的数据都被打上了超时标记。
- 本身的flatSelect方法因为不需要未超时的数据所以不接受数据。
- 通过SideOutput侧输出流输出超时数据。
OutputTag<String> timeoutTag = new OutputTag<>("timeoutTag");
SingleOutputStreamOperator<String> resDS = patternDS.flatSelect(
timeoutTag,
new PatternFlatTimeoutFunction<JSONObject, String>() {
@Override
public void timeout(Map<String, List<JSONObject>> pattern, long timestamp, Collector<String> out) throws Exception {
List<JSONObject> jsonObjectList = pattern.get("first");
for (JSONObject jsonObj : jsonObjectList) {
out.collect(jsonObj.toJSONString());
}
}
},
new PatternFlatSelectFunction<JSONObject, String>() {
@Override
public void flatSelect(Map<String, List<JSONObject>> map, Collector<String> collector) throws Exception {
}
}
);
DataStream<String> jumpDS = resDS.getSideOutput(timeoutTag);
jumpDS.print(">>>>");
(7)利用测试数据完成测试
{common:{mid:101},page:{page_id:home},ts:10000} ,
{common:{mid:102},page:{page_id:home},ts:12000},
{common:{mid:102},page:{page_id:good_list,last_page_id:home},ts:15000},
{common:{mid:102},page:{page_id:good_list,last_page_id:detail},ts:30000}
DataStream<String> kafkaDS = env
.fromElements(
"{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"home\"},\"ts\":15000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"detail\"},\"ts\":30000} "
);
输出结果:
>>>>:3> {"common":{"mid":"101"},"page":{"page_id":"home"},"ts":10000}
4 写回kafka
将跳出数据写回到kafka的DWM层。
jumpDS.addSink(MyKafkaUtil.getKafkaSink("dwm_user_jump_detail"));
5 测试
将测试数据注释掉,打开kafka数据源。
打开zookeeper、kafka、日志采集服务、kafka消费者、BaseLogApp、UserJumpDetailAPP,查看输出结果。
|