IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> FlinkSQL to Kafka连接器报错:could not find any factory for identifier ‘kafka‘ that implements -> 正文阅读

[大数据]FlinkSQL to Kafka连接器报错:could not find any factory for identifier ‘kafka‘ that implements

一、软件环境

环境 & 软件版本
LinuxOSCentOS 7
Flink Standalone单节点测试集群1.13.6
kafka0.11.0
Zookeeper3.4.5

二、启动FlinkSql Client

1.13.6版本的flink中的flinksql客户端还是Beta版本

启动FlinkSql客户端:

[bigdata_admin@dn5 bin]$ ./sql-client.sh embedded
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
No default environment specified.
Searching for '/data/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /home/bigdata_admin/.flink-sql-history

                                   ?▓██▓██?
                               ▓████??█▓?▓███▓?
                            ▓███▓??        ???▓██?  ?
                          ?██?   ??▓▓█▓▓??      ?████
                          ██?         ??▓███?    ?█?█?
                            ?▓█            ███   ▓??██
                              ▓█       ?????▓██▓???▓▓█
                            █? █   ???       ███▓▓█ ?█???
                            ████?   ?▓█▓      ██??? ▓███?
                         ??█▓▓██       ▓█?    ▓█?▓██▓ ?█?
                   ▓??▓████? ██         ?█    █▓??█???█?
                  ███▓?██▓  ▓█           █   █▓ ?▓█▓▓█?
                ?██▓  ?█?            █  █? ?█████▓? ██▓??
               ███? ? █?          ▓ ?█ █████???    ?█?▓  ▓?
              ██▓█ ??▓?          ▓███████▓?       ?█? ?▓ ▓██▓
           ?██▓ ▓█ █▓█       ??█████▓▓??         ██??  █ ?  ▓█?
           ▓█▓  ▓█ ██▓ ?▓▓▓▓▓▓▓?              ?██▓           ?█?
           ▓█    █ ▓███▓??              ?▓▓▓███▓          ??? ▓█
           ██▓    ██?    ??▓▓███▓▓▓▓▓██████▓?            ▓███  █
          ▓███? ███   ?▓▓???   ?▓████▓?                  ??▓?  █▓
          █▓??▓▓██  ??????????▓██▓?                            █▓
          ██ ▓??█   ▓▓▓▓???  ?█▓       ?▓▓██▓    ▓?          ??▓
          ▓█▓ ▓?█  █▓?  ??▓▓██?            ?▓█?   ??????▓█████?
           ██? ▓█?█?  ?▓▓?  ▓█                █?      ????   ?█?
           ▓█   ?█▓   ?     █?                ?█              █▓
            █▓   ██         █?                 ▓▓        ?█▓▓▓?█?
             █▓ ?▓██?       ▓?                  ▓█▓?????▓█?    ?█
              ██   ▓█▓?      ?                    ??█?██?      ▓▓
               ▓█?   ?█▓??                         ?? █?█▓?????██
                ?██?    ?▓▓?                     ▓██▓?█? ?▓▓▓▓?█▓
                  ?▓██?                          ▓?  ?█▓█  ?????
                      ?▓▓▓▓▓?????????????????????????▓▓  ▓??█?
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Flink SQL> HELP;
The following commands are available:

CLEAR           Clears the current terminal.
CREATE TABLE            Create table under current catalog and database.
DROP TABLE              Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW             Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE                Describes the schema of a table with the given name.
DROP VIEW               Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN         Describes the execution plan of a query or table with the given name.
HELP            Prints the available commands.
INSERT INTO             Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE                Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT            Quits the SQL CLI client.
RESET           Resets a session configuration property. Syntax: 'RESET <key>;'. Use 'RESET;' for reset all session properties.
SELECT          Executes a SQL SELECT query on the Flink cluster.
SET             Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS          Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'
SHOW TABLES             Shows all registered tables.
SOURCE          Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG             Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE             Sets the current default database. Experimental! Syntax: 'USE <name>;'
LOAD MODULE             Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = '<value1>' [, '<key2>' = '<value2>', ...])];'
UNLOAD MODULE           Unload a module. Syntax: 'UNLOAD MODULE <name>;'
USE MODULES             Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];'
BEGIN STATEMENT SET             Begins a statement set. Syntax: 'BEGIN STATEMENT SET;'
END             Ends a statement set. Syntax: 'END;'

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

