IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink 作业提交流程 -> 正文阅读

[大数据]flink 作业提交流程

之前给大家介绍了DataStream API中 Environment 和 Transformation 连个体系的源代码,今天来了小插曲,给大家宏观介绍下 Flink 作业的提交流程,希望对大家有帮助。

一、DataStream 作业提交流程

1)、首先,先给大家展示下流程图:
在这里插入图片描述

2)、提交流程说明:

FlinkCli 先创建一个 Flink 环境变量

然后将环境变量存入到ThreadLocal中

在启动 Flink 作业jar包的 main 方法

Flink 应用程序通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取到相应的执行环境变量

Flink 应用程序将用户编写的作业转换成 jobGraph 提交给Flink 集群

3)、Flink 作业以哪种方式提交,取决于 StreamExecutionEnvironment 的配置信息;

起到主要作用的配置参数是 execution.target;

execution.target 取值:

remote

local

yarn-per-job

yarn-session

kubernetes-session

yarn-application

kubernetes-application

StreamExecutionEnvironment 会根据 execution.target 配置的不同取值创建相应的 PipelineExecutorFactory, 再由 PipelineExecutorFactory 创建相应的 PipelineExecutor, PipelineExecutor执行相应的作业提交工作;

源代码探究:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute()

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(String jobName)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamGraph streamGraph)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamGraph streamGraph)

org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(final Configuration configuration) (见 代码 3-1)

ExecutorFactory 举例,org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory,(见代码 3-2)

代码 3-1

@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);

// 通过 java SPI 技术加载 实现了 PipelineExecutorFactory 接口的类
final ServiceLoader loader =
ServiceLoader.load(PipelineExecutorFactory.class);

final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
final Iterator<PipelineExecutorFactory> factories = loader.iterator();
while (factories.hasNext()) {
    try {
        final PipelineExecutorFactory factory = factories.next();
        // 根据 execution.target 的取值 过滤出匹配到的 PipelineExecutorFactory 
        if (factory != null && factory.isCompatibleWith(configuration)) {
            compatibleFactories.add(factory);
        }
    } catch (Throwable e) {}
}

if (compatibleFactories.size() > 1) {}
if (compatibleFactories.isEmpty()) {}
return compatibleFactories.get(0);

}

代码 3-2

@Internal
public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactory {

@Override
public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
    return YarnSessionClusterExecutor.NAME.equalsIgnoreCase(
            configuration.get(DeploymentOptions.TARGET));
}

}

// 配置选项
public static final ConfigOption TARGET =
key(“execution.target”)

4)、FlinkCli 创建 Flink 环境变量相关流程:

org.apache.flink.client.cli.CliFrontend.main()

org.apache.flink.client.cli.CliFrontend.executeProgram()

org.apache.flink.client.ClientUtils.executeProgram()

public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);

    LOG.info(
            "Starting program (detached: {})",
            !configuration.getBoolean(DeploymentOptions.ATTACHED));

    ContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);

// 设置流环境变量
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);

    try {
        // 启动用户程序的main方法
        program.invokeInteractiveModeForExecution();
    } finally {
        ContextEnvironment.unsetAsContext();
        StreamContextEnvironment.unsetAsContext();
    }
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}

}

5)、StreamExecutionEnvironment.getExecutionEnvironment() 获取执行环境的逻辑:

先从 threadLocal 获取环境变量

如果 threadLocal 中没有相应的环境变量,则创建一个本地环境变量

return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);

public static Optional resolveFactory(ThreadLocal threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get();
final T factory = localFactory == null ? staticFactory : localFactory;

return Optional.ofNullable(factory);
}

二、Flink Table

1)、flink Sql 作业提交流程
在这里插入图片描述

2)、提交流程说明

TableEnvironmentImpl 在创建的过程中创建了 Executor , ExecutorBase 中包含了StreamExecutionEnvironment 的实例, StreamExecutionEnvironment 的实例由 StreamExecutionEnvironment .getExecutionEnvironment() 方法创建。

TableEnvironmentImpl 作业的提交依赖 StreamExecutionEnvironment 的作业提交流程。

TableEnvironmentImpl 借助Parser组件将 SQL 语句转换成 Operation,然后借助 Planner组件将Operation转换成 List。

使用StreamExecutionEnvironment 将 List 转换成 StreamGraph。

后续操作与DataStream提交流程一样。

3)、 TableEnvironmentImpl .executeSql() 执行逻辑:

Sql 解析, 将Sql语句解析为 List 变量;

Transformation转换,将 List 转换为 List<Transformation<?>>

PipeLine转换, 将List<Transformation<?>> 转换为 PipeLine

4)、TableEnvironmentImpl 创建过程:

ModuleManager 的创建

CatalogManager 的创建

FunctionCatalog 的创建

Executor (执行环境)的创建, 先通过 java SPI 加载 Executor 工厂, 通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为 org.apache.flink.table.planner.delegation.BlinkExecutorFactory

Planner的创建(包括Parser的构造),先通过 java SPI 加载 Planner 工厂,通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为org.apache.flink.table.planner.delegation.BlinkPlannerFactory

构造TableEnvironmentImpl

5)、Sql解析 (Blink Planner: StreamPlanner / BatchPlanner)

基本流程:

Sql语句解析成Sql 抽象语法树

Planner对sql 语法树进行验证

将验证过的语法树转换成关系代数树

将关系代数树封装成Flink对应的Operation

public List parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);

Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);

Calsite :Sql 解析框架

SqlNode 代表Sql 抽象语法树中的节点,CalciteParser 内部使用 SqlParser 将Sql语句解析成Sql 抽象语法树。

Operation (Flink Table API中抽象出来的概念) 代表任意类型的Sql操作行为,例如 Select 、Insert、Drop 等sql操作可以表示为QueryOperation、CatalogSinkModifyOperation、DropOperation。FlinkPlannerImpl内部使用 Calsite 的 SqlToRelConverter 将验证后的抽象语法树转换成关系代数树。

6)、Operation 转换为 Transformation 逻辑 (Blink Planner : StreamPlanner / BatchPlanner)

基本流程:

从Operation中 获取到 关系代数树

根据优化规则优化关系代数树

生成物理执行计划

将物理执行计划转换成 List<Transformation<?>>

override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[]] = {
validateAndOverrideConfiguration()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[
]]
}

val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
transformations
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-10 11:07:42  更:2021-12-10 11:08:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:43:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码