IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 在Centos7上基于Zookeeper实现Flink1.13.2的高可用HA搭建(Standalone和Yarn版本) -> 正文阅读

[大数据]在Centos7上基于Zookeeper实现Flink1.13.2的高可用HA搭建(Standalone和Yarn版本)

Standalone和Yarn模式任选一种即可

1. 安装规划

  • 每台服务器相互设置ssh无密码登录,注意authorized_keys权限为600
服务名安装服务器服务作用安装教程备注
java8bigdata001/2/3
zookeeperbigdata001/2/3基于Centos7分布式安装Zookeeper3.6.3
hadoopbigdata001/2/3Centos7上Hadoop 3.3.1的高可用HA安装过程

2. 下载解压(在bigdata001操作)

[root@bigdata001 opt]#
[root@bigdata001 opt]# wget https://mirror.novg.net/apache/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz
[root@bigdata001 opt]#
[root@bigdata001 opt]# tar -zxvf flink-1.13.2-bin-scala_2.11.tgz
[root@bigdata001 opt]#
[root@bigdata001 opt]# cd flink-1.13.2
[root@bigdata001 flink-1.13.2]#

3. Standalone模式

3.1 修改conf/flink-conf.yaml(在bigdata001操作)

新增目录

[root@bigdata001 flink-1.13.2]# 
[root@bigdata001 flink-1.13.2]# pwd
/opt/flink-1.13.2
[root@bigdata001 flink-1.13.2]# 
[root@bigdata001 flink-1.13.2]# mkdir web_upload_dir
[root@bigdata001 flink-1.13.2]# 
[root@bigdata001 flink-1.13.2]# mkdir io_tmp_dir
[root@bigdata001 flink-1.13.2]# 

修改部分

rest.address: bigdata001

jobmanager.memory.process.size: 2g
taskmanager.memory.process.size: 6g

taskmanager.numberOfTaskSlots: 2

state.backend: rocksdb
state.checkpoints.dir: hdfs://nnha/flink/checkpoints/rocksdb
state.savepoints.dir: hdfs://nnha/flink/savepoints/rocksdb

io.tmp.dirs: /opt/flink-1.13.2/io_tmp_dir

high-availability: zookeeper
high-availability.zookeeper.quorum: bigdata001:2181,bigdata002:2181,bigdata003:2181
high-availability.storageDir: hdfs://nnha/flink/ha/recovery/

添加部分

env.java.home: /opt/jdk1.8.0_201

execution.checkpointing.interval: 300000

web.upload.dir: /opt/flink-1.13.2/web_upload_dir

high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /ha_cluster 

3.2 修改conf/masters和conf/workers(在bigdata001操作)

bigdata001:8081
bigdata002:8081
bigdata003:8081
bigdata001
bigdata002
bigdata003

3.3 依赖jar包的添加和环境变量的添加(在bigdata001操作)

[root@bigdata001 flink-1.13.2]#
[root@bigdata001 flink-1.13.2]# pwd
/opt/flink-1.13.2
[root@bigdata001 flink-1.13.2]# mv lib/flink-shaded-zookeeper-3.4.14.jar opt/
[root@bigdata001 flink-1.13.2]# mv opt/flink-shaded-zookeeper-3.5.6.jar lib/
[root@bigdata001 flink-1.13.2]#

vi /root/.bashrc(在bigdata002/3也要操作)

