IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 大数据项目之Flink实时数仓(DWM层) -> 正文阅读

[大数据]大数据项目之Flink实时数仓(DWM层)

设计思路

之前通过分流等处理手段,将数据拆分成了独立的kafka topic,接下来处理数据,我们应该考虑的是将实时计算使用的指标项进行处理,时效性是实时数仓所追求的,所以在一些场景没有必要和离线数仓一样,大而全的中间层,只需要中间层将一些计算指标保存即可,为下次计算使用提供便利。
所以需要考虑一些实时计算的指标需求,把这些指标以主题宽表的形式输出就是dws层在这里插入图片描述
在这里插入图片描述
这里列出来一部分指标,主要为服务可视化大屏计算,说了这么多,dwm层怎么还没出现,别急,dwm层主要服务dws层,因为部分需求直接从dwd层到dws层会有一些复杂的计算,而且这些计算的结果很可能为dws层多个主题复用,所以dwd层形成一层dwm,主要涉及到业务

  • 访客UV计算
  • 跳出明细计算
  • 订单宽表
  • 支付宽表

访客UV计算

UV(Unique Visitor),即独立访客,也称为 DAU(DailyActive User),即每日活跃用户
用户行为日志识别当日访客有两点:

  1. 识别出访客打开的第一个页面,表示这个访客开始进入我们的应用
  2. 由于访客可以在一天中多次进入应用,所以我们在一天的范围内进行去重

数据流:web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
程 序:mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> kafka -> UniqueVisitApp -> Kafka

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2.读取Kafka dwd_page_log 主题的数据
        String groupId = "unique_visit_app_210325";
        String sourceTopic = "dwd_page_log";
        String sinkTopic = "dwm_unique_visit";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
        
        //TODO 3.将每行数据转换为JSON对象
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //TODO 4.过滤数据  状态编程  只保留每个mid每天第一次登陆的数据
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
        SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {

            private ValueState<String> dateState;
            private SimpleDateFormat simpleDateFormat;
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("date-state", String.class);
                //设置状态的超时时间以及更新时间的方式
                StateTtlConfig stateTtlConfig = new StateTtlConfig
                        .Builder(Time.hours(24))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();
                valueStateDescriptor.enableTimeToLive(stateTtlConfig);
                dateState = getRuntimeContext().getState(valueStateDescriptor);

                simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
            }
            @Override
            public boolean filter(JSONObject value) throws Exception {
                //取出上一条页面信息
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                //判断上一条页面是否为Null
                if (lastPageId == null || lastPageId.length() <= 0) {
                    //取出状态数据
                    String lastDate = dateState.value();
                    //取出今天的日期
                    String curDate = simpleDateFormat.format(value.getLong("ts"));
                    //判断两个日期是否相同
                    if (!curDate.equals(lastDate)) {
                        dateState.update(curDate);
                        return true;
                    }
                }
                return false;
            }
        });
        //TODO 5.将数据写入Kafka
        uvDS.print();
        uvDS.map(JSONAware::toJSONString)
                .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //TODO 6.启动任务
        env.execute("UniqueVisitApp");

数据流:web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
程 序:mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> kafka -> UserJumpDetailApp -> Kafka

