IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 湖仓一体电商项目(十八):业务实现之编写写入DWD层业务代码 -> 正文阅读

[大数据]湖仓一体电商项目(十八):业务实现之编写写入DWD层业务代码

文章目录

业务实现之编写写入DWD层业务代码

一、代码编写

二、??????????????创建Iceberg-DWD层表

1、在Hive中添加Iceberg表格式需要的包

2、创建Iceberg表

三、代码测试

1、在Kafka中创建对应的topic

2、将代码中消费Kafka数据改成从头开始消费

3、执行代码,查看对应结果


业务实现之编写写入DWD层业务代码

一、???????代码编写

Flink读取Kafka topic “KAFKA-ODS-TOPIC”?数据写入Iceberg-DWD层也是复用第一个业务代码,这里只需要在代码中加入写入Iceberg-DWD层代码即可,代码如下:

//插入 iceberg - dwd 层 会员浏览商品日志信息 :DWD_BROWSELOG
tblEnv.executeSql(
  s"""
    |insert into hadoop_iceberg.icebergdb.DWD_BROWSELOG
    |select
    | log_time,
    | user_id2,
    | user_ip,
    | front_product_url,
    | browse_product_url,
    | browse_product_tpcode,
    | browse_product_code,
    | obtain_points
    | from ${table} where iceberg_ods_tbl_name = 'ODS_BROWSELOG'
  """.stripMargin)

另外,在Flink处理此topic中每条数据时都有获取对应写入后续Kafka topic信息,本业务对应的每条用户日志数据写入的kafka topic为“KAFKA-DWD-BROWSE-LOG-TOPIC”,所以代码可以复用。

二、??????????????创建Iceberg-DWD层表

代码在执行之前需要在Hive中预先创建对应的Iceberg表,创建Icebreg表方式如下:

1、在Hive中添加Iceberg表格式需要的包

启动HDFS集群,node1启动Hive metastore服务,在Hive客户端启动Hive添加Iceberg依赖包:

#node1节点启动Hive metastore服务
[root@node1 ~]# hive --service metastore &

#在hive客户端node3节点加载两个jar包
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

2、创建Iceberg表

这里创建Iceberg-DWD表有“DWD_BROWSELOG”,创建语句如下:

CREATE TABLE DWD_BROWSELOG  (
 log_time string,
 user_id string,
 user_ip string,
 front_product_url string,
 browse_product_url string,
 browse_product_tpcode string,
 browse_product_code string,
 obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_BROWSELOG/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

三、代码测试

以上代码编写完成后,代码执行测试步骤如下:

1、在Kafka中创建对应的topic

#在Kafka 中创建 KAFKA-DWD-BROWSE-LOG-TOPIC topic
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-BROWSE-LOG-TOPIC --partitions 3 --replication-factor 3

#监控以上topic数据
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWD-BROWSE-LOG-TOPIC

2、将代码中消费Kafka数据改成从头开始消费

代码中Kafka Connector中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。

这里也可以不设置从头开始消费Kafka数据,而是直接启动向日志采集接口模拟生产日志代码“RTMockUserLogData.java”,需要启动日志采集接口及Flume。

3、执行代码,查看对应结果

以上代码执行后在,在对应的Kafka “KAFKA-DWD-BROWSE-LOG-TOPIC” topic中都有对应的数据。在Iceberg-DWD层中对应的表中也有数据。

Kafka中结果如下:

Iceberg-DWD层表”DWD_BROWSELOG”中的数据如下:


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ?留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活?
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-24 21:03:44  更:2022-09-24 21:04:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 21:06:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码