| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> flink 作业提交流程 -> 正文阅读 |
|
[大数据]flink 作业提交流程 |
之前给大家介绍了DataStream API中 Environment 和 Transformation 连个体系的源代码,今天来了小插曲,给大家宏观介绍下 Flink 作业的提交流程,希望对大家有帮助。 一、DataStream 作业提交流程 1)、首先,先给大家展示下流程图: 2)、提交流程说明: FlinkCli 先创建一个 Flink 环境变量 然后将环境变量存入到ThreadLocal中 在启动 Flink 作业jar包的 main 方法 Flink 应用程序通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取到相应的执行环境变量 Flink 应用程序将用户编写的作业转换成 jobGraph 提交给Flink 集群 3)、Flink 作业以哪种方式提交,取决于 StreamExecutionEnvironment 的配置信息; 起到主要作用的配置参数是 execution.target; execution.target 取值: remote local yarn-per-job yarn-session kubernetes-session yarn-application kubernetes-application StreamExecutionEnvironment 会根据 execution.target 配置的不同取值创建相应的 PipelineExecutorFactory, 再由 PipelineExecutorFactory 创建相应的 PipelineExecutor, PipelineExecutor执行相应的作业提交工作; 源代码探究: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute() org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(String jobName) org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamGraph streamGraph) org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamGraph streamGraph) org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(final Configuration configuration) (见 代码 3-1) ExecutorFactory 举例,org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory,(见代码 3-2) 代码 3-1 @Override // 通过 java SPI 技术加载 实现了 PipelineExecutorFactory 接口的类
} 代码 3-2 @Internal
} // 配置选项 4)、FlinkCli 创建 Flink 环境变量相关流程: org.apache.flink.client.cli.CliFrontend.main() org.apache.flink.client.cli.CliFrontend.executeProgram() org.apache.flink.client.ClientUtils.executeProgram() public static void executeProgram(
// 设置流环境变量
} 5)、StreamExecutionEnvironment.getExecutionEnvironment() 获取执行环境的逻辑: 先从 threadLocal 获取环境变量 如果 threadLocal 中没有相应的环境变量,则创建一个本地环境变量 return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory) public static Optional resolveFactory(ThreadLocal threadLocalFactory, @Nullable T staticFactory) { return Optional.ofNullable(factory); 二、Flink Table 1)、flink Sql 作业提交流程 2)、提交流程说明 TableEnvironmentImpl 在创建的过程中创建了 Executor , ExecutorBase 中包含了StreamExecutionEnvironment 的实例, StreamExecutionEnvironment 的实例由 StreamExecutionEnvironment .getExecutionEnvironment() 方法创建。 TableEnvironmentImpl 作业的提交依赖 StreamExecutionEnvironment 的作业提交流程。 TableEnvironmentImpl 借助Parser组件将 SQL 语句转换成 Operation,然后借助 Planner组件将Operation转换成 List。 使用StreamExecutionEnvironment 将 List 转换成 StreamGraph。 后续操作与DataStream提交流程一样。 3)、 TableEnvironmentImpl .executeSql() 执行逻辑: Sql 解析, 将Sql语句解析为 List 变量; Transformation转换,将 List 转换为 List<Transformation<?>> PipeLine转换, 将List<Transformation<?>> 转换为 PipeLine 4)、TableEnvironmentImpl 创建过程: ModuleManager 的创建 CatalogManager 的创建 FunctionCatalog 的创建 Executor (执行环境)的创建, 先通过 java SPI 加载 Executor 工厂, 通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为 org.apache.flink.table.planner.delegation.BlinkExecutorFactory Planner的创建(包括Parser的构造),先通过 java SPI 加载 Planner 工厂,通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为org.apache.flink.table.planner.delegation.BlinkPlannerFactory 构造TableEnvironmentImpl 5)、Sql解析 (Blink Planner: StreamPlanner / BatchPlanner) 基本流程: Sql语句解析成Sql 抽象语法树 Planner对sql 语法树进行验证 将验证过的语法树转换成关系代数树 将关系代数树封装成Flink对应的Operation public List parse(String statement) { Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) Calsite :Sql 解析框架 SqlNode 代表Sql 抽象语法树中的节点,CalciteParser 内部使用 SqlParser 将Sql语句解析成Sql 抽象语法树。 Operation (Flink Table API中抽象出来的概念) 代表任意类型的Sql操作行为,例如 Select 、Insert、Drop 等sql操作可以表示为QueryOperation、CatalogSinkModifyOperation、DropOperation。FlinkPlannerImpl内部使用 Calsite 的 SqlToRelConverter 将验证后的抽象语法树转换成关系代数树。 6)、Operation 转换为 Transformation 逻辑 (Blink Planner : StreamPlanner / BatchPlanner) 基本流程: 从Operation中 获取到 关系代数树 根据优化规则优化关系代数树 生成物理执行计划 将物理执行计划转换成 List<Transformation<?>> override def translate( val relNodes = modifyOperations.map(translateToRel) |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/17 7:43:44- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |