Environment
- JDK 1.8
- Scala 2.11.8
- Flink 1.13.0 (the latest version at the time of writing)
- Kafka 2.4.1
The code
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = TableEnvironment.create(bsSettings)

val kafkaSchema = new Schema()
  .field("id", DataTypes.INT())
  .field("name", DataTypes.STRING())
  .field("age", DataTypes.BIGINT())

bsTableEnv.connect(
  new Kafka()
    .version("2.4.1")
    .topic("test")
    .property("zookeeper.connect", "192.168.100.130:2181")
    .property("bootstrap.servers", "192.168.100.130:9092")
    .property("connector.type", "kafka")
    .property("parallelism", "1")
)
  .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
  .withSchema(kafkaSchema)
  .inAppendMode()
  .createTemporaryTable("kafkaInputTable")

bsTableEnv.from("kafkaInputTable").execute().print()
The error
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:45)
......
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Cause of the error
The key message here is Could not find a suitable table factory: none of the three candidates (CsvBatchTableSourceFactory, CsvAppendTableSourceFactory, KafkaTableSourceSinkFactory) was considered a suitable TableFactory, even though KafkaTableSourceSinkFactory should have been selected. Stepping through with a debugger, the relevant Flink source looks like this:
    Map<String, String> missingProperties = new HashMap<>();
    for (Map.Entry<String, String> e : plainContext.entrySet()) {
        if (properties.containsKey(e.getKey())) {
            String fromProperties = properties.get(e.getKey());
            if (!Objects.equals(fromProperties, e.getValue())) {
                mismatchedProperties.put(
                        e.getKey(), new Tuple2<>(e.getValue(), fromProperties));
            }
        } else {
            missingProperties.put(e.getKey(), e.getValue());
        }
    }
    int matchedSize =
            plainContext.size() - mismatchedProperties.size() - missingProperties.size();
    if (matchedSize == plainContext.size()) {
        matchingFactories.add(factory);
    } else {
        if (bestMatched == null || matchedSize > bestMatched.matchedSize) {
            bestMatched =
                    new ContextBestMatched<>(
                            factory, matchedSize, mismatchedProperties, missingProperties);
        }
    }
} // end of loop over candidate factories

if (matchingFactories.isEmpty()) {
    String bestMatchedMessage = null;
    if (bestMatched != null && bestMatched.matchedSize > 0) {
        StringBuilder builder = new StringBuilder();
        builder.append(bestMatched.factory.getClass().getName());
        if (bestMatched.missingProperties.size() > 0) {
            builder.append("\nMissing properties:");
            bestMatched.missingProperties.forEach(
                    (k, v) -> builder.append("\n").append(k).append("=").append(v));
        }
        if (bestMatched.mismatchedProperties.size() > 0) {
            builder.append("\nMismatched properties:");
            bestMatched.mismatchedProperties.entrySet().stream()
                    .filter(e -> e.getValue().f1 != null)
                    .forEach(
                            e ->
                                    builder.append(
                                            String.format(
                                                    "\n'%s' expects '%s', but is '%s'",
                                                    e.getKey(),
                                                    e.getValue().f0,
                                                    e.getValue().f1)));
        }
        bestMatchedMessage = builder.toString();
    }
    throw new NoMatchingTableFactoryException(
            "Required context properties mismatch.",
            bestMatchedMessage,
            factoryClass,
            (List<TableFactory>) classFactories,
            properties);
}
This code iterates over the eligible TableFactory candidates and keeps track of a bestMatched one. If no factory matches every required context property, a NoMatchingTableFactoryException is thrown carrying the bestMatched details. So the configured parameters must be exactly right; with too many mismatchedProperties, a factory is treated as not matching.
In our case, the connector version was configured as 2.4.1, which does not match the context expected by KafkaTableSourceSinkFactory, causing the match to fail. Changing the version to universal fixes it.
If your configuration differs from mine, try setting a breakpoint where bestMatched is computed and debug to see exactly which misconfigured parameter causes the mismatch.
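With the version corrected, the connector definition from the snippet above becomes the following (a minimal sketch of the fix; only the version call changes, everything else is as in the original code):

```scala
bsTableEnv.connect(
  new Kafka()
    .version("universal") // was "2.4.1" — universal matches KafkaTableSourceSinkFactory's expected context
    .topic("test")
    .property("zookeeper.connect", "192.168.100.130:2181")
    .property("bootstrap.servers", "192.168.100.130:9092")
)
  .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
  .withSchema(kafkaSchema)
  .inAppendMode()
  .createTemporaryTable("kafkaInputTable")
```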
![FlinkSQL error screenshot](images/FlinkSQL报错.png)
- What is the universal version of the Kafka connector?
- Flink ships with several Kafka connectors: the universal version, 0.10, and 0.11.
- According to the official documentation, the universal connector tracks the latest Kafka version and is compatible with Kafka 0.10 and later; the docs recommend it for the vast majority of use cases. The latest official documentation includes a migration guide for this universal connector:
Migrating Kafka Connector from 0.11 to universal
In order to perform the migration, see the upgrading jobs and Flink versions guide and:
Use Flink 1.9 or newer for the whole process.
Do not upgrade Flink and user operators at the same time.
Make sure that Kafka Consumer and/or Kafka Producer used in your job have assigned unique identifiers (uid):
Use stop with savepoint feature to take the savepoint (for example by using the stop --withSavepoint CLI command).
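Assigning a uid to the Kafka source, as the migration steps above require, looks roughly like this in the DataStream API (a hedged sketch; `env`, the topic name, and `kafkaProps` are placeholder assumptions for your own setup):

```scala
// Sketch: give the Kafka consumer a stable uid so its savepoint state
// can be mapped back to the operator after the connector migration.
val consumer = new FlinkKafkaConsumer[String](
  "test",                    // placeholder topic name
  new SimpleStringSchema(),
  kafkaProps)                // kafkaProps: your Kafka consumer Properties

env
  .addSource(consumer)
  .uid("kafka-source")       // stable identifier used when restoring from the savepoint
```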
Appendix: creating a Table in Flink with a CREATE SQL statement
Also note that in the latest version, 1.13.0, TableEnvironment#connect has been deprecated; the official recommendation is to create tables with executeSql instead (the reasons are explained in the official documentation). Sample code:
val createTable =
  """
    |CREATE TABLE PERSON (
    |    name VARCHAR COMMENT 'name',
    |    age VARCHAR COMMENT 'age',
    |    city VARCHAR COMMENT 'city',
    |    address VARCHAR COMMENT 'home address',
    |    ts BIGINT COMMENT 'timestamp',
    |    pay_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss')), -- define the event time
    |    WATERMARK FOR pay_time AS pay_time - INTERVAL '0' SECOND
    |)
    |WITH (
    |    'connector.type' = 'kafka', -- use the kafka connector
    |    'connector.version' = 'universal', -- kafka version
    |    'connector.topic' = 'kafka_ddl', -- kafka topic
    |    'connector.startup-mode' = 'earliest-offset', -- read from the earliest offset
    |    'connector.properties.0.key' = 'zookeeper.connect', -- connection info
    |    'connector.properties.0.value' = 'Desktop:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'Desktop:9091',
    |    'update-mode' = 'append',
    |    'format.type' = 'json', -- the source format is json
    |    'format.derive-schema' = 'true' -- derive the json parsing rules from the DDL schema
    |)
  """.stripMargin

tEnv.executeSql(createTable)

val query: String = "SELECT name, COUNT(age) FROM PERSON GROUP BY name"
val result: Table = tEnv.sqlQuery(query)
tEnv.toRetractStream[Row](result).print()
bsEnv.execute("Flink SQL DDL")
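The 'connector.type' / 'connector.version' keys used above belong to the legacy connector descriptor. In Flink 1.13 the recommended DDL uses the new option keys ('connector' = 'kafka', 'scan.startup.mode', etc.). An equivalent definition would look roughly like this (a sketch under those assumptions, not tested against a live cluster; the address reuses the placeholder from above):

```scala
// Sketch: same table with the new-style (Flink 1.13) Kafka connector options.
val createTableNew =
  """
    |CREATE TABLE PERSON_NEW (
    |    name VARCHAR,
    |    age VARCHAR
    |)
    |WITH (
    |    'connector' = 'kafka',
    |    'topic' = 'kafka_ddl',
    |    'properties.bootstrap.servers' = 'Desktop:9091',
    |    'scan.startup.mode' = 'earliest-offset',
    |    'format' = 'json'
    |)
  """.stripMargin
tEnv.executeSql(createTableNew)
```

Note that the new connector no longer takes a 'connector.version' option at all, so the original version-mismatch error cannot occur in this style.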