Design Approach
In the previous stage, stream splitting separated the raw data into independent Kafka topics. The next step is to decide which metrics the real-time computations actually need. Timeliness is what a real-time warehouse is built for, so in many scenarios there is no need for the kind of large, all-encompassing intermediate layer an offline warehouse has; the intermediate layer only needs to persist the metrics that later computations will reuse.

With those metric requirements in mind, the DWS layer outputs the metrics as subject-oriented wide tables. The metrics covered here mainly serve the visualization dashboard. So where does the DWM layer come in? The DWM layer exists to serve the DWS layer: some requirements involve fairly complex computation when going straight from DWD to DWS, and the results of that computation are likely to be reused by multiple DWS subjects. A DWM layer is therefore derived from the DWD layer, mainly involving business data.
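To make the layering concrete, here is how the applications in this section wire DWD topics to DWM topics. This is a summary sketch only; the `DwmTopics` holder is hypothetical, but the topic and application names are the ones used in the code below.

// Topic wiring for the DWM applications in this section (names from the code below).
public interface DwmTopics {
    // UniqueVisitApp:    dwd_page_log                       -> dwm_unique_visit
    // UserJumpDetailApp: dwd_page_log                       -> dwm_user_jump_detail
    // OrderWideApp:      dwd_order_info + dwd_order_detail  -> dwm_order_wide
    // PaymentWideApp:    dwd_payment_info + dwm_order_wide  -> dwm_payment_wide
    String DWD_PAGE_LOG       = "dwd_page_log";
    String DWD_ORDER_INFO     = "dwd_order_info";
    String DWD_ORDER_DETAIL   = "dwd_order_detail";
    String DWD_PAYMENT_INFO   = "dwd_payment_info";
    String DWM_UNIQUE_VISIT   = "dwm_unique_visit";
    String DWM_USER_JUMP_DETAIL = "dwm_user_jump_detail";
    String DWM_ORDER_WIDE     = "dwm_order_wide";
    String DWM_PAYMENT_WIDE   = "dwm_payment_wide";
}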
Visitor UV Calculation
UV (Unique Visitor) means a distinct visitor; it is also referred to as DAU (Daily Active User). Identifying the day's visitors from the user behavior log comes down to two points:
- Identify the first page a visitor opens, which marks the point where the visitor enters the application
- A visitor may enter the application multiple times in one day, so deduplicate within the scope of a single day
Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
Program chain: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> Kafka -> UniqueVisitApp -> Kafka
// Create the execution environment; parallelism 1 for the demo
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

String groupId = "unique_visit_app_210325";
String sourceTopic = "dwd_page_log";
String sinkTopic = "dwm_unique_visit";

// Read the page log from the DWD topic and parse each record into a JSONObject
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

// Key by mid (device id) so per-device state can be kept
KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

// Keep only each device's first visit of the day, using keyed state with a 24-hour TTL
SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {

    private ValueState<String> dateState;
    private SimpleDateFormat simpleDateFormat;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("date-state", String.class);
        // Expire the state after 24 hours; the TTL timer is reset on create and on write
        StateTtlConfig stateTtlConfig = new StateTtlConfig
                .Builder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        valueStateDescriptor.enableTimeToLive(stateTtlConfig);
        dateState = getRuntimeContext().getState(valueStateDescriptor);
        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    }

    @Override
    public boolean filter(JSONObject value) throws Exception {
        // Only entry pages (no last_page_id) can start a visit
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        if (lastPageId == null || lastPageId.length() <= 0) {
            String lastDate = dateState.value();
            String curDate = simpleDateFormat.format(value.getLong("ts"));
            // Pass the record through only if this device has not been seen today
            if (!curDate.equals(lastDate)) {
                dateState.update(curDate);
                return true;
            }
        }
        return false;
    }
});

uvDS.print();

// Write the deduplicated visitors to the DWM topic
uvDS.map(JSONAware::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

env.execute("UniqueVisitApp");
Bounce Detail Calculation

A bounce is a visit in which the user opens one page and then leaves without visiting any other page. Two conditions identify it: the event is an entry page (no last_page_id), and no qualifying follow-up event from the same device arrives within a timeout window (10 seconds here). Flink CEP expresses this as a pattern of two consecutive entry pages: a full match means a new visit started right after the first one, so the first event was a bounce, and a timed-out partial match (no second event at all) means the same, so the timeout side output is unioned back into the result.

Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
Program chain: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> Kafka -> UserJumpDetailApp -> Kafka
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

String sourceTopic = "dwd_page_log";
String groupId = "userJumpDetailApp";
String sinkTopic = "dwm_user_jump_detail";

// Read the page log and assign event-time timestamps with a 1-second out-of-orderness bound
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return element.getLong("ts");
                    }
                }));

// Pattern: an entry page strictly followed by another entry page within 10 seconds
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        return lastPageId == null || lastPageId.length() <= 0;
    }
}).next("next").where(new SimpleCondition<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        return lastPageId == null || lastPageId.length() <= 0;
    }
}).within(Time.seconds(10));

// Equivalent formulation using the looping API:
// Pattern.<JSONObject>begin("start").where(/* same entry-page condition */)
//         .times(2)
//         .consecutive()
//         .within(Time.seconds(10));

// Apply the pattern per device (mid)
PatternStream<JSONObject> patternStream = CEP
        .pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid")), pattern);

// Timed-out partial matches (only a "start", no follow-up event) go to a side output
OutputTag<JSONObject> timeOutTag = new OutputTag<JSONObject>("timeOut") {
};
SingleOutputStreamOperator<JSONObject> selectDS = patternStream.select(timeOutTag,
        new PatternTimeoutFunction<JSONObject, JSONObject>() {
            @Override
            public JSONObject timeout(Map<String, List<JSONObject>> map, long ts) throws Exception {
                return map.get("start").get(0);
            }
        }, new PatternSelectFunction<JSONObject, JSONObject>() {
            @Override
            public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                return map.get("start").get(0);
            }
        });

// Both the matched and the timed-out "start" events are bounces; union them
DataStream<JSONObject> timeOutDS = selectDS.getSideOutput(timeOutTag);
DataStream<JSONObject> unionDS = selectDS.union(timeOutDS);

unionDS.print();

unionDS.map(JSONAware::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

env.execute("UserJumpDetailApp");
Order Wide Table Design

The order wide table joins the order_info and order_detail fact streams with an interval join, then fills in the dimension fields (user, province, SKU, SPU, trademark, category) one by one through asynchronous dimension lookups.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

String orderInfoSourceTopic = "dwd_order_info";
String orderDetailSourceTopic = "dwd_order_detail";
String orderWideSinkTopic = "dwm_order_wide";
String groupId = "order_wide_group_0325";

// Read order_info, split create_time into date and hour, and derive an epoch timestamp
SingleOutputStreamOperator<OrderInfo> orderInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderInfoSourceTopic, groupId))
        .map(line -> {
            OrderInfo orderInfo = JSON.parseObject(line, OrderInfo.class);
            String create_time = orderInfo.getCreate_time();
            String[] dateTimeArr = create_time.split(" ");
            orderInfo.setCreate_date(dateTimeArr[0]);
            orderInfo.setCreate_hour(dateTimeArr[1].split(":")[0]);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            orderInfo.setCreate_ts(sdf.parse(create_time).getTime());
            return orderInfo;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                    @Override
                    public long extractTimestamp(OrderInfo element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                }));

// Read order_detail and derive its epoch timestamp the same way
SingleOutputStreamOperator<OrderDetail> orderDetailDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderDetailSourceTopic, groupId))
        .map(line -> {
            OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);
            String create_time = orderDetail.getCreate_time();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            orderDetail.setCreate_ts(sdf.parse(create_time).getTime());
            return orderDetail;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                    @Override
                    public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                }));

// Interval join: pair each order with its detail rows whose timestamps differ by at most 5 seconds
SingleOutputStreamOperator<OrderWide> orderWideWithNoDimDS = orderInfoDS.keyBy(OrderInfo::getId)
        .intervalJoin(orderDetailDS.keyBy(OrderDetail::getOrder_id))
        .between(Time.seconds(-5), Time.seconds(5))
        .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
            @Override
            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                out.collect(new OrderWide(orderInfo, orderDetail));
            }
        });
orderWideWithNoDimDS.print("orderWideWithNoDimDS>>>>>>>>>");

// Enrich with the user dimension asynchronously (60-second timeout per lookup)
SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
        orderWideWithNoDimDS,
        new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getUser_id().toString();
            }

            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                orderWide.setUser_gender(dimInfo.getString("GENDER"));
                String birthday = dimInfo.getString("BIRTHDAY");
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                long currentTs = System.currentTimeMillis();
                long ts = sdf.parse(birthday).getTime();
                // Approximate age in whole years
                long age = (currentTs - ts) / (1000 * 60 * 60 * 24 * 365L);
                orderWide.setUser_age((int) age);
            }
        },
        60,
        TimeUnit.SECONDS);

// Enrich with the province dimension
SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(orderWideWithUserDS,
        new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getProvince_id().toString();
            }

            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                orderWide.setProvince_name(dimInfo.getString("NAME"));
                orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
            }
        }, 60, TimeUnit.SECONDS);

// Enrich with the SKU dimension (also carries the SPU, trademark, and category3 ids)
SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
        orderWideWithProvinceDS, new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                orderWide.setTm_id(jsonObject.getLong("TM_ID"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSku_id());
            }
        }, 60, TimeUnit.SECONDS);

// Enrich with the SPU dimension
SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
        orderWideWithSkuDS, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSpu_id());
            }
        }, 60, TimeUnit.SECONDS);

// Enrich with the trademark dimension
SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
        orderWideWithSpuDS, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setTm_name(jsonObject.getString("TM_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getTm_id());
            }
        }, 60, TimeUnit.SECONDS);

// Enrich with the category3 dimension
SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
        orderWideWithTmDS, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setCategory3_name(jsonObject.getString("NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getCategory3_id());
            }
        }, 60, TimeUnit.SECONDS);
orderWideWithCategory3DS.print("orderWideWithCategory3DS>>>>>>>>>>>");

// Write the fully enriched wide records to the DWM topic
orderWideWithCategory3DS
        .map(JSON::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(orderWideSinkTopic));

env.execute("OrderWideApp");
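DimAsyncFunction is a project-specific helper rather than a Flink class, and its implementation is not shown here. A minimal sketch of the idea, built on Flink's RichAsyncFunction and a hypothetical DimUtil.getDimInfo(tableName, key) that queries the dimension table (with its own cache layer); the real implementation may differ:

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.text.ParseException;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of a generic async dimension-lookup function (assumptions noted above)
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {

    private final String tableName;
    private transient ThreadPoolExecutor threadPoolExecutor;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    // Extract the dimension key from the stream element
    public abstract String getKey(T input);

    // Merge the dimension row into the stream element
    public abstract void join(T input, JSONObject dimInfo) throws ParseException;

    @Override
    public void open(Configuration parameters) {
        // A small dedicated pool so blocking lookups do not stall the task thread
        threadPoolExecutor = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
        threadPoolExecutor.submit(() -> {
            try {
                // Hypothetical helper: query the dimension table (cache first, then the store)
                JSONObject dimInfo = DimUtil.getDimInfo(tableName, getKey(input));
                if (dimInfo != null) {
                    join(input, dimInfo);
                }
                resultFuture.complete(Collections.singletonList(input));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }

    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) {
        // Called when a lookup exceeds the timeout passed to unorderedWait
        resultFuture.complete(Collections.singletonList(input));
    }
}

AsyncDataStream.unorderedWait calls asyncInvoke for each element and emits results as they complete, which is what lets the chained dimension lookups above overlap instead of blocking one request at a time.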
Payment Wide Table Design

The payment wide table joins payment_info with the order wide records produced above. A payment normally arrives within 15 minutes after its order is created, so the join window reaches 15 minutes back from the payment's timestamp, plus a small 5-second forward allowance.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

String groupId = "payment_wide_group";
String paymentInfoSourceTopic = "dwd_payment_info";
String orderWideSourceTopic = "dwm_order_wide";
String paymentWideSinkTopic = "dwm_payment_wide";

// Read the order wide stream produced by OrderWideApp, using create_time as event time
SingleOutputStreamOperator<OrderWide> orderWideDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderWideSourceTopic, groupId))
        .map(line -> JSON.parseObject(line, OrderWide.class))
        .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                    @Override
                    public long extractTimestamp(OrderWide element, long recordTimestamp) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCreate_time()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            return recordTimestamp;
                        }
                    }
                }));

// Read the payment stream, also keyed to event time via create_time
SingleOutputStreamOperator<PaymentInfo> paymentInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(paymentInfoSourceTopic, groupId))
        .map(line -> JSON.parseObject(line, PaymentInfo.class))
        .assignTimestampsAndWatermarks(WatermarkStrategy.<PaymentInfo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<PaymentInfo>() {
                    @Override
                    public long extractTimestamp(PaymentInfo element, long recordTimestamp) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCreate_time()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            return recordTimestamp;
                        }
                    }
                }));

// Join each payment with its order: the order may precede the payment by up to 15 minutes
SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoDS.keyBy(PaymentInfo::getOrder_id)
        .intervalJoin(orderWideDS.keyBy(OrderWide::getOrder_id))
        .between(Time.minutes(-15), Time.seconds(5))
        .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
            @Override
            public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context ctx, Collector<PaymentWide> out) throws Exception {
                out.collect(new PaymentWide(paymentInfo, orderWide));
            }
        });
paymentWideDS.print(">>>>>>>>>");

// Write the payment wide records to the DWM topic
paymentWideDS
        .map(JSON::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(paymentWideSinkTopic));

env.execute("PaymentWideApp");
DWM Layer Summary
- Convert one kind of detail data into another through computation, to support downstream statistics and calculations
- Use keyed state for deduplication (UV)
- Use CEP to match and filter over a sequence of events (bounce detection)
- Use intervalJoin to join two streams (order and payment wide tables)
- Handle dimension association, optimizing performance with caching and asynchronous queries