Following the official Apache Kafka connector documentation, I created a class:
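package test;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkDataStream {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stringDataStream = KafkaProp(env);
        stringDataStream.print();
        env.execute("Flink-Kafka Demo");
    }

    // Builds a Kafka source that reads the "test" topic as plain strings.
    private static DataStream<String> KafkaProp(StreamExecutionEnvironment env) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.51.113:9092");
        properties.setProperty("group.id", "id");
        return env.addSource(new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties));
    }
}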
Running it fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/eventtime/WatermarkStrategy
at test.FlinkDataStream.KafkaProp(FlinkDataStream.java:26)
at test.FlinkDataStream.main(FlinkDataStream.java:16)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.eventtime.WatermarkStrategy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 2 more
WatermarkStrategy cannot be found. After some searching, the cause turned out to be a missing flink-core dependency, so I added it:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.12.1</version>
</dependency>
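A quick way to confirm whether a class such as WatermarkStrategy is actually visible at runtime is to ask the class loader directly. The following is only a minimal sketch: `ClasspathCheck` and `isOnClasspath` are hypothetical names made up for illustration, not part of Flink, and the check has to run with the same classpath as the job.

```java
// Hypothetical helper (not part of Flink): reports whether classes are loadable
// from the classpath this program is started with.
class ClasspathCheck {

    /** Returns true if the named class can be located, without running its static initializers. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its own dependencies is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace; pass other class names as arguments.
        String[] names = args.length > 0
                ? args
                : new String[] {"org.apache.flink.api.common.eventtime.WatermarkStrategy"};
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

If the class is reported as missing, the next step is to find which artifact provides it and whether that artifact's version matches the other Flink dependencies.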
That produced another error:
Exception in thread "main" java.lang.NoSuchFieldError: MANAGED_MEMORY_PRE_ALLOCATE
at org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration.fromConfiguration(TaskManagerServicesConfiguration.java:266)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356)
at org.apache.flink.runtime.minicluster.MiniCluster.startTaskExecutor(MiniCluster.java:496)
at org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:487)
at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:313)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:114)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at test.FlinkDataStream.main(FlinkDataStream.java:18)
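Searching suggested that NoSuchFieldError is usually dependency-related, but I found no duplicate or missing dependencies. I then looked at where MANAGED_MEMORY_PRE_ALLOCATE is defined: it is a field of TaskManagerOptions, yet when I tried to reference it, the IDE did not offer it.
I then checked the Maven dependencies and found that the Flink version was 1.9.3. That pointed to a version mismatch, most likely with the flink-core 1.12.1 added above, so I changed the version to 1.13.0: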
<flink.version>1.13.0</flink.version>
That fixed the error, but another one appeared:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/configuration/StateBackendOptions
at org.apache.flink.runtime.state.StateBackendLoader.loadUnwrappedStateBackendFromConfig(StateBackendLoader.java:113)
at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:220)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:911)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:854)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:247)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:216)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:204)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:52)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:2163)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$13(StreamExecutionEnvironment.java:2105)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2105)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2087)
at test.FlinkDataStream.main(FlinkDataStream.java:13)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.StateBackendOptions
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more
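Google turned up nothing for this error, so I changed the Flink version again (the flink-core dependency above was still pinned to 1.12.1, so this was presumably another version mismatch):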
<flink.version>1.12.0</flink.version>
That resolved the problem.
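Both NoSuchFieldError and NoClassDefFoundError above went away after changing versions, which suggests that jars from different Flink releases were mixed on the classpath. One way to check this is to print the jar each class is loaded from, since the jar file name normally carries the version. This is a rough sketch: `JarLocator` and `describe` are hypothetical names, and the implementation version is only printed if the jar's manifest records one.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Flink): shows where a class was loaded from.
class JarLocator {

    /** Describes the jar (or directory) a class came from and its manifest version, if any. */
    static String describe(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "(no code source, e.g. a JDK class)"
                : source.getLocation().toString();
        Package pkg = type.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        return type.getName() + " <- " + location
                + " (version: " + (version == null ? "unknown" : version) + ")";
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults to classes that appear in the stack traces; pass other class names as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.flink.configuration.TaskManagerOptions",
            "org.apache.flink.runtime.taskexecutor.TaskManagerRunner",
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment"
        };
        ClassLoader loader = JarLocator.class.getClassLoader();
        for (String name : names) {
            // 'false' avoids running static initializers; only the origin matters here.
            System.out.println(describe(Class.forName(name, false, loader)));
        }
    }
}
```

If the printed jar names show different Flink versions, aligning every Flink artifact on the same version property is the likely fix.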
Next, the Kafka connection failed:
16:50:24,261 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-id-9, groupId=id] Error connecting to node DESKTOP-678FKLL.localdomain:9092 (id: 1 rack: null)
java.net.UnknownHostException: DESKTOP-678FKLL.localdomain
Here DESKTOP-678FKLL is the hostname of my own machine, whereas the Kafka address I set in properties points to the Kafka instance installed in WSL Ubuntu.
To fix this, edit Kafka's configuration file config/server.properties in WSL and set the listeners address to the WSL address.
After restarting Kafka, the connection works (for how to start Kafka, see my other post, "Installing and starting Kafka in WSL").
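The log above makes sense once you know that a Kafka client only uses bootstrap.servers for the first contact and then connects to whatever host names the broker advertises in its metadata. Here the broker advertised DESKTOP-678FKLL.localdomain, which the client could not resolve. A small check like the following can confirm whether a given host name resolves from the machine running the Flink job. It is a sketch only: `HostCheck` and `resolve` are hypothetical names used for illustration.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: checks whether host names resolve from this machine.
class HostCheck {

    /** Returns the resolved IP address as text, or null if the host name cannot be resolved. */
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Defaults to the two addresses from this post; pass other hosts as arguments.
        String[] hosts = args.length > 0
                ? args
                : new String[] {"192.168.51.113", "DESKTOP-678FKLL.localdomain"};
        for (String host : hosts) {
            String address = resolve(host);
            System.out.println(host + " -> " + (address == null ? "UNRESOLVABLE" : address));
        }
    }
}
```

A host that prints as UNRESOLVABLE here will trigger the same UnknownHostException in the Kafka client, regardless of what bootstrap.servers is set to.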