export HADOOP_CLASSPATH=/opt/hadoop-3.3.1/share/hadoop/client/*:/opt/hadoop-3.3.1/share/hadoop/common/*:/opt/hadoop-3.3.1/share/hadoop/common/lib/*:/opt/hadoop-3.3.1/share/hadoop/hdfs/*:/opt/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.3.1/share/hadoop/mapreduce/*:/opt/hadoop-3.3.1/share/hadoop/tools/lib/*:/opt/hadoop-3.3.1/share/hadoop/yarn/*:/opt/hadoop-3.3.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.3.1/share/hadoop/yarn/csi/*:/opt/hadoop-3.3.1/share/hadoop/yarn/csi/lib/*:/opt/hadoop-3.3.1/share/hadoop/yarn/timelineservice/*:/opt/hadoop-3.3.1/share/hadoop/yarn/timelineservice/lib/*

export HADOOP_CONF_DIR=/opt/hadoop-3.3.1/etc/hadoop

vi /root/.bashrc

export FLINK_HOME=/opt/flink-1.13.2

export PATH=$PATH:$FLINK_HOME/bin

3.4 启动和验证

  1. flink-1.13.2目录的分发(在bigdata001操作)
[root@bigdata001 opt]# scp -r flink-1.13.2 root@bigdata002:/opt
[root@bigdata001 opt]# scp -r flink-1.13.2 root@bigdata003:/opt
  1. 启动(在bigdata001操作)
[root@bigdata001 opt]# 
[root@bigdata001 opt]# start-cluster.sh 
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host bigdata001.
Starting standalonesession daemon on host bigdata002.
Starting standalonesession daemon on host bigdata003.
Starting taskexecutor daemon on host bigdata001.
Starting taskexecutor daemon on host bigdata002.
Starting taskexecutor daemon on host bigdata003.
[root@bigdata001 opt]#
  1. jps查看bigdata001/2/3,都会有下面2各进程
3125 StandaloneSessionClusterEntrypoint
3464 TaskManagerRunner
  1. 访问http://bigdata001:8081、http://bigdata002:8081、http://bigdata003:8081,如下图所示:
    flink web界面5. 执行测试程序(在bigdata001操作)
[root@bigdata001 opt]# 
[root@bigdata001 opt]# /opt/flink-1.13.2/bin/flink run /opt/flink-1.13.2/examples/streaming/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.13.2/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 5e1d3795b04976513ea026b3a8b066ce
Program execution finished
Job with JobID 5e1d3795b04976513ea026b3a8b066ce has finished.
Job Runtime: 1125 ms

[root@bigdata001 opt]#
  1. stop集群(在bigdata001操作)
[root@bigdata001 opt]# stop-cluster.sh 
Stopping taskexecutor daemon (pid: 32321) on host bigdata001.
Stopping taskexecutor daemon (pid: 16596) on host bigdata002.
Stopping taskexecutor daemon (pid: 2990) on host bigdata003.
Stopping standalonesession daemon (pid: 31982) on host bigdata001.
Stopping standalonesession daemon (pid: 16274) on host bigdata002.
Stopping standalonesession daemon (pid: 2675) on host bigdata003.
[root@bigdata001 opt]# 

4. Yarn模式

4.1 修改conf/flink-conf.yaml(在bigdata001操作)

新增目录

[root@bigdata001 flink-1.13.2]# 
[root@bigdata001 flink-1.13.2]# pwd
/opt/flink-1.13.2
[root@bigdata001 flink-1.13.2]# 
[root@bigdata001 flink-1.13.2]# mkdir web_upload_dir
[root@bigdata001 flink-1.13.2]# 
[root@bigdata001 flink-1.13.2]# mkdir io_tmp_dir
[root@bigdata001 flink-1.13.2]# 

修改部分

jobmanager.memory.process.size: 2g
taskmanager.memory.process.size: 6g

taskmanager.numberOfTaskSlots: 2

state.backend: rocksdb
state.checkpoints.dir: hdfs://nnha/flink/checkpoints/rocksdb
state.savepoints.dir: hdfs://nnha/flink/savepoints/rocksdb

io.tmp.dirs: /opt/flink-1.13.2/io_tmp_dir

high-availability: zookeeper
high-availability.zookeeper.quorum: bigdata001:2181,bigdata002:2181,bigdata003:2181
high-availability.storageDir: hdfs://nnha/flink/ha/recovery/

添加部分

env.java.home: /opt/jdk1.8.0_201

execution.checkpointing.interval: 300000

web.upload.dir: /opt/flink-1.13.2/web_upload_dir

high-availability.zookeeper.path.root: /flink

4.2 修改conf/masters和conf/workers(在bigdata001操作)

bigdata001:8081
bigdata002:8081
bigdata003:8081
bigdata001
bigdata002
bigdata003

4.3 依赖jar包的添加和环境变量的添加(在bigdata001操作)

[root@bigdata001 flink-1.13.2]#
[root@bigdata001 flink-1.13.2]# pwd
/opt/flink-1.13.2
[root@bigdata001 flink-1.13.2]# mv lib/flink-shaded-zookeeper-3.4.14.jar opt/
[root@bigdata001 flink-1.13.2]# mv opt/flink-shaded-zookeeper-3.5.6.jar lib/
[root@bigdata001 flink-1.13.2]#

vi /root/.bashrc(在bigdata002/3也要操作)

export HADOOP_CLASSPATH=/opt/hadoop-3.3.1/share/hadoop/client/*:/opt/hadoop-3.3.1/share/hadoop/common/*:/opt/hadoop-3.3.1/share/hadoop/common/lib/*:/opt/hadoop-3.3.1/share/hadoop/hdfs/*:/opt/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.3.1/share/hadoop/mapreduce/*:/opt/hadoop-3.3.1/share/hadoop/tools/lib/*:/opt/hadoop-3.3.1/share/hadoop/yarn/*:/opt/hadoop-3.3.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.3.1/share/hadoop/yarn/csi/*:/opt/hadoop-3.3.1/share/hadoop/yarn/csi/lib/*:/opt/hadoop-3.3.1/share/hadoop/yarn/timelineservice/*:/opt/hadoop-3.3.1/share/hadoop/yarn/timelineservice/lib/*

export HADOOP_CONF_DIR=/opt/hadoop-3.3.1/etc/hadoop

vi /root/.bashrc

export FLINK_HOME=/opt/flink-1.13.2

export PATH=$PATH:$FLINK_HOME/bin

4.4 Application Mode启动和验证

  • 每一个application启动一个集群,application的main()方法在JobManager执行,减轻了client的压力,application失败或运行完就会关闭集群
  • 因每一次都需要启动集群,适用于时效要求不高、长时间运行、资源隔离要求高的application
  1. 准备测试jar包
[root@bigdata001 opt]# 
[root@bigdata001 opt]# hadoop fs -mkdir /flink
[root@bigdata001 opt]# hadoop fs -mkdir /flink/run-jars
[root@bigdata001 opt]# hadoop fs -put flink-1.13.2/examples/streaming/WordCount.jar /flink
/run-jars
[root@bigdata001 opt]#
  1. 启动application
[root@bigdata001 opt]#
[root@bigdata001 opt]# flink run-application -t yarn-application hdfs://nnha/flink/run-jars/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.13.2/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2021-08-13 11:36:19,441 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.13.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-08-13 11:36:19,659 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-08-13 11:36:19,766 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm2
2021-08-13 11:36:19,863 INFO  org.apache.hadoop.conf.Configuration                         [] - resource-types.xml not found
2021-08-13 11:36:19,863 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to find 'resource-types.xml'.
2021-08-13 11:36:19,974 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=6144, slotsPerTaskManager=2}
2021-08-13 11:36:22,837 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1628825767036_0001
2021-08-13 11:36:23,245 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1628825767036_0001
2021-08-13 11:36:23,246 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-08-13 11:36:23,252 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-08-13 11:36:29,935 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-08-13 11:36:29,936 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface bigdata001:41218 of application 'application_1628825767036_0001'.
[root@bigdata001 opt]# 
  • 可以看到Flink的Web访问链接是:bigdata001:41218,这个Ip和Port每次都是不固定的
  • 当需要添加依赖的jar包,添加方式如:-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"
  • 可以通过命令列出application正在运行的job: flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
  • 可以通过命令停掉application正在运行的job: flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
  1. 查看yarn的Web界面
    Flink提交到Yarn的application

4.5 Session Mode启动和验证

  1. 启动
[root@bigdata001 opt]# 
[root@bigdata001 log]# yarn-session.sh --detached > /opt/flink-1.13.2/yarn-session_start.log
[root@bigdata001 log]#
[root@bigdata001 log]# cat /opt/flink-1.13.2/yarn-session_start.log
......省略部分......
2021-08-13 12:49:12,218 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-08-13 12:49:12,219 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface bigdata002:39955 of application 'application_1628825767036_0003'.
2021-08-13 12:49:12,319 INFO  org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Using emulated InjectSessionExpiration
......省略部分......
  • 可以看到随机的Flink Web链接是: bigdata002:39955, 此Ip和Port每次都是不固定的
  • 用jps查看,只有bigdata002上有一个进程:31346 YarnSessionClusterEntrypoint
  • 可以通过命令列出session正在运行的job: flink list -t yarn-session -Dyarn.application.id=application_XXXX_YY
  • 可以通过命令停掉session正在运行的job: flink cancel -t yarn-session -Dyarn.application.id=application_XXXX_YY <jobId>

查看Flink的Web界面,如下所示:
Flink Web界面查看Yarn的Web界面,如下所示:
Yarn的Web界面
2. 运行测试jar包

[root@bigdata001 opt]#
[root@bigdata001 opt]# /opt/flink-1.13.2/bin/flink run /opt/flink-1.13.2/examples/streaming/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.13.2/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2021-08-13 12:56:46,180 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-08-13 12:56:46,180 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-08-13 12:56:46,642 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.13.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-08-13 12:56:46,811 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-08-13 12:56:46,877 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm2
2021-08-13 12:56:46,917 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface bigdata002:39955 of application 'application_1628825767036_0003'.
Job has been submitted with JobID a6c736ff784fb4e73702f7ef6597d4cf
Program execution finished
Job with JobID a6c736ff784fb4e73702f7ef6597d4cf has finished.
Job Runtime: 11389 ms

[root@bigdata001 opt]# 

再次查看Flink的Web界面
Flink Web界面
3. 停止session

[root@bigdata001 opt]#
[root@bigdata001 opt]# echo "stop" | yarn-session.sh -id application_1628825767036_0003
[root@bigdata001 opt]#

或者

[root@bigdata001 opt]#
[root@bigdata001 opt]# yarn application -kill application_1628825767036_0004
[root@bigdata001 opt]#
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-14 14:08:30  更:2021-08-14 14:10:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:50:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码