Background
In the Flink SQL client, an attempt to load the Kafka-backed virtual table ods_base_province into the MySQL table base_province threw the following error:
Flink SQL>
INSERT INTO base_province
SELECT *
FROM ods_base_province;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
kafka
print
upsert-kafka
Shutting down the session...
done.
[bigdata_admin@dn5 flink_sql_rtdw-demo]$
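The identifier in the error comes from the sink table's declaration: base_province must have been created with 'connector' = 'jdbc', and at submission time Flink searches the classpath for a DynamicTableFactory registered under that identifier (the list in the error is every factory it did find). A minimal sketch of the two declarations follows; the column list, topic, host addresses, and database name are assumptions for illustration, not the original DDL:
CREATE TABLE ods_base_province (
  id BIGINT,
  name STRING,
  region_id BIGINT,
  area_code STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_base_province',
  'properties.bootstrap.servers' = 'dn5:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE base_province (
  id BIGINT,
  name STRING,
  region_id BIGINT,
  area_code STRING
) WITH (
  'connector' = 'jdbc',   -- the identifier the factory lookup failed on
  'url' = 'jdbc:mysql://dn5:3306/gmall',
  'table-name' = 'base_province',
  'username' = '...',
  'password' = '...'
);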
Analysis
One or both of the following are likely missing from the Flink runtime classpath:
- the Flink JDBC connector
- the MySQL JDBC driver
Check whether ${FLINK_HOME}/lib contains jars with names like the following:
- flink-connector-jdbc_2.x-1.y.z.jar
- mysql-connector-java-5.1.38.jar (just an example)
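A quick way to check (the grep pattern is only an example; adjust it to your versions):
ls ${FLINK_HOME}/lib | grep -E 'jdbc|mysql'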
Fix
Manually add the dependencies above; while you are at it, also pull in the Hive and Kafka connector jars:
cd ${FLINK_HOME}/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.6/flink-connector-jdbc_2.11-1.13.6.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.13.6/flink-connector-hive_2.11-1.13.6.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.6/flink-sql-connector-kafka_2.11-1.13.6.jar
The jars after download:
[bigdata_admin@dn5 lib]$ ll
total 210872
-rw-rw-r-- 1 bigdata_admin bigdata_admin 7758727 Feb 4 17:48 flink-connector-hive_2.11-1.13.6.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin 249570 Feb 4 17:48 flink-connector-jdbc_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 92314 Feb 4 17:11 flink-csv-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 115425612 Feb 4 17:15 flink-dist_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 148127 Feb 4 17:11 flink-json-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 7709740 May 7 2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin 3674190 Feb 4 17:59 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 36455408 Feb 4 17:14 flink-table_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 41077430 Feb 4 17:14 flink-table-blink_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 301872 Jan 7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 1790452 Jan 7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 24279 Jan 7 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin 983911 Dec 2 2015 mysql-connector-java-5.1.38.jar
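Note that ${FLINK_HOME}/lib is only scanned when the JVMs start, so an already-running cluster or SQL client session will not see the new jars. Restart both before retrying the INSERT; for a standalone deployment (an assumption about this setup), that is:
${FLINK_HOME}/bin/stop-cluster.sh
${FLINK_HOME}/bin/start-cluster.sh
${FLINK_HOME}/bin/sql-client.sh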
Going further
This article drives the pipeline from the Flink SQL client, reading from the Kafka source and writing to the MySQL sink. When you instead write a Scala program against a TableEnvironment, add the connector dependency to pom.xml and set its scope to provided, as shown below, to avoid conflicts with the jars already installed on the server:
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Scala version suffix matches the cluster's _2.11 jars -->
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
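For completeness, here is a minimal sketch of such a Table API job. The schemas, addresses, and credentials are the same illustrative assumptions as in the DDL sketch above, and the usual flink-streaming-scala / flink-table-api-scala-bridge dependencies are also required:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object OdsBaseProvinceToMysql {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Kafka source table -- same assumed schema as the DDL sketch above
    tableEnv.executeSql(
      """CREATE TABLE ods_base_province (
        |  id BIGINT,
        |  name STRING,
        |  region_id BIGINT,
        |  area_code STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'ods_base_province',
        |  'properties.bootstrap.servers' = 'dn5:9092',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json'
        |)""".stripMargin)

    // MySQL sink table -- the 'jdbc' factory and the driver are resolved
    // from ${FLINK_HOME}/lib at runtime, which is why 'provided' scope works
    tableEnv.executeSql(
      """CREATE TABLE base_province (
        |  id BIGINT,
        |  name STRING,
        |  region_id BIGINT,
        |  area_code STRING
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://dn5:3306/gmall',
        |  'table-name' = 'base_province',
        |  'username' = '...',
        |  'password' = '...'
        |)""".stripMargin)

    // Submit the same INSERT as in the SQL client session
    tableEnv.executeSql("INSERT INTO base_province SELECT * FROM ods_base_province")
  }
}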
Maven dependency scope values:
1. test: available only when compiling and running tests; the dependency is neither used for the main compilation nor included in the package.
2. compile: available at compile time and bundled into the package.
3. provided: available during compilation and testing, but left out of the final war. A classic example is servlet-api: Tomcat already ships it, so packaging it again would cause a conflict.
4. runtime: required at run time but not at compile time.
The default scope is compile.
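As a concrete example, a JDBC driver is the textbook runtime dependency: application code compiles only against the java.sql interfaces, and the driver class is loaded reflectively at run time. (In this article's setup, provided would also do, since the jar already sits in ${FLINK_HOME}/lib.)
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
    <scope>runtime</scope>
</dependency>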