1. Software Environment
| Environment & Software | Version |
|---|---|
| Linux OS | CentOS 7 |
| Flink (standalone single-node test cluster) | 1.13.6 |
| Kafka | 0.11.0 |
| ZooKeeper | 3.4.5 |
2. Starting the Flink SQL Client
In Flink 1.13.6 the Flink SQL client is still a Beta feature.
Start the Flink SQL client:
[bigdata_admin@dn5 bin]$ ./sql-client.sh embedded
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
No default environment specified.
Searching for '/data/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /home/bigdata_admin/.flink-sql-history
(ASCII-art Flink squirrel banner, garbled in transcription)
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL> HELP;
The following commands are available:
CLEAR Clears the current terminal.
CREATE TABLE Create table under current catalog and database.
DROP TABLE Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE Describes the schema of a table with the given name.
DROP VIEW Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN Describes the execution plan of a query or table with the given name.
HELP Prints the available commands.
INSERT INTO Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT Quits the SQL CLI client.
RESET Resets a session configuration property. Syntax: 'RESET <key>;'. Use 'RESET;' for reset all session properties.
SELECT Executes a SQL SELECT query on the Flink cluster.
SET Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'
SHOW TABLES Shows all registered tables.
SOURCE Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE Sets the current default database. Experimental! Syntax: 'USE <name>;'
LOAD MODULE Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = '<value1>' [, '<key2>' = '<value2>', ...])];'
UNLOAD MODULE Unload a module. Syntax: 'UNLOAD MODULE <name>;'
USE MODULES Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];'
BEGIN STATEMENT SET Begins a statement set. Syntax: 'BEGIN STATEMENT SET;'
END Ends a statement set. Syntax: 'END;'
Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.
Create a Flink SQL table bound to the corresponding Kafka topic.
Build the test table:
Flink SQL> DROP TABLE IF EXISTS `ods_base_province`;
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE `ods_base_province` (
>   `id` INT,
>   `name` STRING,
>   `region_id` INT,
>   `area_code` STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'mydw.base_province',
>   'properties.bootstrap.servers' = 'dn5:9092',
>   'properties.group.id' = 'testGroup',
>   'format' = 'canal-json',
>   'scan.startup.mode' = 'earliest-offset'
> );
[INFO] Execute statement succeed.
Besides canal-json, the format option also supports json, csv, and other types; choose it according to how the data is actually produced.
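For example, if the topic carried plain JSON rows instead of canal-json change events, only the format option would change. A hypothetical variant (table name invented for illustration; topic and broker reused from above):

```sql
-- Hypothetical variant: consume plain JSON records instead of canal-json
CREATE TABLE `ods_base_province_json` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_province',
  'properties.bootstrap.servers' = 'dn5:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);
```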
3. Problem Symptom
The table was created successfully, but querying it raised the following error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
4. Root Cause & Solution
Root cause:
The Flink runtime classpath is missing the Flink SQL Kafka connector jar (e.g. flink-sql-connector-kafka_2.11-1.x.y.jar).
Solution:
Add the following jar to the lib directory of the Flink installation ${FLINK_HOME} (e.g. /data/flink-1.13.6/lib):
- flink-sql-connector-kafka_2.11-1.13.6.jar
Download: https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.13.6
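A sketch of fetching the matching jar from Maven Central, with the version pinned to this environment (the download and restart lines are commented out because they depend on network and cluster access; the Maven Central path is an assumption based on the standard layout):

```shell
# Assemble the connector jar name matching the Flink distribution version
FLINK_HOME=/data/flink-1.13.6
FLINK_VERSION=1.13.6
SCALA_VERSION=2.11
JAR="flink-sql-connector-kafka_${SCALA_VERSION}-${FLINK_VERSION}.jar"
URL="https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_${SCALA_VERSION}/${FLINK_VERSION}/${JAR}"

# wget -P "${FLINK_HOME}/lib" "${URL}"       # download into lib
# "${FLINK_HOME}/bin/stop-cluster.sh"        # restart so the new jar is picked up
# "${FLINK_HOME}/bin/start-cluster.sh"
echo "${JAR}"
```

Restarting the standalone cluster (and the SQL client session) is needed for the new jar to be visible on the classpath.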
The lib directory then contains:
[bigdata_admin@dn5 lib]$ ll
total 202084
-rw-r--r-- 1 bigdata_admin bigdata_admin 92314 Feb 4 17:11 flink-csv-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 115425612 Feb 4 17:15 flink-dist_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 148127 Feb 4 17:11 flink-json-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 7709740 May 7 2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin 3674190 Feb 4 17:59 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 36455408 Feb 4 17:14 flink-table_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 41077430 Feb 4 17:14 flink-table-blink_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 301872 Jan 7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 1790452 Jan 7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 24279 Jan 7 18:07 log4j-slf4j-impl-2.17.1.jar
Trigger the generation of MySQL binlog data and feed it into the Kafka topic mydw.base_province; the Kafka console consumer receives the following records:
[bigdata_admin@dn5 ~]$ kafka-console-consumer --zookeeper dn3:2181 --topic mydw.base_province
{"data":[{"id":"6","name":"上海","region_id":"2","area_code":"310000"}],"database":"rtdw_test_gmall","es":1652270652000,"id":62,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"上海1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652270652976,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京1","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272862000,"id":282,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272862913,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272892000,"id":286,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272892515,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市1","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272930000,"id":291,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272930256,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西1","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272933000,"id":292,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272933760,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古1","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272936000,"id":293,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272936865,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272944000,"id":295,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272944673,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272947000,"id":296,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272947977,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272950000,"id":298,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272950982,"type":"UPDATE"}
Run the Flink SQL query again:
Flink SQL> select * from `ods_base_province`;
The query results are displayed in the Flink SQL Client's result view.
Press Ctrl + C to exit the result view:
[INFO] Result retrieval cancelled.
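By default the client renders results in the interactive table view. If you prefer rows printed directly to the terminal, Flink 1.13 also offers the tableau result mode; a minimal sketch:

```sql
-- Print query results directly to the terminal instead of the interactive view
SET sql-client.execution.result-mode=tableau;
SELECT * FROM `ods_base_province`;
```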
5. Wrong jar Versions
If no flink-sql-connector-kafka_x.y.z.jar has been added at all, the following error occurs:
Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
If the jar added is flink-sql-connector-kafka_2.11-1.12.1.jar (built for Flink 1.12.1), the following error occurs:
Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/CatalogTable;
If the jar added is flink-sql-connector-kafka_2.11-1.14.4.jar (built for Flink 1.14.4), the following error occurs:
Flink SQL> select * from ods_base_province;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.configuration.DescribedEnum
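The common thread in these failures is a version mismatch between the connector jar and the Flink distribution. A minimal shell guard making that rule explicit (it assumes the jar follows the standard Maven naming convention `artifact_scala-version.jar`):

```shell
# The connector jar's version suffix must equal the Flink distribution version
FLINK_VERSION=1.13.6
CONNECTOR_JAR="flink-sql-connector-kafka_2.11-1.13.6.jar"
JAR_VERSION="${CONNECTOR_JAR##*-}"   # strip up to the last '-': 1.13.6.jar
JAR_VERSION="${JAR_VERSION%.jar}"    # strip the extension: 1.13.6
if [ "${JAR_VERSION}" = "${FLINK_VERSION}" ]; then
  echo "match"
else
  echo "mismatch: jar=${JAR_VERSION} flink=${FLINK_VERSION}"
fi
```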