
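[Big Data] Building the Real-Time Data Warehouse DWD Layer with Flink SQL

1. The DWD layer of a real-time data warehouse

DWD is the detail data layer. Its tables keep the same structure and granularity as the source tables, but the ODS-layer data must first be cleaned, dimension-degenerated, and desensitized, so that the resulting data is clean, complete, and consistent.

(1) Parse the user behavior data.

(2) Filter null values out of the core data.

(3) Remodel the business data with a dimensional model, i.e. dimension degeneration.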
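The null filtering and desensitization steps above can be sketched in plain Java. This is only an illustration under stated assumptions, not the article's implementation: the class `DwdCleaningSketch`, the methods `hasCoreFields` and `mask`, the masking rule (keep the first two and last two characters), and the sample values are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class DwdCleaningSketch {

    // Null filtering: a record is kept only if every core field is present and non-blank.
    public static boolean hasCoreFields(Map<String, String> record, String... coreFields) {
        for (String field : coreFields) {
            String value = record.get(field);
            if (value == null || value.trim().isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Desensitization (assumed rule): keep the first two and last two characters,
    // replace everything in between with '*'. Short or null values are returned unchanged.
    public static String mask(String value) {
        if (value == null || value.length() <= 4) {
            return value;
        }
        StringBuilder sb = new StringBuilder(value.substring(0, 2));
        for (int i = 2; i < value.length() - 2; i++) {
            sb.append('*');
        }
        return sb.append(value.substring(value.length() - 2)).toString();
    }

    public static void main(String[] args) {
        // Hypothetical record; the field names follow the tables used later in the article.
        Map<String, String> record = new HashMap<>();
        record.put("id", "r-001");
        record.put("carCode", "AB123CD");

        if (hasCoreFields(record, "id", "carCode")) {
            System.out.println(mask(record.get("carCode"))); // prints AB***CD
        }
    }
}
```

In a Flink SQL job the same rules would more likely be written as a `WHERE ... IS NOT NULL` filter and a string expression in the query; the plain-Java form is used here only to make the logic easy to run in isolation.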

2. Dimensional modeling for vehicle trips

3. Building the real-time DWD layer with Flink SQL

package com.bigdata.warehouse.dwd;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdCarsLog {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism
        //senv.setParallelism(1);

        // Enable checkpointing for fault tolerance
        // (uncommenting these lines also requires importing CheckpointingMode,
        //  EmbeddedRocksDBStateBackend and FileSystemCheckpointStorage)
        //senv.enableCheckpointing(60000);
        //senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //senv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        //senv.getCheckpointConfig().setCheckpointTimeout(10000);
        //senv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Configure the state backend
        // (1) Use RocksDB
        //senv.setStateBackend(new EmbeddedRocksDBStateBackend());
        // (2) Configure checkpoint storage
        //senv.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoints"));

        // 2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        // 3. Read the vehicle entry/exit fact table (Kafka source)
        tEnv.executeSql("CREATE TABLE ods_cars_log (" +
                "  id STRING," +
                "  opTime STRING," +
                "  ctype SMALLINT," +
                "  carCode STRING," +
                "  cId BIGINT," +
                "  proc_time as PROCTIME() " +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'ods_cars_log'," +
                "  'properties.bootstrap.servers' = 'hadoop1:9092'," +
                "  'properties.group.id' = 'ods_cars_log'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        // 4. Read the vehicle dimension table (MySQL via JDBC)
        tEnv.executeSql("CREATE TABLE dim_base_cars ( " +
                "  id INT, " +
                "  owerId INT, " +
                "  carCode STRING, " +
                "  carColor STRING, " +
                "  type TINYINT, " +
                "  remark STRING, " +
                "  PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH ( " +
                "  'connector' = 'jdbc', " +
                "  'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8', " +
                "  'table-name' = 'dim_base_cars', " +
                "  'username' = 'hive', " +
                "  'password' = 'hive' " +
                ")");

        // 5. Join the fact table with the dimension table (processing-time lookup join)
        //    to get the vehicle entry/exit detail table
        Table resultTable = tEnv.sqlQuery("select " +
                "cl.id, " +
                "c.owerId, " +
                "cl.opTime, " +
                "cl.cId, " +
                "cl.carCode, " +
                "cl.ctype " +
                "from ods_cars_log cl " +
                "left join dim_base_cars for system_time as of cl.proc_time as c " +
                "on cl.carCode=c.carCode");
        tEnv.createTemporaryView("resultTable", resultTable);

        // 6. Create the dwd_cars_log sink table (upsert-kafka)
        tEnv.executeSql("CREATE TABLE dwd_cars_log ( " +
                " id STRING, " +
                " owerId INT, " +
                " opTime STRING, " +
                " cId BIGINT, " +
                " carCode STRING, " +
                " ctype SMALLINT, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = 'dwd_cars_log', " +
                " 'properties.bootstrap.servers' = 'hadoop1:9092', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")");

        // 7. Write the join result into dwd_cars_log
        tEnv.executeSql("insert into dwd_cars_log select * from resultTable");
    }
}
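The join in step 5 (`for system_time as of cl.proc_time`) is a processing-time lookup join: each fact row is enriched with the dimension row found when that row is processed, and because it is a `left join`, fact rows with no match are kept with a null `owerId`. The plain-Java sketch below mimics only that semantics. It is an illustration, not Flink code: the class `LookupJoinSketch`, the `Joined` type, and the in-memory map standing in for the `dim_base_cars` lookup are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupJoinSketch {

    // One output row: fact fields plus the looked-up owner id.
    public static final class Joined {
        public final String id;
        public final String carCode;
        public final Integer owerId; // null when the dimension has no matching carCode

        Joined(String id, String carCode, Integer owerId) {
            this.id = id;
            this.carCode = carCode;
            this.owerId = owerId;
        }
    }

    // For each fact row {id, carCode}, look up owerId by carCode.
    // Unmatched fact rows are kept, which is what LEFT JOIN does.
    public static List<Joined> leftLookupJoin(List<String[]> facts,
                                              Map<String, Integer> dimByCarCode) {
        List<Joined> out = new ArrayList<>();
        for (String[] fact : facts) {
            out.add(new Joined(fact[0], fact[1], dimByCarCode.get(fact[1])));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical dimension and fact data.
        Map<String, Integer> dim = new HashMap<>();
        dim.put("AB123CD", 1001);

        List<String[]> facts = new ArrayList<>();
        facts.add(new String[] {"r-001", "AB123CD"}); // has a dimension match
        facts.add(new String[] {"r-002", "ZZ999ZZ"}); // no match, owerId stays null

        for (Joined row : leftLookupJoin(facts, dim)) {
            System.out.println(row.id + " " + row.carCode + " " + row.owerId);
        }
    }
}
```

Because the lookup happens at processing time, a later change to the dimension table does not update rows that were already emitted.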

4. Creating the DWD-layer topic in Kafka

# Create the Kafka topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic dwd_cars_log --replication-factor 3 --partitions 1

5. Checking the results in the real-time DWD layer

# Consume the Kafka topic

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dwd_cars_log --from-beginning

If the console prints the expected records, the real-time DWD layer has been built successfully.

{"id":"3bfe7e59-4771-4aa8-ab90-80c98010c4ea","owerId":10022759,"opTime":"2022-07-15 11:59:55.443","cId":10000095,"carCode":"青I·PY2MR","ctype":2}
{"id":"36208b62-739b-4eea-abf4-9f26b85b85d1","owerId":10075672,"opTime":"2022-07-15 11:59:56.443","cId":10000311,"carCode":"渝Z·C0AFY","ctype":1}
{"id":"2a5df539-4668-4a42-8013-978b82b3c318","owerId":10126156,"opTime":"2022-07-15 11:59:57.443","cId":10000526,"carCode":"晋B·1RPVV","ctype":1}
{"id":"2bd0ce39-1c39-4db5-9376-68e297fda4b0","owerId":10206773,"opTime":"2022-07-15 11:59:58.443","cId":10000843,"carCode":"冀D·FX3IJ","ctype":2}
{"id":"2959544d-53f9-43e4-9101-96629fecdcc6","owerId":10153485,"opTime":"2022-07-15 11:59:59.443","cId":10000631,"carCode":"晋D·8OWIR","ctype":2}
{"id":"2fd665f9-ea27-44fd-a8cd-1f204ab2d5fc","owerId":10152560,"opTime":"2022-07-15 12:00:00.099","cId":10000627,"carCode":"贵A·MVO77","ctype":2}
{"id":"3c283bc5-5616-43cf-87b2-c94396ced64f","owerId":10103872,"opTime":"2022-07-15 12:00:01.037","cId":10000425,"carCode":"辽L·3C5DU","ctype":1}
{"id":"3634862d-c824-4829-a017-0082b7514471","owerId":10234908,"opTime":"2022-07-15 12:00:02.376","cId":10000961,"carCode":"沪T·QNNXP","ctype":1}
{"id":"2b4a4d0f-4441-4e75-8437-008dfea5c03c","owerId":10228881,"opTime":"2022-07-15 12:00:03.33","cId":10000938,"carCode":"闽E·GZKRQ","ctype":2}
{"id":"2ce336bc-2b31-4089-ae85-a76921c6a306","owerId":10144509,"opTime":"2022-07-15 12:00:04.819","cId":10000596,"carCode
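One rough way to confirm that a consumed line matches the expected shape is to check that it carries all six columns declared for `dwd_cars_log`. The sketch below is a hypothetical helper, not part of the original job: the class `DwdRecordCheck` and the method `hasAllFields` are made up for illustration, the sample values are invented, and the check is a plain substring test rather than real JSON parsing.

```java
import java.util.Arrays;
import java.util.List;

public class DwdRecordCheck {

    // Column names of dwd_cars_log as declared in the sink table.
    public static final List<String> EXPECTED_FIELDS =
            Arrays.asList("id", "owerId", "opTime", "cId", "carCode", "ctype");

    // Returns true when the line contains every expected key in the form "key":
    // This does not validate JSON syntax or value types.
    public static boolean hasAllFields(String jsonLine) {
        if (jsonLine == null) {
            return false;
        }
        for (String field : EXPECTED_FIELDS) {
            if (!jsonLine.contains("\"" + field + "\":")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical records: one complete, one cut off after the carCode key.
        String complete = "{\"id\":\"r-001\",\"owerId\":1001,\"opTime\":\"2022-07-15 12:00:00.000\","
                + "\"cId\":2001,\"carCode\":\"AB123CD\",\"ctype\":1}";
        String truncated = "{\"id\":\"r-002\",\"owerId\":1002,\"opTime\":\"2022-07-15 12:00:01.000\","
                + "\"cId\":2002,\"carCode";

        System.out.println(hasAllFields(complete));  // prints true
        System.out.println(hasAllFields(truncated)); // prints false
    }
}
```

A production check would use a JSON library and also validate value types; the substring test is only enough to spot records that are missing columns or were cut off.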

Added: 2022-08-06 10:50:33  Updated: 2022-08-06 10:51:45
 