跳出明细计算

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //生产环境,与Kafka分区数保持一致

        //TODO 2.读取Kafka主题的数据创建流
        String sourceTopic = "dwd_page_log";
        String groupId = "userJumpDetailApp";
        String sinkTopic = "dwm_user_jump_detail";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3.将每行数据转换为JSON对象并提取时间戳生成Watermark
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        }));

        //TODO 4.定义模式序列
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        }).next("next").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        }).within(Time.seconds(10));

        //使用循环模式  定义模式序列
        Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        })
                .times(2)
                .consecutive() //指定严格近邻(next)
                .within(Time.seconds(10));

        //TODO 5.将模式序列作用到流上
        PatternStream<JSONObject> patternStream = CEP
                .pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                        , pattern);

        //TODO 6.提取匹配上的和超时事件
        OutputTag<JSONObject> timeOutTag = new OutputTag<JSONObject>("timeOut") {
        };
        SingleOutputStreamOperator<JSONObject> selectDS = patternStream.select(timeOutTag,
                new PatternTimeoutFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject timeout(Map<String, List<JSONObject>> map, long ts) throws Exception {
                        return map.get("start").get(0);
                    }
                }, new PatternSelectFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                        return map.get("start").get(0);
                    }
                });
        DataStream<JSONObject> timeOutDS = selectDS.getSideOutput(timeOutTag);

        //TODO 7.UNION两种事件
        DataStream<JSONObject> unionDS = selectDS.union(timeOutDS);

        //TODO 8.将数据写入Kafka
        unionDS.print();
        unionDS.map(JSONAware::toJSONString)
                .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //TODO 9.启动任务
        env.execute("UserJumpDetailApp");

    }

订单宽表设计

                //TODO 1.获取执行环境
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);

                //TODO 2.读取Kafka 主题的数据 并转换为JavaBean对象&提取时间戳生成WaterMark
                String orderInfoSourceTopic = "dwd_order_info";
                String orderDetailSourceTopic = "dwd_order_detail";
                String orderWideSinkTopic = "dwm_order_wide";
                String groupId = "order_wide_group_0325";
                SingleOutputStreamOperator<OrderInfo> orderInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderInfoSourceTopic, groupId))
                        .map(line -> {
                                OrderInfo orderInfo = JSON.parseObject(line, OrderInfo.class);

                                String create_time = orderInfo.getCreate_time();
                                String[] dateTimeArr = create_time.split(" ");
                                orderInfo.setCreate_date(dateTimeArr[0]);
                                orderInfo.setCreate_hour(dateTimeArr[1].split(":")[0]);

                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                orderInfo.setCreate_ts(sdf.parse(create_time).getTime());

                                return orderInfo;
                        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                                        @Override
                                        public long extractTimestamp(OrderInfo element, long recordTimestamp) {
                                                return element.getCreate_ts();
                                        }
                                }));
                SingleOutputStreamOperator<OrderDetail> orderDetailDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderDetailSourceTopic, groupId))
                        .map(line -> {
                                OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);
                                String create_time = orderDetail.getCreate_time();

                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                orderDetail.setCreate_ts(sdf.parse(create_time).getTime());

                                return orderDetail;
                        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                                        @Override
                                        public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                                                return element.getCreate_ts();
                                        }
                                }));

                //TODO 3.双流JOIN
                SingleOutputStreamOperator<OrderWide> orderWideWithNoDimDS = orderInfoDS.keyBy(OrderInfo::getId)
                        .intervalJoin(orderDetailDS.keyBy(OrderDetail::getOrder_id))
                        .between(Time.seconds(-5), Time.seconds(5)) //生成环境中给的时间给最大延迟时间
                        .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                                @Override
                                public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                                        out.collect(new OrderWide(orderInfo, orderDetail));
                                }
                        });

                //打印测试
                orderWideWithNoDimDS.print("orderWideWithNoDimDS>>>>>>>>>");

                //TODO 4.关联维度信息  HBase Phoenix
//        orderWideWithNoDimDS.map(orderWide -> {
//            //关联用户维度
//            Long user_id = orderWide.getUser_id();
//            //根据user_id查询Phoenix用户信息
//            //将用户信息补充至orderWide
//            //地区
//            //SKU
//            //SPU
//            //。。。
//            //返回结果
//            return orderWide;
//        });

                //4.1 关联用户维度
                SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
                        orderWideWithNoDimDS,
                        new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return orderWide.getUser_id().toString();
                                }

                                @Override
                                public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                                        orderWide.setUser_gender(dimInfo.getString("GENDER"));

                                        String birthday = dimInfo.getString("BIRTHDAY");
                                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

                                        long currentTs = System.currentTimeMillis();
                                        long ts = sdf.parse(birthday).getTime();

                                        long age = (currentTs - ts) / (1000 * 60 * 60 * 24 * 365L);

                                        orderWide.setUser_age((int) age);
                                }
                        },
                        60,
                        TimeUnit.SECONDS);

                //打印测试
