IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink Sql 实时ETL kafka 数据 -> 正文阅读

[大数据]Flink Sql 实时ETL kafka 数据

1, 配置 debezium kafka 连接器

a, mysql开启binlog, 创建mysql 表和cdc用户

配置mysql: https://debezium.io/documentation/reference/1.3/connectors/mysql.html#setup-the-mysql-server

mysql> show master status;
+------------------+----------+--------------+------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
+------------------+----------+--------------+------------------+
| mysql-bin.000001 | 10290780 |              |                  |
+------------------+----------+--------------+------------------+
1 row in set (0.00 sec)

mysql> use test2;
mysql> show create table employee;
+----------+--------------------------------------------------------------------                                                                                                                              
| Table    | Create Table                                                                                                                                                                                                                                                                                                                 
+----------+--------------------------------------------------------------------                                                               +
| employee | CREATE TABLE `employee` (
  `id` int(11) DEFAULT NULL,
  `dept_id` int(11) DEFAULT NULL,
  `name` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci |
+----------+--------------------------------------------------------------------
1 row in set (0.00 sec)

b, 使用 ksql 创建kafka连接器:debezium

ksql> CREATE SOURCE CONNECTOR `mysql-dbz-src2` WITH(  
     "connector.class"='io.debezium.connector.mysql.MySqlConnector',     
     "tasks.max"='1',
     "database.hostname"='127.0.0.1',
     "database.port"='3306',
     "database.server.name"='dbz2',

     "database.user"='cdc',
     "database.password"='cdc',
     
     "database.whitelist"='test2',
     "database.history.kafka.bootstrap.servers"='localhost:9092',
     "database.history.kafka.topic"='schema-changes2.inventory2'
);

ksql> show connectors;
 Connector Name  | Type   | Class                                         | Status
--------------------------------------------------------------------------------------------------------
 mysql-dbz-src2  | SOURCE | io.debezium.connector.mysql.MySqlConnector    | RUNNING (1/1 tasks RUNNING)
--------------------------------------------------------------------------------------------------------

ksql> show topics;
 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 dbz2                        | 1          | 1
 dbz2.test2.dept             | 1          | 1
 dbz2.test2.employee         | 1          | 1
 dbz2.test2.enriched_orders  | 1          | 1
 dbz2.test2.orders           | 1          | 1
 dbz2.test2.products         | 1          | 1
 dbz2.test2.salary           | 1          | 1
 dbz2.test2.shipments        | 1          | 1
 default_ksql_processing_log | 1          | 1
---------------------------------------------------------------

通过confluent platform查看kafka数据:
在这里插入图片描述
查看topic schema中:表字段和数据类型
在这里插入图片描述

2, flink sql (单表ETL) :dedebezium,hbase连接器

[root@localhost bin]$ ./sql-client.sh "embedded"
  |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL>  // dbz2.test2.employee: (字段类型和 registry.url中保持一致,才能解析数据)
CREATE TABLE topic_test2_employee (
  id int,
  dept_id int,
  name string
) WITH (
 'connector' = 'kafka',
 'topic' = 'dbz2.test2.employee',
 'properties.bootstrap.servers' = 'localhost:9092',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'debezium-avro-confluent',
 'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
);

CREATE TABLE hbase_tab1 (
 id string,
 f ROW<dept_id string, name string >,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'xx_t1',
 'zookeeper.quorum' = '192.168.56.117:2181'
);

/*
Flink SQL> show tables;
hbase_tab1
topic_test2_dept */
INSERT INTO hbase_tab1
SELECT  cast(id as string), ROW( cast(dept_id as string), name) FROM topic_test2_employee;

[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 2e88bb74fded83e284b4e971fa6bf475

登录flink web ui: 查看运行的任务
在这里插入图片描述
查看hbase 数据是否实时变化(mysql insert/update/delete数据后,是否同步变化)

hbase(main):016:0> scan 'xx_t1'
ROW                                      COLUMN+CELL
 1                                       column=f:dept_id, timestamp=1627454335726, value=1
 1                                       column=f:name, timestamp=1627454335726, value=Li si
 2                                       column=f:dept_id, timestamp=1627454335726, value=2
 2                                       column=f:name, timestamp=1627454335726, value=Test 1
2 row(s) in 0.0120 seconds

3, flink sql (多表ETL) :dedebezium,hbase连接器

mysql join查询结果:(flink sql 执行类型sql来合并数据并写入hbase)
在这里插入图片描述

Flink SQL>  CREATE TABLE topic_test2_dept (
			  id int,
			  name string
			) WITH (
			 'connector' = 'kafka',
			 'topic' = 'dbz2.test2.dept',
			 'properties.bootstrap.servers' = 'localhost:9092',
			 'scan.startup.mode' = 'earliest-offset',
			 'format' = 'debezium-avro-confluent',
			 'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
			);

			CREATE TABLE topic_test2_salary (
			  id int,
			  emp_id int,
			  money double
			) WITH (
			 'connector' = 'kafka',
			 'topic' = 'dbz2.test2.salary',
			 'properties.bootstrap.servers' = 'localhost:9092',
			 'scan.startup.mode' = 'earliest-offset',
			 'format' = 'debezium-avro-confluent',
			 'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
			);
			
			CREATE TABLE hbase_www (
			 emp_id string,
			 f ROW<emp_dept_id string, name string , deptname string, salary string>,
			 PRIMARY KEY (emp_id) NOT ENFORCED
			) WITH (
			 'connector' = 'hbase-1.4',
			 'table-name' = 'www',
			 'zookeeper.quorum' = '192.168.56.117:2181'
			);

#关联查询
insert into hbase_www
select  cast(emp.id as string) emp_id, 
	ROW(cast( emp.dept_id as string) , emp.name , dept.name , cast( salary.money as string)    )
from topic_test2_employee  emp
inner join topic_test2_dept dept on dept.id=emp.dept_id
inner join topic_test2_salary salary on salary.emp_id= emp.id

[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 137c51550417ab7003c00c95d74f842b

登录flink web ui: 查看运行的任务
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-29 11:43:03  更:2021-07-29 11:45:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 0:55:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码