
[Big Data] Maddening Flink 1.13 problems: Catalog ClassNotFoundException & No ExecutorFactory found to execute the application

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog

No ExecutorFactory found to execute the application

A simple hot-items statistics job implemented with the Table API & SQL. After upgrading Flink from 1.10.1 to 1.13.0, it started failing with Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog.

The pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind the goals to Maven's compile and test-compile phases -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

The Scala code is as follows:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.api._
import org.apache.flink.types.Row

object HotItemTableApi {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val path = this.getClass.getClassLoader.getResource("UserBehavior.csv").getPath
    val dataStream: DataStream[UserBehavior] = env
      .readTextFile(path)
      .map(line => {
        val userBehaviorArray: Array[String] = line.split(",")
        UserBehavior(userBehaviorArray(0).toLong, userBehaviorArray(1).toLong, userBehaviorArray(2).toLong, userBehaviorArray(3), userBehaviorArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp*1000)

    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

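    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    // Convert the stream to a Table; the "timestamp" field is declared as the event-time (rowtime) attribute, renamed to ts
    val dataTable: Table = tableEnv.fromDataStream(dataStream, $"itemId", $"behavior", $"timestamp".rowtime as "ts")
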
    val aggTable: Table = dataTable
      .filter($"behavior" === "pv")
      .window(Slide over 1.hours every 5.minutes on $"ts" as "sw")
      .groupBy($"itemId", $"sw")
      .select($"itemId", $"sw".end as "windowEnd", $"itemId".count as "cnt")

    tableEnv.createTemporaryView("aggTable", aggTable)

    tableEnv.sqlQuery(
      """
        |select
        | *
        |from
        |(
        |select
        |   *,
        |   row_number()over(partition by windowEnd order by cnt desc) as rn
        |from aggTable
        |) t where rn <= 3
        |""".stripMargin).toRetractStream[Row].print()

    env.execute("sql")
  }
}
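The inner query numbers the rows of each window by descending count, and the outer query keeps rn <= 3, i.e. the top 3 items per windowEnd (HotTopItems in the DataStream version below does the same ranking by hand). To make the ranking semantics concrete, here is a minimal plain-Scala sketch on a static collection. AggRow and TopNSketch are hypothetical names, and the sketch ignores streaming: Flink evaluates the query incrementally and may retract earlier rankings, which is why the result is converted with toRetractStream.

```scala
// Hypothetical row type mirroring aggTable's columns (itemId, windowEnd, cnt).
final case class AggRow(itemId: Long, windowEnd: Long, cnt: Long)

object TopNSketch {
  /** Static-collection equivalent of
    *   ROW_NUMBER() OVER (PARTITION BY windowEnd ORDER BY cnt DESC) AS rn ... WHERE rn <= n
    * Returns (row, rn) pairs, windows in ascending windowEnd order, rn starting at 1.
    */
  def topN(rows: Seq[AggRow], n: Int): Seq[(AggRow, Int)] =
    rows
      .groupBy(_.windowEnd)                          // PARTITION BY windowEnd
      .toSeq
      .sortBy(_._1)                                  // deterministic order across windows
      .flatMap { case (_, rowsInWindow) =>
        rowsInWindow
          .sortBy(_.cnt)(Ordering.Long.reverse)      // ORDER BY cnt DESC (stable for ties)
          .zipWithIndex
          .map { case (row, idx) => (row, idx + 1) } // row_number() is 1-based
          .filter { case (_, rn) => rn <= n }        // WHERE rn <= n
      }

  def main(args: Array[String]): Unit = {
    // Made-up sample counts, for illustration only.
    val rows = Seq(
      AggRow(1L, 3600000L, 5L), AggRow(2L, 3600000L, 9L), AggRow(3L, 3600000L, 7L),
      AggRow(4L, 3600000L, 1L), AggRow(1L, 3900000L, 2L)
    )
    topN(rows, 3).foreach { case (row, rn) =>
      println(s"windowEnd=${row.windowEnd} rn=$rn itemId=${row.itemId} cnt=${row.cnt}")
    }
  }
}
```

Sorting with Ordering.Long.reverse mirrors the comparison HotTopItems uses, so both versions break ties the same way only as far as input order allows; ROW_NUMBER gives no guarantee for equal counts.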

The Table API code above compiles without errors, but when it runs, the line

val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

keeps failing with

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.Catalog
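When a ClassNotFoundException appears even though the artifact that should contain the class is declared in the pom (Catalog is expected to come from flink-table-common), a useful first check is whether the class is visible on the classpath the job is actually launched with, and which jar provides it. Below is a minimal JDK-only sketch; ClasspathCheck is a hypothetical helper, not part of Flink, and it would have to be run with the same run configuration as HotItemTableApi to be meaningful.

```scala
object ClasspathCheck {
  private def loader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)

  /** Looks a class up without initializing it; None if it cannot be loaded. */
  def lookup(className: String): Option[Class[_]] =
    try Some(Class.forName(className, false, loader))
    catch {
      // LinkageError covers e.g. NoClassDefFoundError caused by a broken dependency
      case _: ClassNotFoundException | _: LinkageError => None
    }

  /** Human-readable report: where the class was loaded from, or that it is missing. */
  def describe(className: String): String = lookup(className) match {
    case None => s"$className: NOT on classpath"
    case Some(cls) =>
      // JDK bootstrap classes have no CodeSource, hence the Options
      val source = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
      s"$className: loaded from ${source.getOrElse("<JDK/bootstrap>")}"
  }

  def main(args: Array[String]): Unit =
    Seq(
      "org.apache.flink.table.catalog.Catalog", // the class named in the stack trace
      "org.apache.flink.table.api.bridge.scala.StreamTableEnvironment"
    ).foreach(name => println(describe(name)))
}
```

If the class shows up as missing in the IDE run but is present in the jar built by maven-assembly-plugin, the problem lies in the run configuration rather than in the code.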

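I searched everywhere and could not find a single similar report, which was maddening! So I set the Table API approach aside for the moment and re-ran the DataStream API code I had written earlier.
The code is as follows: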

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

case class UserBehavior(userId: Long, itemId: Long, category: Long, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val path = this.getClass.getClassLoader.getResource("UserBehavior.csv").getPath
    env
      .readTextFile(path)
      .map(line => {
        val userBehaviorArray: Array[String] = line.split(",")
        UserBehavior(userBehaviorArray(0).toLong, userBehaviorArray(1).toLong, userBehaviorArray(2).toLong, userBehaviorArray(3), userBehaviorArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      .keyBy("itemId")
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg(), new ResultWindowFunc())
      .keyBy("windowEnd")
      .process(new HotTopItems(3))
      .print()


    env.execute("test4")
  }

  class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def add(in: UserBehavior, acc: Long): Long = acc + 1

    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = acc + acc1
  }

  class ResultWindowFunc() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
      val itemId = key.asInstanceOf[Tuple1[Long]].f0
      out.collect(ItemViewCount(itemId = itemId, windowEnd = window.getEnd, count = input.iterator.next))
    }
  }

  class HotTopItems(size: Int) extends KeyedProcessFunction[Tuple,ItemViewCount,String] {

    private var itemState: ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]))
    }
    override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
      itemState.add(i)
      context.timerService().registerEventTimeTimer(i.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      super.onTimer(timestamp, ctx, out)
      val listBuffer = new ListBuffer[ItemViewCount]()
      import scala.collection.JavaConversions._
      for(item <- itemState.get){
        listBuffer += item
      }

      itemState.clear()

      val topList: ListBuffer[ItemViewCount] = listBuffer.sortBy(_.count)(Ordering.Long.reverse).take(size)

      val sb = new StringBuilder()
      sb.append("=====================\n")
      sb.append("time: ").append(new Timestamp(timestamp - 1)).append("\n")
      for(i <- topList.indices){
        val currentItem = topList(i)
        sb.append("No").append(i)
          .append(": itemID = ").append(currentItem.itemId)
          .append("  count = ").append(currentItem.count).append("\n")
      }
      sb.append("=====================\n\n")
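      // Sleeps 1 s per fired timer to slow down the printed output; only useful for eyeballing results locally
      Thread.sleep(1000)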
      out.collect(sb.toString)
    }
  }

}
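Both versions rely on the same sliding-window semantics: with a 1-hour window sliding every 5 minutes, each event is assigned to every 1-hour window that starts on a 5-minute boundary and contains it, so it is counted in 60 / 5 = 12 windows, and the windowEnd of each window is what the second keyBy("windowEnd") groups on before HotTopItems ranks the items. Below is a minimal sketch of that assignment arithmetic, modelled on how Flink's sliding event-time assigner computes window starts but assuming a window offset of 0 and non-negative event timestamps; SlidingWindowSketch is a hypothetical name, not a Flink API.

```scala
object SlidingWindowSketch {
  /** Start of the latest slide-aligned window containing ts (window offset assumed 0). */
  def lastWindowStart(ts: Long, slide: Long): Long = ts - (ts + slide) % slide

  /** Every [start, end) window of length `size`, advancing by `slide`, that contains ts. */
  def windowsFor(ts: Long, size: Long, slide: Long): List[(Long, Long)] =
    Iterator
      .iterate(lastWindowStart(ts, slide))(_ - slide) // step back one slide at a time
      .takeWhile(start => start > ts - size)          // stop once a window no longer reaches ts
      .map(start => (start, start + size))
      .toList

  def main(args: Array[String]): Unit = {
    val size = 60L * 60 * 1000                  // Time.hours(1) in ms
    val slide = 5L * 60 * 1000                  // Time.minutes(5) in ms
    val ts = 1511658000L * 1000 + 7 * 60 * 1000 // example event time in ms
    val windows = windowsFor(ts, size, slide)
    println(s"event at $ts falls into ${windows.size} windows")
    windows.foreach { case (start, end) => println(s"  [$start, $end) -> windowEnd = $end") }
  }
}
```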

It turned out that even this code, which had run successfully before, now failed as well:
No ExecutorFactory found to execute the application

So I went back to Google-driven programming and finally tracked down the problem.

https://programmerah.com/flink-1-1-error-no-executorfactory-found-to-execute-the-application-24628/

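After upgrading to v1.13.0, pom.xml needs the Flink clients dependency (org.apache.flink:flink-clients_${scala.binary.version}, version ${flink.version}); remember to remove its <scope> tag. Since Flink 1.11, flink-streaming-java no longer pulls in flink-clients transitively, which is why the same pom worked on 1.10.1. With <scope>provided</scope>, the dependency is typically left off the classpath when the job is started from the IDE.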

https://stackoverflow.com/questions/63600971/no-executorfactory-found-to-execute-the-application-in-flink-1-11-1
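As far as I understand, Flink discovers executors with Java's ServiceLoader (the interface is org.apache.flink.core.execution.PipelineExecutorFactory, and flink-clients registers the local and remote implementations), so "No ExecutorFactory found" essentially means no provider file for that interface is on the classpath. Below is a minimal JDK-only sketch for checking this before and after adding flink-clients; ExecutorFactoryCheck is a hypothetical name, and the service-file path is an assumption worth verifying against your Flink version.

```scala
object ExecutorFactoryCheck {
  // Assumption: each jar providing executors ships a ServiceLoader
  // provider-configuration file with this name.
  val ServiceFile = "META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory"

  /** URLs of every copy of `resource` visible to the current class loader. */
  def serviceFileLocations(resource: String = ServiceFile): List[String] = {
    val loader = Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)
    val urls = loader.getResources(resource)
    val found = List.newBuilder[String]
    while (urls.hasMoreElements) found += urls.nextElement().toString
    found.result()
  }

  def main(args: Array[String]): Unit = {
    val found = serviceFileLocations()
    if (found.isEmpty)
      println("No PipelineExecutorFactory registration found; is flink-clients on the runtime classpath?")
    else
      found.foreach(url => println(s"Executor factories registered in: $url"))
  }
}
```

Run from the same IDE configuration as the job, an empty result would be consistent with the error above, and the flink-clients jar should appear in the list once the dependency (without provided scope) is added.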

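After making these changes, the DataStream API code ran successfully. Then, half-expecting another failure, I ran the Table API code as well, and to my surprise it also ran. So what that Catalog error actually meant remains a mystery to me.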

Added: 2021-08-07 12:09:17  Updated: 2021-08-07 12:09:35
 