IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 1.13 CliFrontend之run源码解读 -> 正文阅读

[大数据]Flink 1.13 CliFrontend之run源码解读

提示:本文的源码是基于Flink 1.13版本


前言

上一篇博文介绍了runApplication的源码,这篇文章再简单的解读一下run的源码,也是最常见的pre-job的运行模式。如果你看了上一篇博文,相信这篇文章看起来会很简单,因为run的代码比runApplication简单些,很多地方基本是复用的


一、per-job模式提交命令

./bin/flink run 
-m yarn-cluster 
-c com.bigdata.test 
-yqu root.users.flink /home/bigdata/test.jar

二、源码解读

1. run方法解读

在解析启动命令为run之后,会调用run方法

 protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");
        //添加常用的命令行操作
        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        //根据命令行出入的参数解析命令命令行操作
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        //判断操作是否为help操作,打印帮助信息,并返回
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        //根据参数信息,筛选出有效的命令行操作。
        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));
        //初始化程序操作,如果是python程序通过反射初始化,如果是java程序通过programOptions获取必要参数进行初始化
        final ProgramOptions programOptions = ProgramOptions.create(commandLine);
        //根据jar和入口类获取程序运行所需要的依赖
        final List<URL> jobJars = getJobJarAndDependencies(programOptions);
        //获取有效的配置
        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
        //根据命令行操作选择和有效配置创建PackagedProgram
        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            //执行项目
            executeProgram(effectiveConfiguration, program);
        }
    }

2. executeProgram方法解读

 public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout)
            throws ProgramInvocationException {
        checkNotNull(executorServiceLoader);
        final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);

            LOG.info(
                    "Starting program (detached: {})",
                    !configuration.getBoolean(DeploymentOptions.ATTACHED));

            ContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            StreamContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            try {
                //利用反射执行传入的入口类
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

总结

由于之前介绍过了runApplication的源码,这里就不再赘述复用的代码了,大体流程都差不多,解析命令、解析参数,构建ProgramOptions,获取依赖,获取有效配置,不同的是最后一步,run是直接执行程序,其本质是通过反射执行jar里面的main函数,而runApplication会调用deployer.run(effectiveConfiguration, applicationConfiguration)把程序提交部署到集群去。所以在per-job模式下,在flink程序的Main函数的env.execute没执行前都是在客户端执行,只有当执行到env.execute才会提交到集群,而Application模式下则会直接先提交到集群,构建JobGraph,然后执行Main方法。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:42:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 23:06:17-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码