IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink相关API开发及运行架构和实现原理详解 -> 正文阅读

[大数据]Flink相关API开发及运行架构和实现原理详解

一、Flink相关API说明

Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度越大

注意:我自己使用的是flink 1.7.2版本,但是在Flink1.12时支持流批一体,DataSetAPI已经不推荐使用了,大家最好优先使用DataStream流式API,既支持无界数据处理/流处理,也支持有界数据处理/批处理!

二、Flink运行架构

1、flink的编程模型

  • Flink程序主要包含三部分:Source/Transformation/Sink(源/转换操作/输出)

2、创建工程,maven依赖准备

  • properties

????<properties>
????????<encoding>UTF-8</encoding>
????????<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
????????<maven.compiler.source>1.8</maven.compiler.source>
????????<maven.compiler.target>1.8</maven.compiler.target>
????????<java.version>1.8</java.version>
????????<!--?Flink的架构中使用了?Akka?来实现底层的分布式通信,而?Akka?是用?Scala?开发的。我们用到的?Scala?版本为?2.12。-->
????????<scala.version>2.12</scala.version>
????????<flink.version>1.12.0</flink.version>
????</properties>
  • dependencies

 
  • build

?<sourceDirectory>src/main/java</sourceDirectory>
????????<plugins>
????????????<!--?编译插件?-->
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-compiler-plugin</artifactId>
????????????????<version>3.5.1</version>
????????????????<configuration>
????????????????????<source>1.8</source>
????????????????????<target>1.8</target>
????????????????????<!--<encoding>${project.build.sourceEncoding}</encoding>-->
????????????????</configuration>
????????????</plugin>
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-surefire-plugin</artifactId>
????????????????<version>2.18.1</version>
????????????????<configuration>
????????????????????<useFile>false</useFile>
????????????????????<disableXmlReport>true</disableXmlReport>
????????????????????<includes>
????????????????????????<include>**/*Test.*</include>
????????????????????????<include>**/*Suite.*</include>
????????????????????</includes>
????????????????</configuration>
????????????</plugin>
????????????<!--?打包插件(会包含所有依赖)?-->
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-shade-plugin</artifactId>
????????????????<version>2.3</version>
????????????????<executions>
????????????????????<execution>
????????????????????????<phase>package</phase>
????????????????????????<goals>
????????????????????????????<goal>shade</goal>
????????????????????????</goals>
????????????????????????<configuration>
????????????????????????????<filters>
????????????????????????????????<filter>
????????????????????????????????????<artifact>*:*</artifact>
????????????????????????????????????<excludes>
????????????????????????????????????????<!--
????????????????????????????????????????zip?-d?learn_spark.jar?META-INF/*.RSA?META-INF/*.DSA?META-INF/*.SF?-->
????????????????????????????????????????<exclude>META-INF/*.SF</exclude>
????????????????????????????????????????<exclude>META-INF/*.DSA</exclude>
????????????????????????????????????????<exclude>META-INF/*.RSA</exclude>
????????????????????????????????????</excludes>
????????????????????????????????</filter>
????????????????????????????</filters>
????????????????????????????<transformers>
????????????????????????????????<transformer?implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
????????????????????????????????????<!--?设置jar包的入口类(可选)?-->
?????????????????????????????????????<mainClass>com.tjcu.TestDataStreamYarn</mainClass>
????????????????????????????????</transformer>
????????????????????????????</transformers>
????????????????????????</configuration>
????????????????????</execution>
????????????????</executions>
????????????</plugin>
????????</plugins>

3、需求

  • 使用Flink实现WordCount

4、编码步骤

1.准备环境-env
2.准备数据-source
3.处理数据-transformation
4.输出结果-sink
5.触发执行-execute

  • 其中创建环境参考官网:

getExecutionEnvironment()?//推荐使用
createLocalEnvironment()
createRemoteEnvironment(String?host,?int?port,?String...?jarFiles)

5、代码实现

(1)DataSet(了解)

package?com.tjcu;

import?org.apache.flink.api.common.functions.FlatMapFunction;
import?org.apache.flink.api.common.functions.MapFunction;
import?org.apache.flink.api.java.DataSet;
import?org.apache.flink.api.java.ExecutionEnvironment;
import?org.apache.flink.api.java.operators.AggregateOperator;
import?org.apache.flink.api.java.operators.DataSource;
import?org.apache.flink.api.java.operators.FlatMapOperator;
import?org.apache.flink.api.java.operators.UnsortedGrouping;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.util.Collector;

/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?15:37
?*?@description:使用Flink完成WordCount-DataSet
?*?编码步骤
?*??1.准备环境-env
?*??2.准备数据-source
?*??3.处理数据-transformation
?*??4.输出结果-sink
?*??5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
?*/
public?class?TestDataSet?{
????public?static?void?main(String[]?args)?throws?Exception?{
?????//老版本的API如下,目前不推荐使用了
????????//1.准备环境?env
????????ExecutionEnvironment?executionEnvironment?=?ExecutionEnvironment.getExecutionEnvironment();
????????//2.准备数据-source
????????DataSource<String>?stringDataSource?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?hadoop?spark",?"whj?hadoop?spark");
????????//3.处理数据-transformation
????????//3.1每一行数据按照空格切分成一个个的单词组成一个集合
????????FlatMapOperator<String,?String>?wordsDS?=?stringDataSource.flatMap(new?FlatMapFunction<String,?String>()?{
????????????@Override
????????????public?void?flatMap(String?value,?Collector<String>?out)?throws?Exception?{
????????????????//value就是一行行的数据
????????????????String[]?words?=?value.split("?");
????????????????for?(String?word?:?words)?{
????????????????????out.collect(word);//将切割处理的一个个的单词收集起来并返回
????????????????}
????????????}
????????});
????????DataSet<Tuple2<String,?Integer>>?wordAndOne?=?wordsDS.map(new?MapFunction<String,?Tuple2<String,?Integer>>()?{
????????????@Override
????????????public?Tuple2<String,?Integer>?map(String?value)?throws?Exception?{
????????????????//value就是每一个单词
????????????????return?Tuple2.of(value,?1);
????????????}
????????});

????????//分组
????????UnsortedGrouping<Tuple2<String,?Integer>>?grouped?=?wordAndOne.groupBy(0);

????????//聚合
????????AggregateOperator<Tuple2<String,?Integer>>?result?=?grouped.sum(1);
????????//输出结果-sink
????????result.print();

????}
}

(2)基于DataStream(匿名内部类-处理流)

package?com.tjcu;
import?org.apache.flink.api.common.RuntimeExecutionMode;
import?org.apache.flink.api.common.functions.FlatMapFunction;
import?org.apache.flink.api.common.functions.MapFunction;
import?org.apache.flink.api.java.ExecutionEnvironment;
import?org.apache.flink.api.java.operators.DataSource;
import?org.apache.flink.api.java.operators.FlatMapOperator;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.streaming.api.datastream.DataStream;
import?org.apache.flink.streaming.api.datastream.DataStreamSource;
import?org.apache.flink.streaming.api.datastream.KeyedStream;
import?org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.util.Collector;;

/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?16:07
?*?@description:需求:使用Flink完成WordCount-DataStream?批处理
?*??*?编码步骤
?*??*?1.准备环境-env
?*??*?2.准备数据-source
?*??*?3.处理数据-transformation
?*??*?4.输出结果-sink
?*??*?5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
?*/
public?class?TestDataStream?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//新版本的流批统一API,既支持流处理也支持批处理
????????//1.准备环境?env
????????StreamExecutionEnvironment?executionEnvironment?=?StreamExecutionEnvironment.getExecutionEnvironment();
//?????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);//注意:使用DataStream实现批处理
????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);//注意:使用DataStream实现流处理
????????//executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//注意:使用DataStream根据数据源自动选择使用流还是批

????????//2.准备数据-source
????????DataStreamSource<String>?stringDataStreamSource?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?spark",?"whj");
????????//3.处理数据-transformation
????????//3.1每一行数据按照空格切分成一个个的单词组成一个集合
????????DataStream<String>?words?=?stringDataStreamSource.flatMap(new?FlatMapFunction<String,?String>()?{
????????????@Override
????????????public?void?flatMap(String?value,?Collector<String>?out)?throws?Exception?{
????????????????//value就是每一行数据
????????????????String[]?arr?=?value.split("?");
????????????????for?(String?word?:?arr)?{
????????????????????out.collect(word);
????????????????}
????????????}
????????});
????????DataStream<Tuple2<String,?Integer>>?wordAndOne?=?words.map(new?MapFunction<String,?Tuple2<String,?Integer>>()?{
????????????@Override
????????????public?Tuple2<String,?Integer>?map(String?value)?throws?Exception?{
????????????????//value就是一个个单词
????????????????return?Tuple2.of(value,?1);
????????????}
????????});
????????//分组:注意DataSet中分组是groupBy,DataStream分组是keyBy
????????KeyedStream<Tuple2<String,?Integer>,?String>?grouped?=?wordAndOne.keyBy(t?->?t.f0);

????????//聚合
????????SingleOutputStreamOperator<Tuple2<String,?Integer>>?result?=?grouped.sum(1);

????????//TODO?3.sink
????????result.print();

????????//启动并等待程序结束
????????executionEnvironment.execute();

????}
}

  • 注意:在Flink1.12中DataStream既支持流处理也支持批处理,如何区分?

env.setRuntimeMode(RuntimeExecutionMode.BATCH);//注意:使用DataStream实现批处理
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//注意:使用DataStream实现流处理
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//注意:使用DataStream根据数据源自动选择使用流还是批

(3)代码实现-DataStream-Lambda

package?com.tjcu;

import?org.apache.flink.api.common.RuntimeExecutionMode;
import?org.apache.flink.api.common.typeinfo.Types;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.streaming.api.datastream.DataStream;
import?org.apache.flink.streaming.api.datastream.DataStreamSource;
import?org.apache.flink.streaming.api.datastream.KeyedStream;
import?org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.util.Collector;;import?java.util.Arrays;

/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?16:07
?*?@description:需求:使用Flink完成WordCount-DataStream?批处理
?*??*?编码步骤
?*??*?1.准备环境-env
?*??*?2.准备数据-source
?*??*?3.处理数据-transformation
?*??*?4.输出结果-sink
?*??*?5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
?*/
public?class?TestDataStream?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//新版本的流批统一API,既支持流处理也支持批处理
????????//1.准备环境?env
????????StreamExecutionEnvironment?executionEnvironment?=?StreamExecutionEnvironment.getExecutionEnvironment();
//?????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);//注意:使用DataStream实现批处理
????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);//注意:使用DataStream实现流处理
????????//executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//注意:使用DataStream根据数据源自动选择使用流还是批

????????//2.准备数据-source
????????DataStreamSource<String>?stringDataStreamSource?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?spark",?"whj");
????????//3.处理数据-transformation
????????//3.1每一行数据按照空格切分成一个个的单词组成一个集合
????????SingleOutputStreamOperator<String>?words?=?stringDataStreamSource.flatMap(
????????????????(String?value,?Collector<String>?out)?->?Arrays.stream(value.split("?")).forEach(out::collect)
????????).returns(Types.STRING);

????????DataStream<Tuple2<String,?Integer>>?wordAndOne?=?words.map(
????????????????(String?value)?->?Tuple2.of(value,?1)
????????).returns(Types.TUPLE(Types.STRING,Types.INT));
????????//分组:注意DataSet中分组是groupBy,DataStream分组是keyBy
????????KeyedStream<Tuple2<String,?Integer>,?String>?grouped?=?wordAndOne.keyBy(t?->?t.f0);

????????//聚合
????????SingleOutputStreamOperator<Tuple2<String,?Integer>>?result?=?grouped.sum(1);

????????//TODO?3.sink
????????result.print();

????????//启动并等待程序结束
????????executionEnvironment.execute();

????}
}

(4)On-Yarn-掌握

  • 注意:写入HDFS如果存在权限问题,进行如下设置:

hadoop?fs?-chmod?-R?777??/

并在代码中添加:

System.setProperty("HADOOP_USER_NAME",?"root")
  • 代码实现:

package?com.tjcu;

import?org.apache.flink.api.common.RuntimeExecutionMode;
import?org.apache.flink.api.common.typeinfo.Types;
import?org.apache.flink.api.java.functions.KeySelector;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.api.java.utils.ParameterTool;
import?org.apache.flink.streaming.api.datastream.DataStream;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.util.Collector;

import?java.util.Arrays;


/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?16:28
?*?@description:使用Flink完成WordCount-DataStream--使用lambda表达式--修改代码使适合在Yarn上运行?编码步骤
?*?*?1.准备环境-env
?*?*?2.准备数据-source
?*?*?3.处理数据-transformation
?*?*?4.输出结果-sink
?*?*?5.触发执行-execute//批处理不需要调用!流处理需要
?*/
public?class?TestDataStreamYarn?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//获取参数
????????ParameterTool?params?=?ParameterTool.fromArgs(args);
????????String?output?=?null;
????????if?(params.has("output"))?{
????????????output?=?params.get("output");
????????}?else?{
????????????output?=?"hdfs://node1:8020/wordcount/output_"?+?System.currentTimeMillis();
????????}
????????//1.准备环境-env
????????StreamExecutionEnvironment?executionEnvironment?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

