一、Flink运行模式
1、定义
Flink是一个优秀的分布式流式处理框架,它通过将批数据视为有界流、流数据视为无界流的方式统一了批处理和流处理的编程模型。
2、模式
A、有local、standalone、k8s、mesos、yarn等五种模式,最常使用yarn模式。
B、使用yarn模式的理由:
理由一:yarn提供了更好的任务管理和资源调度的解决方案。
理由二:yarn能够自动完成flink组件的容错。
1、jobmanager异常退出,ResourceManager会重新调度jobmanager到其他节点。
2、taskmanager异常退出,jobmanager会重新像resourcemanager申请资源启动taskmanager。
3、yarn模式详解
A、分类:
session模式、Per-job模式。
B、Session模式:
有一个flink集群常驻yarn集群,所有作业共享RM和dispatcher;
不需要每次都申请资源,前一个任务没完成,后一个任务等待;
作业完成,资源不释放。适用于频繁的小作业。
启动步骤:
1、启动hadoop集群--->启动flink集群start-cluster.sh;
2、启动yarn-session集群:yarn-session -n 2 -s 2 -jm 1024 -tm 1024 -d;
3、提交任务:flink run jar包路径
补充:
停止yarn-session方式:yarn application -kill ${yarn-session的id}
C、Per-job模式:
每个任务都会在yarn集群中启动一个flink集群,每个任务都独享RM和dispatcher;
每个任务都要申请资源,任务之间不干扰;
作业完成,资源释放。适用于不频繁大作业。
启动步骤:
1、启动hadoop集群--->启动flink集群start-cluster.sh;
2、提交任务:
flink run -m yarn-cluster \ 运行模式为per-job模式
-ynm "first_flink_job" \ 任务名称
-yDexecution.runtime-mode=STREAMING \
-yD设定属性的值,处理模式还可以是BATCH和AUTOMATIC
-p 3 \设置并行度为3
-yjm 1024 \ jobmanager的内存
-ytm 1024 \ taskmanager的内存
-ys 2 \ 每个taskmanager的slot个数
-c com.farben.fink.FirstFlink \ 主类完全限定路径
-yj /opt/soft/flink/jar/MyFirstJar.jar \ jar包路径
-yqu "dev_line" \ 指定yarn的任务队列
-output 输出路径
注意:
使用yarn模式一定要关闭yarn的内存检查,因为yarn会自动杀死内存超标的任务,而flink的任务经常内存超标。
D、yarn模式执行过程:
后续补充。
二、Flink架构原理
1、架构
client:
提交作业,等待结果,也可以结束作业。
dispatcher(dp):
作业提交时dispatcher启动,启动后会创建一个rest接口,并将应用移交给一个jm;
在架构中非必需。
resourcemanager(rm):
负责管理tm上的taskslot,将有空闲taskslot的分配给jm;
taskslot不够rm会自动申请,释放空闲tm上的资源。
jobmanager(jm):
一个应用程序对应一个jm。向rm申请资源,将执行图executionGraph分发到tm。
任务运行过程中负责所有需要中央协调的工作,包括检查点的协调。
taskmanager(tm):
工作进程,有一个或多个taskslot;
启动后会向rm注册,运行过程会和其他的tm交换数据。
每个tm上taskslot的数量决定了该tm的最大并行度,最好和cpu核心数一致。
taskslot是flink上最小资源分配单位。
2、执行图
StreamGraph:最初的逻辑执行流程,也就算子之间的前后顺序。在client上生成。
JobGraph:将StreamGraph中OneToOne的操作合并得到JobGraph。在client上生成。
ExecutionGraph:依据并行度、资源情况等信息对JobGraph转换得到。在JobManager上生成。
物理执行图:对ExecutionGraph的具体落实。
3、核心概念
DataFlow:
flink程序执行中被映射而成的数据模型,有source、transformations、sink三部分组成。
Partition:
每个无法优化的算子和operatorchain都有一定的分区数,算子的分区数、并行度、subtask一一对应。
operator chain:
多个OneToOne算子可能会被优化成一个operator chain。
Parallelism:
一个算子(或operatorchain)具有的subtask个数称为该算子的并行度。
job:
由很多的task组成,job的并行度取决于所有槽共享组中并行度最大的算子的并行度之和;
job消耗的最少slot数量等于job的并行度。
Task:
无法被优化的算子和Operator chain都对应一个 task。
Subtask:
根据并行度拆分task得到subtask,一个分区对应一个subtask,subtask独立运行在一个线程中。
4、两个算子间数据传输模式
OneToOne:
类似spark 窄依赖,一对一。分区数和分区内数据有序性不变。
Redistributing:
类似spark宽依赖,分区数和分区内数据有序性改变。
5、slot sharing(槽共享)
一个job不同task的subtask可以共享同一个slot,这样可以让slot的负载更加均衡,更充分利用集群资源。
槽共享组:
A、SlotSharingGroup是用来实现slot共享的类,它会尽可能地保证同一个group中并行度相同的task的subtask共享同一个slot。
B、slotSharingGroup算子默认group为default(即默认一个job下的subtask都可以共享一个slot)。
C、slotSharingGroup算子可以传入共享组名字, someStream.filter(...).slotSharingGroup("group1");
指定了filter的slot共享组为group1。
D、CoLocationGroup可以保证所有的并行度相同的subtasks运行在同一个slot,主要用于迭代流
(训练机器学习模型)。
6、分区策略
后续补充。
7、碎碎念
核心配置文件:
flink-conf.ymal 配置从节点:slaves 启动flink:start-cluster.sh
提交任务:
方式1:将jar包上传到flink网页UI上,在页面设置任务属性后执行。
方式2:命令行方式 flink run .......
并行度:
生效顺序:
代码中算子并行度>代码中全局并行度>客户端CLI设置并行度>配置文件并行度。
job的并行度是指所有槽共享组中并行度最大的算子具备的并行度之和。
命令:
flink list [-a] 列出所有job信息,-a 取消的job也会列出
flink cancel ${job的id} 取消job
flink -help 查看命令帮助
UI:
UI: 任务8081 yarn8088
三、Datastream api
1、分层api
从高到低依次为:
SQL Api--->Table Api--->DataStream Api--->processFuction api。
api级别越高,表达越简明、使用越方便;api级别越低,表达越丰富、使用越灵活。
SQL api和Table api不同点仅在于处理数据时的差异。
processFuction api仅仅是一个process方法,在该方法中完成分流、获取时间戳、watermark和注册定时
任务等其他api难以完成的工作。
2、source
负责读取外部数据,flink的source有集合、文件、端口、kafka、自定义等。
从集合获取数据:
fromCollection/fromSequence/fromElements
从文件获取数据:
readTextFile(可以是本地和HDFS,可以是文件【夹】和压缩文件)
从端口获取数据:
socketTextStream
|