IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 解决Flink SQL connector 连接 kafka 时报Could not find a suitable table factory的错 -> 正文阅读

[大数据]解决Flink SQL connector 连接 kafka 时报Could not find a suitable table factory的错

环境

  • jdk 1.8
  • scala 2.11.8
  • flink: 当前最新版本1.13.0
  • kafka 2.4.1

执行代码

   val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv = TableEnvironment.create(bsSettings)

    val kafkaSchema = new Schema()
      .field("id", DataTypes.INT())
      .field("name", DataTypes.STRING())
      .field("age", DataTypes.BIGINT())

    bsTableEnv.connect(
      new Kafka()
        .version("2.4.1")
        .topic("test")
        .property("zookeeper.connect", "192.168.100.130:2181")
        .property("bootstrap.servers", "192.168.100.130:9092")
        .property("connector.type ","kafka")
        .property("parallelism","1")
    )
      .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
//      .withFormat(new Json())
      .withSchema(kafkaSchema)
      .inAppendMode()
      .createTemporaryTable("kafkaInputTable")

    bsTableEnv.from("kafkaInputTable").execute().print()

报错

Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:45)
	......
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties: 
'connector.type' expects 'filesystem', but is 'kafka'
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

报错原因

其中比较重要的信息是Could not find a suitable tablefactory,意思是从备选的三个CsvBatchTableSourceFactory, CsvAppendTableSourceFactory,KafkaTableSourceSinkFactory中没有找到合适的tablefactory,但是这里按道理应该选择KafkaTableSourceSinkFactory的,debug了一下,源代码如下:

Map<String, String> missingProperties = new HashMap<>();
            for (Map.Entry<String, String> e : plainContext.entrySet()) {
                if (properties.containsKey(e.getKey())) {
                    String fromProperties = properties.get(e.getKey());
                    if (!Objects.equals(fromProperties, e.getValue())) {
                        mismatchedProperties.put(
                                e.getKey(), new Tuple2<>(e.getValue(), fromProperties));
                    }
                } else {
                    missingProperties.put(e.getKey(), e.getValue());
                }
            }
            int matchedSize =
                    plainContext.size() - mismatchedProperties.size() - missingProperties.size();
            if (matchedSize == plainContext.size()) {
                matchingFactories.add(factory);
            } else {
                if (bestMatched == null || matchedSize > bestMatched.matchedSize) {
                    bestMatched =
                            new ContextBestMatched<>(
                                    factory, matchedSize, mismatchedProperties, missingProperties);
                }
            }
        }

        if (matchingFactories.isEmpty()) {
            String bestMatchedMessage = null;
            if (bestMatched != null && bestMatched.matchedSize > 0) {
                StringBuilder builder = new StringBuilder();
                builder.append(bestMatched.factory.getClass().getName());

                if (bestMatched.missingProperties.size() > 0) {
                    builder.append("\nMissing properties:");
                    bestMatched.missingProperties.forEach(
                            (k, v) -> builder.append("\n").append(k).append("=").append(v));
                }

                if (bestMatched.mismatchedProperties.size() > 0) {
                    builder.append("\nMismatched properties:");
                    bestMatched.mismatchedProperties.entrySet().stream()
                            .filter(e -> e.getValue().f1 != null)
                            .forEach(
                                    e ->
                                            builder.append(
                                                    String.format(
                                                            "\n'%s' expects '%s', but is '%s'",
                                                            e.getKey(),
                                                            e.getValue().f0,
                                                            e.getValue().f1)));
                }

                bestMatchedMessage = builder.toString();
            }
            //noinspection unchecked
            throw new NoMatchingTableFactoryException(
                    "Required context properties mismatch.",
                    bestMatchedMessage,
                    factoryClass,
                    (List<TableFactory>) classFactories,
                    properties);
        }

在类中会对符合条件的三个TableFactory做选择,取出一个bestMatched,如果没有bestMatched的TableFactory,就抛出NoMatchingTableFactoryException,所以要保证正确的 配置的参数正确,否则mismatchedProperties太多,就会被认为是不匹配的TableFactory

这里我们的connector配置的version 是2.4.1KafkaTableSourceSinkFactory.class不匹配,导致的匹配异常,修改一下版本,改为universal即可。

如果有配置和我不一样的,可以尝试在判断bestMatched代码的位置打上断点,调试看一下到底是哪个参数配置有误导致的类型匹配错误。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-r0l4s09C-1627632968026)(images/FlinkSQL报错.png)]

  • kafka 的universal版本是什么?
    • Flink附带了提供了多个Kafka连接器:universal通用版本,0.10,0.11
    • 官方文档解释说universal(通用版本)的连接器,会尝试跟踪Kafka最新版本,兼容0.10或者之后的Kafka版本,官方文档也说对于绝大多数情况使用这个即可。在最新的官方文档上有这个通用版本连接器的迁移介绍:
Migrating Kafka Connector from 0.11 to universal
In order to perform the migration, see the upgrading jobs and Flink versions guide and:

Use Flink 1.9 or newer for the whole process.
Do not upgrade Flink and user operators at the same time.
Make sure that Kafka Consumer and/or Kafka Producer used in your job have assigned unique identifiers (uid):
Use stop with savepoint feature to take the savepoint (for example by using stop --withSavepoint)CLI command.

附:Flink 通过 CreateSql 创建 Table

另外,在最新版本1.13.0中,TableEnvironment#connect方法已经被置为Deprecated,官方更推荐采用executeSql方法创建Table,原因在官方文档中也有说明,样例代码如下:

 val createTable =
      """
              CREATE TABLE PERSON (
              |    name VARCHAR COMMENT '姓名',
              |    age VARCHAR COMMENT '年龄',
              |    city VARCHAR COMMENT '所在城市',
              |    address VARCHAR COMMENT '家庭住址',
              |    ts BIGINT COMMENT '时间戳',
              |    pay_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss')), -- 定义事件时间
              |    WATERMARK FOR pay_time AS pay_time - INTERVAL '0' SECOND
              )
              |WITH (
              |    'connector.type' = 'kafka', -- 使用 kafka connector
              |    'connector.version' = 'universal',  -- kafka 版本
              |    'connector.topic' = 'kafka_ddl',  -- kafka topic
              |    'connector.startup-mode' = 'earliest-offset', -- 从最早的 offset 开始读取
              |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
              |    'connector.properties.0.value' = 'Desktop:2181',
              |    'connector.properties.1.key' = 'bootstrap.servers',
              |    'connector.properties.1.value' = 'Desktop:9091',
              |    'update-mode' = 'append',
              |    'format.type' = 'json',  -- 数据源格式为 json
              |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
              |)
            """.stripMargin

    tEnv.executeSql(createTable)
    val query: String ="""SELECT name,COUNT(age) FROM PERSON GROUP BY name""".stripMargin
    val result: Table = tEnv.sqlQuery(query)
    tEnv.toRetractStream[Row](result).print()
      bsEnv.execute("Flink SQL DDL")
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:44:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 14:11:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码