IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink sql -mysql cdc 到hudi表在输出到kafka中 -> 正文阅读

[大数据]flink sql -mysql cdc 到hudi表在输出到kafka中

1. 版本 对应的版本

mysqlflinkkafkahudi
5.7.20-logfink 13.52.0.0.30.10

2. 采用架构

在这里插入图片描述

3. flink sql 的 mysql cdc 表

3.1 mysql 表结构

CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=69 DEFAULT CHARSET=utf8mb4;

3.2 flink sql mysql cdc 表

Flink SQL> CREATE TABLE source_mysql (
>    id BIGINT PRIMARY KEY NOT ENFORCED,
>    name STRING,
>    age INT,
>    birthday TIMESTAMP(3),
>    ts TIMESTAMP(3)
>  ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = '192.168.1.162',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = '123456',
>  'server-time-zone' = 'Asia/Shanghai',
>  'debezium.snapshot.mode' = 'initial',
>  'database-name' = 'wudldb',
>  'table-name' = 'Flink_cdc'
>  );
> 
[INFO] Execute statement succeed.

3.2 新建hudi 表 并且插入数据


Flink SQL>  CREATE TABLE flink_cdc_sink_hudi_hive_wudl(
> id bigint ,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3),
> part STRING,
> primary key(id) not enforced
> )
> PARTITIONED BY (part)
> with(
> 'connector'='hudi',
> 'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive_wudl', 
> 'table.type'= 'MERGE_ON_READ',
> 'hoodie.datasource.write.recordkey.field'= 'id', 
> 'write.precombine.field'= 'ts',
> 'write.tasks'= '1',
> 'write.rate.limit'= '2000', 
> 'compaction.tasks'= '1', 
> 'compaction.async.enabled'= 'true',
> 'compaction.trigger.strategy'= 'num_commits',
> 'compaction.delta_commits'= '1',
> 'changelog.enabled'= 'true',
> 'read.streaming.enabled'= 'true',
> 'read.streaming.check-interval'= '3',
> 'hive_sync.enable'= 'true',
> 'hive_sync.mode'= 'hms',
> 'hive_sync.metastore.uris'= 'thrift://node02.com:9083',
> 'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',
> 'hive_sync.table'= 'flink_cdc_sink_hudi_hive_wudl',
> 'hive_sync.db'= 'db_hive',
> 'hive_sync.username'= 'root',
> 'hive_sync.password'= '123456',
> 'hive_sync.support_timestamp'= 'true'
> );
[INFO] Execute statement succeed.


3.3 将cdc 的表数据插入到hudi 表中

Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive_wudl SELECT id, name,age,birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql ;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8a6e4869c43e57d57357c1767e7c2b38


4. 查看数据

在这里插入图片描述

5. 批处理 从hudi 表输出到 kakfa

5.1 创建hudi 表

Flink SQL> CREATE TABLE hudi_flink_kafka_source (
> id bigint ,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3),
> part STRING,
> primary key(id) not enforced
> )
> PARTITIONED BY (part) 
> WITH (
>   'connector' = 'hudi',
>   'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive20220905', 
>   'table.type' = 'MERGE_ON_READ',
>   'write.operation' = 'upsert',
>   'hoodie.datasource.write.recordkey.field'= 'id',
>   'write.precombine.field' = 'ts',
>   'write.tasks'= '1',
>   'compaction.tasks' = '1', 
>   'compaction.async.enabled' = 'true', 
>   'compaction.trigger.strategy' = 'num_commits', 
>   'compaction.delta_commits' = '1'
>   );
> 

5.2 创建kafka 表

Flink SQL> CREATE TABLE kakfa_sink6 (
> id bigint ,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'wudl2022flink03',
>   'properties.bootstrap.servers' = '192.168.1.161:6667',
>   'properties.group.id' = 'wudl20220905',
>   'format' = 'json',
>   'json.fail-on-missing-field' = 'false',
>   'json.ignore-parse-errors' = 'true'
> );
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO kakfa_sink6  SELECT id, name,age,birthday, ts FROM hudi_flink_kafka_source ;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 005ee1b8011319d235c6485c2abb3efb



6. 查看表结构数据

在这里插入图片描述

7. 时间转化函数

7.1 flink sql LOCALTIMESTAMP 获取系统时间

Flink SQL> select     DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss');
+----+--------------------------------+
| op |                         EXPR$0 |
+----+--------------------------------+
| +I |            2022-09-05 19:19:42 |
+----+--------------------------------+
Received a total of 1 row

 # TO_TIMESTAMP  时间的转化
 
Flink SQL> 

Flink SQL> select TO_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss'));
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 2022-09-05 19:20:30.000 |
+----+-------------------------+
Received a total of 1 row

Flink SQL> 

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-13 11:22:42  更:2022-09-13 11:22:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 23:57:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码