Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog
No ExecutorFactory found to execute the application
一个简单的热门商品统计程序,用 Table API & SQL 实现。Flink 版本从 1.10.1 升级到 1.13.0 后,运行时就出现 Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog 的错误。
pom.xml如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Flink 1.13 user-behaviour analysis jobs (DataStream API and Table API / SQL, Scala 2.11). -->
    <groupId>org.example</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Since Flink 1.11 the streaming modules no longer pull in flink-clients transitively.
             Without it env.execute() fails with "No ExecutorFactory found to execute the application",
             because the local executor implementation lives in flink-clients. Keep the default
             (compile) scope so the dependency is on the classpath when the job is run from the IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): flink-connector-kafka already brings in kafka-clients; this full Kafka
             artifact is probably unnecessary. Confirm nothing uses it directly before removing. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- "compile" is required so that mvn package compiles the main Scala sources;
                             with only "testCompile" the main classes never reach the packaged jar. -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Scala代码如下:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.api._
import org.apache.flink.types.Row
/**
 * Hot-items Top-N (the 3 most-viewed items per window) implemented with the Table API & SQL.
 *
 * Reads `UserBehavior.csv` from the classpath and keeps only "pv" records. It counts them per
 * item in 1-hour windows sliding every 5 minutes (Table API). It then ranks the counts of each
 * window end with ROW_NUMBER() and keeps ranks 1..3 (SQL). The result is printed as a retract
 * stream.
 */
object HotItemTableApi {
  def main(args: Array[String]): Unit = {
    import java.nio.file.Paths

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Event time is the default time characteristic since Flink 1.12, so the deprecated
    // setStreamTimeCharacteristic(TimeCharacteristic.EventTime) call is not needed on 1.13.
    env.setParallelism(1)

    // URL.getPath() returns a percent-encoded string: spaces, Chinese or other non-ASCII
    // directory names become %XX escapes, which readTextFile cannot open. Going through the URI
    // decodes the path. This only works while the CSV is a plain file (e.g. run from the IDE),
    // not when it is packed inside a jar.
    val path: String = Option(getClass.getClassLoader.getResource("UserBehavior.csv"))
      .map(url => Paths.get(url.toURI).toString)
      .getOrElse(throw new IllegalStateException("UserBehavior.csv not found on the classpath"))

    val dataStream: DataStream[UserBehavior] = env
      .readTextFile(path)
      .map { line =>
        val fields: Array[String] = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toLong, fields(3), fields(4).toLong)
      }
      // Timestamps are multiplied by 1000, presumably converting epoch seconds to milliseconds.
      .assignAscendingTimestamps(_.timestamp * 1000)

    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    // Expose the stream's assigned event timestamps as the rowtime attribute "ts".
    val dataTable: Table =
      tableEnv.fromDataStream(dataStream, $"itemId", $"behavior", $"timestamp".rowtime() as "ts")

    // Per-item page-view count for every 1h window sliding by 5 min.
    val aggTable: Table = dataTable
      .filter($"behavior" === "pv")
      .window(Slide over 1.hours every 5.minutes on $"ts" as "sw")
      .groupBy($"itemId", $"sw")
      .select($"itemId", $"sw".end as "windowEnd", $"itemId".count as "cnt")

    // Top-N per window end, expressed in SQL with ROW_NUMBER().
    tableEnv.createTemporaryView("aggTable", aggTable)
    tableEnv.sqlQuery(
      """
        |select
        | *
        |from
        |(
        |select
        | *,
        | row_number()over(partition by windowEnd order by cnt desc) as rn
        |from aggTable
        |) t where rn <= 3
        |""".stripMargin).toRetractStream[Row].print()

    env.execute("sql")
  }
}
代码编译没有错误,但执行到下面这一行时:
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
就一直报错:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog
查遍了资料也没有找到类似的错误,心态炸裂!!!于是先放弃 Table API 的方式,把之前写的 DataStream API 代码执行一遍,代码如下:
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.sql.Timestamp
import scala.collection.mutable.ListBuffer
/** One parsed line of `UserBehavior.csv`: user id, item id, category id, behavior type and timestamp. */
final case class UserBehavior(userId: Long, itemId: Long, category: Long, behavior: String, timestamp: Long)

/** Number of page views `count` of item `itemId` in the window ending at `windowEnd` (epoch millis). */
final case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
/**
 * Hot-items Top-N (the 3 most-viewed items per window) implemented with the DataStream API.
 *
 * The pipeline is:
 *  - read `UserBehavior.csv`;
 *  - keep "pv" records;
 *  - count them per item in 1-hour event-time windows sliding every 5 minutes;
 *  - group the per-item counts by window end;
 *  - emit a formatted ranking of the top items once the watermark passes that window end.
 */
object HotItems {
  def main(args: Array[String]): Unit = {
    import java.nio.file.Paths
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Event time is the default since Flink 1.12, and the window assigner below is explicitly
    // event-time based, so the deprecated setStreamTimeCharacteristic call is not needed.
    env.setParallelism(1)

    // URL.getPath() returns a percent-encoded string: spaces, Chinese or other non-ASCII
    // directory names become %XX escapes, which readTextFile cannot open. Going through the URI
    // decodes the path. This only works while the CSV is a plain file (e.g. run from the IDE).
    val path: String = Option(getClass.getClassLoader.getResource("UserBehavior.csv"))
      .map(url => Paths.get(url.toURI).toString)
      .getOrElse(throw new IllegalStateException("UserBehavior.csv not found on the classpath"))

    env
      .readTextFile(path)
      .map { line =>
        val fields: Array[String] = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toLong, fields(3), fields(4).toLong)
      }
      // Timestamps are multiplied by 1000, presumably converting epoch seconds to milliseconds.
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      // Typed key selectors replace the deprecated field-name keyBy and its untyped Tuple keys.
      .keyBy(_.itemId)
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
      .aggregate(new CountAgg(), new ResultWindowFunc())
      .keyBy(_.windowEnd)
      .process(new HotTopItems(3))
      .print()

    env.execute("test4")
  }

  /** Incremental pre-aggregation: counts the records of one key within one window. */
  class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(in: UserBehavior, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc: Long, acc1: Long): Long = acc + acc1
  }

  /**
   * Wraps the pre-aggregated count of one item (the window key) together with the window end
   * into an [[ItemViewCount]].
   */
  class ResultWindowFunc() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
    override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =
      // With an AggregateFunction pre-aggregation the iterable holds exactly the single count.
      out.collect(ItemViewCount(itemId = key, windowEnd = window.getEnd, count = input.iterator.next()))
  }

  /**
   * Collects all [[ItemViewCount]]s of one window end in keyed list state. When the event-time
   * timer at `windowEnd + 1` fires, it emits a text ranking of the `size` most-viewed items.
   *
   * @param size number of top items to print per window
   */
  class HotTopItems(size: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
    private var itemState: ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      itemState = getRuntimeContext.getListState(
        new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]))
    }

    override def processElement(
        i: ItemViewCount,
        context: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
        collector: Collector[String]): Unit = {
      itemState.add(i)
      // Fires once the watermark passes the window end, i.e. after all counts of that window arrived.
      context.timerService().registerEventTimeTimer(i.windowEnd + 1)
    }

    override def onTimer(
        timestamp: Long,
        ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
        out: Collector[String]): Unit = {
      import scala.collection.JavaConverters._
      super.onTimer(timestamp, ctx, out)

      // Materialize the state before clearing it, then rank by descending count.
      val allItems: List[ItemViewCount] = itemState.get.asScala.toList
      itemState.clear()
      val topList: List[ItemViewCount] = allItems.sortBy(_.count)(Ordering.Long.reverse).take(size)

      val sb = new StringBuilder()
      sb.append("=====================\n")
      sb.append("time: ").append(new Timestamp(timestamp - 1)).append("\n")
      // Ranks are 1-based, consistent with SQL ROW_NUMBER() ranks.
      topList.zipWithIndex.foreach { case (currentItem, i) =>
        sb.append("No").append(i + 1)
          .append(": itemID = ").append(currentItem.itemId)
          .append(" count = ").append(currentItem.count).append("\n")
      }
      sb.append("=====================\n\n")
      // NOTE(review): presumably only slows down console output for the demo. It blocks the task
      // thread (and thus checkpoints/backpressure), so remove it for real jobs.
      Thread.sleep(1000)
      out.collect(sb.toString)
    }
  }
}
结果发现之前能成功运行的代码也报错了:No ExecutorFactory found to execute the application
于是继续面向 Google 编程,终于找到了问题所在。
https://programmerah.com/flink-1-1-error-no-executorfactory-found-to-execute-the-application-24628/
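升级到 v1.13.0 后,pom.xml 需要显式加上 Flink clients 依赖(flink-clients_${scala.binary.version})。记得去掉示例中的 <scope>provided</scope> 标签,否则在 IDE 中直接运行时,该依赖可能不在运行时 classpath 上。原因是从 Flink 1.11 开始,flink-streaming-java 不再传递依赖 flink-clients,而本地执行所需的 ExecutorFactory 实现就在 flink-clients 中。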
https://stackoverflow.com/questions/63600971/no-executorfactory-found-to-execute-the-application-in-flink-1-11-1
按以上方式操作后,DataStream API 的代码执行成功。再抱着试一试的心态执行 Table API 的代码,居然也成功了。所以之前那个 ClassNotFoundException 到底是什么原因,至今仍百思不得其解。
|