????????//2.准备数据源-source
????????DataStream<String>?linesDS?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?hadoop?spark",?"whj?hadoop",?"whj");

????????//3.处理数据-transformation
????????DataStream<Tuple2<String,?Integer>>?result?=?linesDS
????????????????.flatMap(
????????????????????????(String?value,?Collector<String>?out)?->?Arrays.stream(value.split("?")).forEach(out::collect)
????????????????).returns(Types.STRING)
????????????????.map(
????????????????????????(String?value)?->?Tuple2.of(value,?1)
????????????????).returns(Types.TUPLE(Types.STRING,?Types.INT))
????????????????//.keyBy(0);
????????????????.keyBy((KeySelector<Tuple2<String,?Integer>,?String>)?t?->?t.f0)
????????????????.sum(1);

????????//4.输出结果-sink
????????result.print();

????????//如果执行报hdfs权限相关错误,可以执行?hadoop?fs?-chmod?-R?777??/
????????System.setProperty("HADOOP_USER_NAME",?"root");//设置用户名
????????//result.writeAsText("hdfs://node1:8020/wordcount/output_"+System.currentTimeMillis()).setParallelism(1);
????????result.writeAsText(output).setParallelism(1);

????????//5.触发执行-execute
????????executionEnvironment.execute();
????}
}

  • 打包

  • 上传

  • 提交执行

/usr/apps/flink/bin/flink?run?-Dexecution.runtime-mode=BATCH?-m?yarn-cluster?-yjm?1024?-ytm?1024?
?/usr/apps/words.jar?--output?hdfs://node1:8020/wordcount/output_xx

  • 在Web页面可以观察到提交的程序:

http://node1:8088/cluster或者http://node1:50070/explorer.html#/

三、Flink原理

1、Flink角色分工

  • 在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程。

#1.?JobManager
???他扮演的是集群管理者的角色,负责调度任务,协调checkpoints、协调故障恢复、
???收集?Job?的状态信息,并管理?Flink?集群中的从节点?TaskManager。
#2.TaskManager
???实际负责执行计算的?Worker,在其上执行?Flink?Job?的一组?Task;
???TaskManager?还是所在节点的管理员,
???它负责把该节点上的服务器信息比如内存、磁盘、任务运行情况等向?JobManager?汇报。
#3.Client:
用户在提交编写好的?Flink?工程时,会先创建一个客户端再进行提交,这个客户端就是?Client
  • 上面个案例的运行就是从节点执行的

  • 角色分工

2、Flink执行流程

用户首先提交Flink程序到JobClient,经过JobClient的处理、解析、优化提交到JobManager,最后由TaskManager运行task。

(1)Standalone版本

(2)On Yarn版本实现原理

1.?Client向HDFS上传Flink的Jar包和配置

2.?Client向Yarn?ResourceManager提交任务并申请资源

3.?ResourceManager分配Container资源并启动ApplicationMaster,
??然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
??
4.?ApplicationMaster向ResourceManager申请工作资源,
??NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
??
5.?TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务

3、Flink Streaming Dataflow

(1)Dataflow、Operator、Partition、SubTask、Parallelism

1.?Dataflow:Flink程序在执行的时候会被映射成一个数据流模型

2.?Operator:数据流模型中的每一个操作被称作Operator,Operator分为:Source/Transform/Sink
??????????准备数据-Source
??????????处理数据-Transformation
??????????输出结果-Sink

3.?Partition:数据流模型是分布式的和并行的,执行中会形成1~n个分区

4.?Subtask:多个分区任务可以并行,每一个都是独立运行在一个线程中的,也就是一个Subtask子任务

5.?Parallelism:并行度,就是可以同时真正执行的子任务数/分区数

(2) Operator传递模式

数据在两个operator(算子)之间传递的时候有两种模式:

  • One to One模式:

两个operator用此模式传递的时候,会保持数据的分区数和数据的排序;如上图中的Source1到Map1,它就保留的Source的分区特性,以及分区元素处理的有序性。--类似于Spark中的窄依赖

  • Redistributing 模式:

这种模式会改变数据的分区数;每个一个operator subtask会根据选择transformation把数据发送到不同的目标subtasks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区。--类似于Spark中的宽依赖

(3)Operator Chain(算子链)

客户端在提交任务的时候会对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后的Operator称为Operator chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行--就是SubTask。

(4)TaskSlot(任务槽) And Slot Sharing(槽共享)

  • 任务槽(TaskSlot)

