IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink任务执行总体流程 -> 正文阅读

[大数据]Flink任务执行总体流程

Flink任务执行总体流程

本文描述一个Flink任务执行的总体流程的前半部分,执行环境的选择。

Flink任务的一个例子

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = 
      				StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

Flink任务执行总体步骤

(1)创建一个StreamExecutionEnvironment对象env,也就是流任务的执行环境对象。

(2)调用数据接入函数来接入数据源的数据,流执行环境支持的数据源有:数值集合,文件,socket数据流。支持的模式有:一次性读取,和持续间隔的读取数据。通过数据接入函数,得到一个DataStreamSource对象,这是一个DataSream的子类,表示它是一个流数据的接入源。有了数据接入的源,就可以在该DataStream上执行各种转换(transformation)和处理操作了。

(3)在得到的数据源DataStream(实际上是一个DataStreamSource)对象上执行各种转换操作,比如:map,flatMap,ByKey等。这一步,在实现层面这些操作其实并没有执行,而是把这些操作添加到流执行环境env的转换操作数组中。

每次在调用transformation操作时,都会调用DataStream#doTransform函数,而在该函数中,会根据该转换操作来创建一个SingleOutputStreamOperator对象,并把该对象添加到流执行环境env的转换操作数组中,该数组是由变量:StreamExecutionEnvironment#transformations来定义的。

(4)当调用的转换操作完成时,就会调用流执行环境env的execute()函数来执行这些转换操作,从而得到最终的结果。

执行环境

Flink提供了两套执行环境,一套是批执行环境,其父类是:ExecutionEnvironment。一套是流执行,其父类是:StreamExecutionEnvironment。这里我们关注流执行环境。对于流执行环境,又可以分为本地环境和远端环境,各种类型的环境对应的实现类,如下表所示:

类别本地环境远端环境
批执行环境LocalEnvironmentRemoteEnvironment
流执行环境LocalStreamEnvironmentRemoteStreamEnvironment

执行任务:execute

当应用程序调用execute(jobName)时,最终会调用父类的StreamExecutionEnvironment#execute来执行任务。该函数会继续调用StreamExecutionEnvironment#executeAsync函数。

由于最终的StreamGraph是要在执行器(需要实现PipelineExecutor接口)中执行,所以在executeAsync函数中,会根据配置信息来获取具体的PipelineExecutorFactory实现类对象,该对象是执行器(Executor)的工厂类,用来获取具体的执行器对象,通过执行器对象的execute(StreamGraph, Configuration)来执行StreamGraph构成的任务。

executeAsync函数

executeAsync函数实现流程如下:

(1)根据配置获取一个Pipeline执行器工厂对象:PipelineExecutorFactory

(2)获取执行器对象,并执行StreamGraph,返回一个CompletableFuture 对象:jobClientFuture。

(3)通过jobClientFuture对象得到JobClient对象:jobClient。

其代码概要如下:

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
		... // 删减代码
   // 根据配置,获取pipeline执行器工厂对象
   final PipelineExecutorFactory executorFactory =
      executorServiceLoader.getExecutorFactory(configuration);
  
  	... // 省略代码
   // 获取执行器对象,并执行StreamGraph,返回异步结果获取对象
   // 通过Executor工厂获取一个Executor,并通过该Executor来执行streamGraph。
   CompletableFuture<JobClient> jobClientFuture = executorFactory
      .getExecutor(configuration)
      .execute(streamGraph, configuration);
	 ... // 删减代码 
  
   // 异步获取执行结果
   try {
      JobClient jobClient = jobClientFuture.get();
      jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
      return jobClient;
   } catch (ExecutionException executionException) {
     ... //
   }
}

获取Executor的工厂对象

根据配置来获取Executor的工厂对象。可能的工厂类有:

  • LocalExecutorFactory:创建本地Executor工厂类,该工厂创建的executor是:LocalExecutor。
  • RemoteExecutorFactory:创建远程Executor的工厂类。该工厂创建的Executor是:RemoteExecutor。当部署模式为standalone是,会创建此类Executor。
  • KubernetesSessionClusterExecutorFactory:创建这样的executor,它们在一个存在的session集群上执行。该工厂创建的Executor是:KubernetesSessionClusterExecutor。
  • YarnJobClusterExecutorFactory:创建在指定的cluster上执行job的工厂类。该工厂创建的Executor是:YarnJobClusterExecutor。
  • YarnSessionClusterExecutorFactory:创建在yarnSession上执行的Executor的工厂类。该工厂创建的Executor是 :YarnSessionClusterExecutor。

Executor的工厂对象

不同的执行器工厂获得的执行器对象不同。使用哪种执行器工厂来创建执行器是由Flink任务的执行参数来决定的。Executor的工厂对象和执行器类的对应关系如下表:

执行器工厂执行器说明
YarnSessionClusterExecutorFactoryYarnSessionClusterExecutorYarn session模式
YarnJobClusterExecutorFactoryYarnJobClusterExecutorYarn job模式
RemoteExecutorFactoryRemoteExecutorstandalone模式
LocalExecutorFactoryLocalExecutor本地模式
KubernetesSessionClusterExecutorFactoryKubernetesSessionClusterExecutork8s模式
EmbeddedExecutorFactoryEmbeddedExecutor嵌入模式
WebSubmissionExecutorFactoryEmbeddedExecutor

每个具体的执行器都有一个execute()函数,该函数是StreamGraph的执行者。execute函数的流程和每个执行器和执行环境相关,每个执行器的具体的执行过程,后面会继续分析。

小结

本文描述了Flink任务执行的总体流程,不同的执行环境其执行的流程不同,具体的执行流程在后面继续分析。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-27 09:57:56  更:2021-11-27 10:00:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:55:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码