报错信息:
Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
The following properties are requested:
connector.properties.bootstrap.servers=10.100.2.191:6667
connector.properties.zookeeper.connect=10.100.2.191:2181
connector.property-version=1
connector.topic=ywb02
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=name
schema.1.data-type=INT
schema.1.name=age
The following factories have been considered:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.planner.StreamPlanner.getTableSink(StreamPlanner.scala:447)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:144)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
at com.codetree.flink.flinksql.TableDemo_KafkaPipeLine.main(TableDemo_KafkaPipeLine.java:54)
解决:
这个坑我也不是很懂,从网友那里找到了解决办法,把kafka的版本改成universal就解决了。
|