每个TaskManager是一个JVM的进程, 为了控制一个TaskManager(worker)能接收多少个task,Flink通过Task Slot来进行控制。TaskSlot数量是用来限制一个TaskManager工作进程中可以同时运行多少个工作线程,TaskSlot 是一个 TaskManager 中的最小资源分配单位,一个 TaskManager 中有多少个 TaskSlot 就意味着能支持多少并发的Task处理。

Flink将进程的内存进行了划分到多个slot中,内存被划分到不同的slot之后可以获得如下好处:

1.??TaskManager最多能同时并发执行的子任务数是可以通过TaskSolt数量来控制的
2.??TaskSolt有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响。

  • 槽共享(Slot Sharing)

Flink允许子任务共享插槽,即使它们是不同任务(阶段)的子任务(subTask),只要它们来自同一个作业。比如图左下角中的map和keyBy和sink 在一个 TaskSlot 里执行以达到资源共享的目的。

允许插槽共享有两个主要好处:

1.?资源分配更加公平,如果有比较空闲的slot可以将更多的任务分配给它。
2.?有了任务槽共享,可以提高资源的利用率。
  • 注意:

slot是静态的概念,是指taskmanager具有的并发执行能力

parallelism(平行)是动态的概念,是指程序运行时实际使用的并发能力

(5)运行时组件

Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:

  • 作业资源管理器(JobManager): 分配任务、调度checkpoint做快照

  • 任务管理器(TaskManager):主要干活的

  • 资源管理器(ResourceManager):管理分配资源

  • 分发器(Dispatcher):方便递交任务的接口,WebUI

因为Flink是用Java和Scala实现的,所以所有组件都会运行在Java虚拟机上。每个组件的职责如下:

  • 作业管理器(JobManager)

1.?控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager?所控制执行。

2.?JobManager?会先接收到要执行的应用程序,这个应用程序会包括:
???作业图(JobGraph)、逻辑数据流图(logical?dataflow?graph)和打包了所有的类、库和其它资源的JAR包。

3.?JobManager?会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),
????包含了所有可以并发执行的任务。

4.?JobManager?会向资源管理器(ResourceManager)请求执行任务必要的资源,
??也就是任务管理器(TaskManager)上的插槽(slot)。
??一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
??而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
  • 任务管理器(TaskManager)

1.?Flink中的工作进程。通常在Flink中会有多个TaskManager运行,
???每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。

2.?启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,
???TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。

3.?在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
  • 资源管理器(ResourceManager)

1.?主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger?插槽是Flink中定义的处理资源单元。

2.?Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。

3.?当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。
??如果ResourceManager没有足够的插槽来满足JobManager的请求,
??它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
  • 分发器(Dispatcher)

1.?可以跨作业运行,它为应用提交提供了REST接口。

2.?当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。

3.?Dispatcher也会启动一个Web?UI,用来方便地展示和监控作业执行的信息。

4.?Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。

(6)Flink执行图(ExecutionGraph)

由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。

Flink 中的执行图可以分成四层:StreamGraph(逻辑流图) -> JobGraph -> ExecutionGraph -> 物理执行图

1.?StreamGraph:是根据用户通过?Stream?API?编写的代码生成的最初的图。用来表示程序的拓扑?[tuò?pū]?结构。

2.?JobGraph:StreamGraph经过优化后生成了?JobGraph,提交给?JobManager?的数据结构。
?????????????主要的优化为,将多个符合条件的节点?chain?在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

3.?ExecutionGraph:JobManager?根据?JobGraph?生成ExecutionGraph。方便调度和监控和跟踪各个?tasks?的状态。
????????????????????ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

4.?物理执行图:JobManager?根据?ExecutionGraph?对?Job?进行调度后,
?????????????在各个TaskManager?上部署?Task?后形成的“图”,并不是一个具体的数据结构。

  • Flink执行executor会自动根据程序代码生成DAG数据流图

  • 简单理解:

1.?StreamGraph:最初的程序执行逻辑流程,也就是算子之间的前后顺序--在Client上生成

2.?JobGraph:将OneToOne的Operator合并为OperatorChain--在Client上生成

3.?ExecutionGraph:将JobGraph根据代码中设置的并行度和请求的资源进行并行化规划!--在JobManager上生成

4.?物理执行图:将ExecutionGraph的并行计划,落实到具体的TaskManager上,将具体的SubTask落实到具体的TaskSlot内进行运行。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-22 18:43:14  更:2022-04-22 18:45:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:34:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码