//        orderWideWithUserDS.print("orderWideWithUserDS");

                //4.2 关联地区维度
                SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(orderWideWithUserDS,
                        new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return orderWide.getProvince_id().toString();
                                }

                                @Override
                                public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                                        orderWide.setProvince_name(dimInfo.getString("NAME"));
                                        orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                                        orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                                        orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.3 关联SKU维度
                SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
                        orderWideWithProvinceDS, new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                                        orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                                        orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                                        orderWide.setTm_id(jsonObject.getLong("TM_ID"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getSku_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.4 关联SPU维度
                SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
                        orderWideWithSkuDS, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getSpu_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.5 关联TM维度
                SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
                        orderWideWithSpuDS, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setTm_name(jsonObject.getString("TM_NAME"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getTm_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.6 关联Category维度
                SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
                        orderWideWithTmDS, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setCategory3_name(jsonObject.getString("NAME"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getCategory3_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                orderWideWithCategory3DS.print("orderWideWithCategory3DS>>>>>>>>>>>");

                //TODO 5.将数据写入Kafka
                orderWideWithCategory3DS
                        .map(JSONObject::toJSONString)
                        .addSink(MyKafkaUtil.getKafkaProducer(orderWideSinkTopic));

                //TODO 6.启动任务
                env.execute("OrderWideApp");
        }

支付宽表设计

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2.读取Kafka主题的数据创建流 并转换为JavaBean对象 提取时间戳生成WaterMark
        String groupId = "payment_wide_group";
        String paymentInfoSourceTopic = "dwd_payment_info";
        String orderWideSourceTopic = "dwm_order_wide";
        String paymentWideSinkTopic = "dwm_payment_wide";
        SingleOutputStreamOperator<OrderWide> orderWideDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderWideSourceTopic, groupId))
                .map(line -> JSON.parseObject(line, OrderWide.class))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                            @Override
                            public long extractTimestamp(OrderWide element, long recordTimestamp) {
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                try {
                                    return sdf.parse(element.getCreate_time()).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                    return recordTimestamp;
                                }
                            }
                        }));
        SingleOutputStreamOperator<PaymentInfo> paymentInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(paymentInfoSourceTopic, groupId))
                .map(line -> JSON.parseObject(line, PaymentInfo.class))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<PaymentInfo>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<PaymentInfo>() {
                            @Override
                            public long extractTimestamp(PaymentInfo element, long recordTimestamp) {
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                try {
                                    return sdf.parse(element.getCreate_time()).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                    return recordTimestamp;
                                }
                            }
                        }));
                        
        //TODO 3.双流JOIN
        SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoDS.keyBy(PaymentInfo::getOrder_id)
                .intervalJoin(orderWideDS.keyBy(OrderWide::getOrder_id))
                .between(Time.minutes(-15), Time.seconds(5))
                .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                    @Override
                    public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context ctx, Collector<PaymentWide> out) throws Exception {
                        out.collect(new PaymentWide(paymentInfo, orderWide));
                    }
                });

        //TODO 4.将数据写入Kafka
        paymentWideDS.print(">>>>>>>>>");
        paymentWideDS
                .map(JSONObject::toJSONString)
                .addSink(MyKafkaUtil.getKafkaProducer(paymentWideSinkTopic));

        //TODO 5.启动任务
        env.execute("PaymentWideApp");
    }

DWM层总结

主要通过计算把一种明细转换为另一种明细来应对后续的统计和计算
利用状态(state)进行去重
利用CEP可以针对一组数据进行筛选判断
学会使用intervaljoin处理流join
处理维度关联,通过缓存和异步查询对性能进行优化

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-04 11:06:05  更:2022-02-04 11:07:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:14:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码