1, Configure the Debezium Kafka connector
a, Enable the MySQL binlog; create the MySQL tables and a CDC user
MySQL setup guide: https://debezium.io/documentation/reference/1.3/connectors/mysql.html#setup-the-mysql-server
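Per that guide, the binlog settings and the CDC user look roughly like this (a minimal sketch: the 'cdc'/'cdc' credentials match the connector config below; the server-id is just the guide's example value):

# my.cnf (per the Debezium setup guide)
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL

mysql> CREATE USER 'cdc'@'%' IDENTIFIED BY 'cdc';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc'@'%';
mysql> FLUSH PRIVILEGES;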
mysql> show master status;
+------------------+----------+--------------+------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
+------------------+----------+--------------+------------------+
| mysql-bin.000001 | 10290780 |              |                  |
+------------------+----------+--------------+------------------+
1 row in set (0.00 sec)
mysql> use test2;
mysql> show create table employee;
+----------+---------------------------------------------------------------+
| Table    | Create Table                                                  |
+----------+---------------------------------------------------------------+
| employee | CREATE TABLE `employee` (
  `id` int(11) DEFAULT NULL,
  `dept_id` int(11) DEFAULT NULL,
  `name` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci |
+----------+---------------------------------------------------------------+
1 row in set (0.00 sec)
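Section 3 below also joins the dept and salary tables in test2; their DDL is not reproduced in this output, but from the columns referenced later they presumably look like:

mysql> CREATE TABLE `dept` (
         `id` int(11) DEFAULT NULL,
         `name` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL
       ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
mysql> CREATE TABLE `salary` (
         `id` int(11) DEFAULT NULL,
         `emp_id` int(11) DEFAULT NULL,
         `money` double DEFAULT NULL
       ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;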
b, Create the Debezium Kafka connector with ksqlDB
ksql> CREATE SOURCE CONNECTOR `mysql-dbz-src2` WITH(
"connector.class"='io.debezium.connector.mysql.MySqlConnector',
"tasks.max"='1',
"database.hostname"='127.0.0.1',
"database.port"='3306',
"database.server.name"='dbz2',
"database.user"='cdc',
"database.password"='cdc',
"database.whitelist"='test2',
"database.history.kafka.bootstrap.servers"='localhost:9092',
"database.history.kafka.topic"='schema-changes2.inventory2'
);
ksql> show connectors;
 Connector Name | Type   | Class                                      | Status
--------------------------------------------------------------------------------------------
 mysql-dbz-src2 | SOURCE | io.debezium.connector.mysql.MySqlConnector | RUNNING (1/1 tasks RUNNING)
ksql> show topics;
 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 dbz2                        | 1          | 1
 dbz2.test2.dept             | 1          | 1
 dbz2.test2.employee         | 1          | 1
 dbz2.test2.enriched_orders  | 1          | 1
 dbz2.test2.orders           | 1          | 1
 dbz2.test2.products         | 1          | 1
 dbz2.test2.salary           | 1          | 1
 dbz2.test2.shipments        | 1          | 1
 default_ksql_processing_log | 1          | 1
View the Kafka data through Confluent Platform: the topic's schema shows each table's fields and data types.
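The raw change events can also be inspected without the UI, straight from the ksqlDB prompt (the LIMIT clause just stops the tail after two records):

ksql> PRINT 'dbz2.test2.employee' FROM BEGINNING LIMIT 2;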
2, Flink SQL (single-table ETL): Debezium and HBase connectors
[root@localhost bin]$ ./sql-client.sh "embedded"
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
CREATE TABLE topic_test2_employee (
id int,
dept_id int,
name string
) WITH (
'connector' = 'kafka',
'topic' = 'dbz2.test2.employee',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
);
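Before wiring up the sink, a quick smoke test confirms that the Debezium Avro payload decodes as expected; this starts a continuous changelog query in the client (quit it before submitting the insert job):

Flink SQL> SELECT * FROM topic_test2_employee;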
CREATE TABLE hbase_tab1 (
id string,
f ROW<dept_id string, name string >,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'xx_t1',
'zookeeper.quorum' = '192.168.56.117:2181'
);
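Note: the hbase-1.4 connector writes to an existing table; it does not create one. So 'xx_t1' with column family 'f' (and likewise 'www' for section 3) must already exist, e.g.:

hbase(main):001:0> create 'xx_t1', 'f'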
INSERT INTO hbase_tab1
SELECT cast(id as string), ROW(cast(dept_id as string), name) FROM topic_test2_employee;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 2e88bb74fded83e284b4e971fa6bf475
Log in to the Flink web UI to check the running job, then verify that the HBase data changes in real time (after INSERT/UPDATE/DELETE on the MySQL side, does HBase follow?).
hbase(main):016:0> scan 'xx_t1'
ROW                 COLUMN+CELL
 1                  column=f:dept_id, timestamp=1627454335726, value=1
 1                  column=f:name, timestamp=1627454335726, value=Li si
 2                  column=f:dept_id, timestamp=1627454335726, value=2
 2                  column=f:name, timestamp=1627454335726, value=Test 1
2 row(s) in 0.0120 seconds
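To watch the sync live, run some DML on the MySQL side and re-scan; the rows below are hypothetical examples (a MySQL DELETE turns into an HBase delete, since the sink is keyed on the primary key):

mysql> INSERT INTO employee VALUES (3, 2, 'Wang wu');
mysql> UPDATE employee SET name = 'Li si 2' WHERE id = 1;
mysql> DELETE FROM employee WHERE id = 3;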
3, Flink SQL (multi-table ETL): Debezium and HBase connectors
The target is the result of a MySQL join: Flink SQL continuously runs a similar statement to merge the data and write it into HBase, as sketched below.
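For reference, the batch equivalent on MySQL would be roughly this (a sketch; the original post does not reproduce the query or its result):

mysql> SELECT emp.id AS emp_id, emp.dept_id, emp.name, dept.name AS deptname, salary.money
       FROM employee emp
       JOIN dept ON dept.id = emp.dept_id
       JOIN salary ON salary.emp_id = emp.id;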
Flink SQL> CREATE TABLE topic_test2_dept (
id int,
name string
) WITH (
'connector' = 'kafka',
'topic' = 'dbz2.test2.dept',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
);
CREATE TABLE topic_test2_salary (
id int,
emp_id int,
money double
) WITH (
'connector' = 'kafka',
'topic' = 'dbz2.test2.salary',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
);
CREATE TABLE hbase_www (
emp_id string,
f ROW<emp_dept_id string, name string , deptname string, salary string>,
PRIMARY KEY (emp_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'www',
'zookeeper.quorum' = '192.168.56.117:2181'
);
insert into hbase_www
select cast(emp.id as string) emp_id,
  ROW(cast(emp.dept_id as string), emp.name, dept.name, cast(salary.money as string))
from topic_test2_employee emp
inner join topic_test2_dept dept on dept.id = emp.dept_id
inner join topic_test2_salary salary on salary.emp_id = emp.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 137c51550417ab7003c00c95d74f842b
Log in to the Flink web UI to check the running job.
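One caveat with this continuously running two-way join: by default Flink keeps every input row in join state indefinitely, so on an unbounded changelog the state grows without limit. Bounding it with a state TTL is worth considering (the value here is hypothetical, and the exact SET syntax varies by Flink version):

Flink SQL> SET table.exec.state.ttl=1h;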