Pitfalls of Flink 1.14.2 on Yarn
1. Installing Flink 1.14.2
1.1 Download Flink 1.14.2
Environment:
- JDK 1.8
- Scala 2.12
- Ubuntu 18
- Hadoop 3.3.0
- Three hosts (hadoop1, hadoop2, hadoop3)
Flink cluster layout:
| hadoop1 | hadoop2 | hadoop3 |
| --- | --- | --- |
| JobManager, TaskManager | TaskManager | TaskManager |
Official Flink download links:
https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz
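For example, assuming wget is available on the host, the archive can be fetched directly from either mirror:
admin@hadoop1:/opt/module$ wget https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz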
1.2 Extract the archive
Extract flink-1.14.2-bin-scala_2.12.tgz to /opt/module/:
admin@hadoop1:/opt/module$ tar -zxvf flink-1.14.2-bin-scala_2.12.tgz -C /opt/module/
1.3 Modify the cluster configuration
- Go into the conf directory and edit flink-conf.yaml, setting the jobmanager.rpc.address parameter to hadoop1:
admin@hadoop1:/opt/module/flink-1.14.2/conf$ vim flink-conf.yaml
jobmanager.rpc.address: hadoop1
- Edit the workers file to deploy hadoop1, hadoop2, and hadoop3 as TaskManager nodes:
admin@hadoop1:/opt/module/flink-1.14.2/conf$ vim workers
hadoop1
hadoop2
hadoop3
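The edited configuration has to be present on all three nodes before the cluster is started. A minimal sketch of syncing it, assuming the same /opt/module path and passwordless SSH for the admin user on every host:
admin@hadoop1:/opt/module$ scp -r flink-1.14.2 admin@hadoop2:/opt/module/
admin@hadoop1:/opt/module$ scp -r flink-1.14.2 admin@hadoop3:/opt/module/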
1.4 Start the cluster
admin@hadoop1:/opt/module/flink-1.14.2$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.
1.5 Check the processes on each node
- hadoop1
admin@hadoop1:/opt/module/flink-1.14.2$ jps
8241 Jps
8100 TaskManagerRunner
7710 StandaloneSessionClusterEntrypoint
- hadoop2
admin@hadoop2:~$ jps
3282 Jps
3015 TaskManagerRunner
- hadoop3
admin@hadoop3:~$ jps
2498 TaskManagerRunner
2798 Jps
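Besides jps, the Flink web UI on the JobManager node can be checked (8081 is the default rest.port); a quick sanity check with curl, assuming it is installed, which returns a small JSON summary of TaskManagers, slots, and jobs:
admin@hadoop1:~$ curl -s http://hadoop1:8081/overview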
1.6 Stop the cluster
admin@hadoop1:/opt/module/flink-1.14.2$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 3054) on host hadoop1.
Stopping taskexecutor daemon (pid: 2258) on host hadoop2.
Stopping taskexecutor daemon (pid: 1740) on host hadoop3.
Stopping standalonesession daemon (pid: 2662) on host hadoop1.
2. Flink on Yarn
2.1 Why Flink on Yarn
In real-world development, Flink is most often run in Flink on Yarn mode, for the following reasons:
- Yarn resources are allocated on demand, which improves cluster resource utilization.
- Yarn jobs have priorities, and jobs are run according to their priority.
- The Yarn scheduling system automatically handles failover of the individual roles:
  - Both the JobManager process and the TaskManager processes are monitored by the Yarn NodeManager.
  - If the JobManager process exits abnormally, the Yarn ResourceManager reschedules the JobManager onto another machine.
  - If a TaskManager process exits abnormally, the JobManager is notified, requests new resources from the Yarn ResourceManager, and restarts the TaskManager.
2.2 Environment configuration
Configure the environment variables in /etc/profile:
export HADOOP_HOME=/opt/module/hadoop-3.3.0
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
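After editing /etc/profile, apply it in the current shell and make sure the Hadoop classpath resolves (not part of the original steps, just a sanity check):
admin@hadoop1:~$ source /etc/profile
admin@hadoop1:~$ hadoop classpath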
2.3 Start the Hadoop cluster
To have Yarn schedule Flink, first start the Hadoop cluster and Yarn (Hadoop 3.3.0 was installed on all three nodes earlier).
Start HDFS:
admin@hadoop1:~$ start-dfs.sh
Starting namenodes on [hadoop1]
Starting datanodes
Starting secondary namenodes [hadoop3]
Start Yarn on the hadoop2 node:
admin@hadoop2:~$ start-yarn.sh
Starting resourcemanager
Starting nodemanagers
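Once the NodeManagers are up, their registration with the ResourceManager can be verified from any node (a quick check using the standard YARN client):
admin@hadoop1:~$ yarn node -list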
2.4 Start a Yarn session
YARN session mode differs slightly from the standalone cluster: a YARN session must be requested first in order to start the Flink cluster. The steps are as follows:
- Start the clusters:
  - Start the Hadoop cluster (HDFS, YARN).
  - Run the script to request resources from the YARN cluster, open a YARN session, and start the Flink cluster:
admin@hadoop1:/opt/module/flink-1.14.2$ bin/yarn-session.sh -nm test
Available parameters (a combined example follows this list):
- -d: detached mode. Use this flag if you do not want the Flink YARN client to keep running in the foreground; the YARN session then keeps running in the background even after the current terminal is closed.
- -jm (--jobManagerMemory): memory for the JobManager, in MB by default.
- -nm (--name): the application name shown in the YARN UI.
- -qu (--queue): the YARN queue to submit to.
- -tm (--taskManagerMemory): memory for each TaskManager.
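For instance, the flags above can be combined in one call; the memory sizes and queue name here are only illustrative:
admin@hadoop1:/opt/module/flink-1.14.2$ bin/yarn-session.sh -d -nm test -jm 1024 -tm 2048 -qu default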
2.5 Errors when starting yarn-session
1. java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
Fix:
Put the flink-shaded-hadoop-3-uber jar into /opt/module/flink-1.14.2/lib/:
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0
2. java.lang.NoClassDefFoundError: javax/ws/rs/ext/MessageBodyReader
Fix:
Put javax.ws.rs-api-2.0.jar into /opt/module/flink-1.14.2/lib/:
https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.0/javax.ws.rs-api-2.0.jar
3. java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;
Fix:
Put the commons-cli jar into /opt/module/flink-1.14.2/lib/ (make sure it is the package from commons.apache.org):
https://commons.apache.org/proper/commons-cli/
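A minimal sketch of dropping two of the jars into Flink's lib directory with wget; the commons-cli version 1.5.0 is an assumption (any recent release from Maven Central providing Option.builder should do), and the flink-shaded-hadoop-3-uber jar is downloaded from the mvnrepository page linked above:
admin@hadoop1:~$ wget -P /opt/module/flink-1.14.2/lib/ https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.0/javax.ws.rs-api-2.0.jar
admin@hadoop1:~$ wget -P /opt/module/flink-1.14.2/lib/ https://repo1.maven.org/maven2/commons-cli/commons-cli/1.5.0/commons-cli-1.5.0.jar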
2.6 Start yarn-session
1. Start the yarn-session:
admin@hadoop1:/opt/module/flink-1.14.2$ bin/yarn-session.sh -nm test
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.14.2/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.3.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2022-04-16 23:48:07,020 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, hadoop1
2022-04-16 23:48:07,024 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2022-04-16 23:48:07,024 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2022-04-16 23:48:07,024 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2022-04-16 23:48:07,025 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2022-04-16 23:48:07,025 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2022-04-16 23:48:07,025 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2022-04-16 23:48:07,227 WARN org.apache.hadoop.util.NativeCodeLoader [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-04-16 23:48:07,298 INFO org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to vinlee (auth:SIMPLE)
2022-04-16 23:48:07,316 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-6385816757132499081.conf.
2022-04-16 23:48:07,342 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/module/flink-1.14.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2022-04-16 23:48:07,577 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at hadoop2/192.168.219.130:8032
2022-04-16 23:48:07,833 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at hadoop2/192.168.219.130:10200
2022-04-16 23:48:07,863 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2022-04-16 23:48:07,882 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2022-04-16 23:48:08,250 INFO org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
2022-04-16 23:48:08,251 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
2022-04-16 23:48:08,428 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2022-04-16 23:48:08,428 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2022-04-16 23:48:08,428 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2022-04-16 23:48:17,204 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2022-04-16 23:48:17,217 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1650169241221_0001
2022-04-16 23:48:17,885 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1650169241221_0001
2022-04-16 23:48:17,885 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2022-04-16 23:48:17,893 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2022-04-16 23:48:28,839 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2022-04-16 23:48:28,840 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop3:41687 of application 'application_1650169241221_0001'.
2. Submit a job to the yarn-session from the command line:
admin@hadoop1:/opt/module/flink-1.14.2$ bin/flink run -c cn.vinlee.SocketStreamWordCount workstation-flink-1.0-SNAPSHOT.jar
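Since the example job is a socket word count, it needs a running text source before it produces any output. Assuming (hypothetically) that the job reads from port 7777 on hadoop1, the source could be fed with netcat:
admin@hadoop1:~$ nc -lk 7777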
2.7 Per-job mode deployment
In a YARN environment, since an external platform handles resource scheduling, we can also submit a single job directly to YARN, which starts a Flink cluster just for that job.
admin@hadoop1:/opt/module/flink-1.14.2$ bin/flink run -d -t yarn-per-job -c cn.vinlee.SocketStreamWordCount workstation-flink-1.0-SNAPSHOT.jar
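Once submitted, the job can be listed or cancelled through the YARN application id that the client prints; the placeholders below stand for the actual ids, and the syntax follows the standard Flink CLI options for per-job mode:
admin@hadoop1:/opt/module/flink-1.14.2$ bin/flink list -t yarn-per-job -Dyarn.application.id=<application_id>
admin@hadoop1:/opt/module/flink-1.14.2$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=<application_id> <job_id>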
2.8 Application mode deployment
Application mode is just as straightforward: similar to per-job mode, simply run the flink run-application command.
admin@hadoop1:/opt/module/flink-1.14.2$ bin/flink run-application -t yarn-application -c cn.vinlee.SocketStreamWordCount workstation-flink-1.0-SNAPSHOT.jar
References
- https://blog.csdn.net/tototuzuoquan/article/details/116448743
- 尚硅谷 2022 Flink 1.13 tutorial (《尚硅谷2022版Flink1.13实战教程》)