Based on Flink-1.12
2. SQL Client
2.1 Configuration
1. Add the dependency JARs (copy them as sketched below the list):
flink-connector-jdbc_2.11-1.12.3.jar
mysql-connector-java-5.1.48.jar
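The connector JARs are loaded from Flink's lib directory. A minimal copy sketch, assuming FLINK_HOME points at the Flink installation and both JARs are in the current directory:

# put the JDBC connector and the MySQL driver on Flink's classpath
cp flink-connector-jdbc_2.11-1.12.3.jar $FLINK_HOME/lib/
cp mysql-connector-java-5.1.48.jar $FLINK_HOME/lib/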
2. Restart Flink:
stop-cluster.sh
start-cluster.sh
Note: without a restart, the JARs above are not loaded, and jobs fail with java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat, or with:
Flink SQL> select * from t_sum;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
3. Start the SQL Client:
./sql-client.sh embedded
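If copying into lib is not an option, the SQL Client can also be given JARs per session via the -j/--jar option; a sketch, where the paths are placeholders:

./sql-client.sh embedded \
  -j /path/to/flink-connector-jdbc_2.11-1.12.3.jar \
  -j /path/to/mysql-connector-java-5.1.48.jar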
2.2 Create the tables
2.2.1 Create the table in MySQL
create table t_sum(
cnt int
);
2.2.2 Create the table in the Flink SQL Client
create table t_sum(
cnt int
)
with(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://chb1:3306/flinktest',
'table-name' = 't_sum',
'username' = 'root',
'password' = '123456'
);
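The connector also accepts an optional 'driver' option if the driver class cannot be inferred from the URL (see the JDBC SQL Connector page referenced below). To sanity-check the definition, the table can be listed and described in the same session:

Flink SQL> show tables;
Flink SQL> describe t_sum;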
2.3 Test
2.3.1 Insert data from the Flink SQL Client
Flink SQL> insert into t_sum values (10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 7cc4b890a7cb551dd31d38d0d4613c60
Flink SQL>

Check whether the row was inserted into MySQL.
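A quick check from the mysql client:

mysql> select * from t_sum;

The result should contain a single row with cnt = 10, the value just written by the Flink job.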
2.3.2 Insert data from MySQL
mysql> insert into t_sum value(20);
Query OK, 1 row affected (0.00 sec)
mysql>
The update is also visible from the Flink SQL Client.
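A sketch of the check on the Flink side; selecting from the JDBC table runs a bounded scan and opens the client's result view:

Flink SQL> select * from t_sum;

Both rows, 10 and 20, should be listed.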
2.4 The default catalog is in-memory only, with no persistent storage, so after exiting and restarting the Flink SQL Client the table definitions created above are gone.
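One way to keep table definitions across sessions in Flink 1.12 is to register a persistent catalog, for example a Hive catalog, in conf/sql-client-defaults.yaml; a minimal sketch, where the catalog name myhive and the path /opt/hive/conf are assumptions:

catalogs:
  # hypothetical catalog name and Hive configuration directory
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive/conf

Tables created under such a catalog survive client restarts; the Hive connector JARs must also be on Flink's classpath for this to work.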

References
JDBC SQL Connector