FlinkSQL建表关联Kafka对应的topic

构建测试表:

Flink SQL> DROP TABLE IF EXISTS `ods_base_province`;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE `ods_base_province` (
>   `id` INT,
>   `name` STRING,
>   `region_id` INT ,
>   `area_code`STRING
> ) WITH(
> 'connector' = 'kafka',
>  'topic' = 'mydw.base_province',
>  'properties.bootstrap.servers' = 'dn5:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'canal-json' ,
>  'scan.startup.mode' = 'earliest-offset' 
> ) ;
[INFO] Execute statement succeed.

这里的format还有json、csv等类型,需根据数据的实际应用场景来确定。

三、问题症状

建表成功,但是查询其下数据的时候,报了如下错误:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print

四、问题原因 & 解决方案

问题原因:

Flink 运行时上下文 classpath 中缺少flinksql与kafka的连接器jar包(如: flink-sql-connector-kafka_2.11-1.x.y.jar)

解决方案:

通过在Flink集群安装目录${FLINK_HOME}的lib下(如:/data/flink-1.13.6/lib),引入如下 jar:

  • flink-sql-connector-kafka_2.11-1.13.6.jar

下载位置:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.13.6

此时lib包下jar:

[bigdata_admin@dn5 lib]$ ll 
total 202084
-rw-r--r-- 1 bigdata_admin bigdata_admin     92314 Feb  4 17:11 flink-csv-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 115425612 Feb  4 17:15 flink-dist_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    148127 Feb  4 17:11 flink-json-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   7709740 May  7  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin   3674190 Feb  4 17:59 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  36455408 Feb  4 17:14 flink-table_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  41077430 Feb  4 17:14 flink-table-blink_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    301872 Jan  7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   1790452 Jan  7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin     24279 Jan  7 18:07 log4j-slf4j-impl-2.17.1.jar

触发MySQL binlog 数据的生成,并接入kafka的topic :mydw.base_province中,kafka消费端接收的数据如下:

[bigdata_admin@dn5 ~]$ kafka-console-consumer --zookeeper dn3:2181 --topic mydw.base_province 

{"data":[{"id":"6","name":"上海","region_id":"2","area_code":"310000"}],"database":"rtdw_test_gmall","es":1652270652000,"id":62,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"上海1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652270652976,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京1","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272862000,"id":282,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272862913,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272892000,"id":286,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272892515,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市1","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272930000,"id":291,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272930256,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西1","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272933000,"id":292,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272933760,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古1","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272936000,"id":293,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272936865,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272944000,"id":295,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272944673,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272947000,"id":296,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272947977,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272950000,"id":298,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272950982,"type":"UPDATE"}

再次执行如下 FlinkSQL 查询语句:

Flink SQL> select * from `ods_base_province`;

Flink SQL Client中查询到的结果:
在这里插入图片描述

Ctrl + C 后退出结果查询
[INFO] Result retrieval cancelled.

五、错误的jar版本

如果没有引入任何flink-sql-connector-kafka_x.y.z.jar
会产生如下错误:

Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

如果引入的jar包版本是:flink-sql-connector-kafka_2.11-1.12.1.jar
则会产生如下错误:

Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/CatalogTable;

在这里插入图片描述

如果引入的jar包版本是:flink-sql-connector-kafka_2.11-1.14.4.jar
则会产生如下错误:

Flink SQL> select * from ods_base_province;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.configuration.DescribedEnum

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-14 10:00:40  更:2022-05-14 10:00:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:44:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码