IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark源码——SparkContext -> 正文阅读

[大数据]Spark源码——SparkContext

在这里插入图片描述
图片来源

三个重点:

  • TaskScheduler,如何注册的Application、Executor如何反注册
  • DAGScheduler
  • SparkUI

TaskScheduler

在这里插入图片描述
SparkContext.createTaskScheduler返回一个tuple。
追踪进createTaskScheduler:
在这里插入图片描述
可以看到三个形参,其中master是启动模式,针对不同的master会match不同的启动逻辑
看一下standalone模式
在这里插入图片描述
可以看到,首先创建了TaskSchedulerImpl
追踪进入,看到TaskSchedulerImpl的描述:
在这里插入图片描述
翻译一下:
通过 SchedulerBackend 为多种类型的集群安排任务。它还可以通过使用 [[LocalSchedulerBackend]] 并将 isLocal 设置为 true 来处理本地设置。它处理通用逻辑,例如确定跨作业的调度顺序、唤醒以启动推测性任务等。
客户端应该首先调用 initialize() 和 start(),然后通过 runTasks 方法提交任务集。

OK,源码先不看,看看创建完TaskSchedulerImpl后干了啥

在这里插入图片描述
创建了StacnaloneSchedulerBackend

完了后scheduler调用initialize方法

追踪进入initialize方法


 def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

可以看到,进来就创建了一个pool,会根据具体的调度算法来创建。

至此,createTaskScheduler就完了

完了之后看TaskSchedulerImpl的start()方法

在这里插入图片描述
进来就调用了backend.start()方法

进来backend的start方法

这里standalone模式的backend是StandaloneSchedulerBackend

 override def start() {
    super.start()
    launcherBackend.connect()

    // The endpoint for executors to talk to us
    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
    // ExecutorAllocationManager will send the real initial limit to the Master later.
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

进来首先是参数准备,然后一个重要方法

val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)

这是创建了application的描述
进入ApplicationDescription看看

private[spark] case class ApplicationDescription(
    name: String, 
    maxCores: Option[Int],
    memoryPerExecutorMB: Int,
    command: Command,
    appUiUrl: String,
    eventLogDir: Option[URI] = None,
    // short name of compression codec used when writing event logs, if any (e.g. lzf)
    eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None,
    // number of executors this application wants to start with,
    // only used if dynamic allocation is enabled
    initialExecutorLimit: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}

主要就是描述了当前application的一些情况,比如名字、内存、分配cores等等

完了之后创建AppClient

client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

进去看看
一段描述

允许应用程序与 Spark 独立集群管理器对话的接口。获取集群事件的主 URL、应用描述和侦听器,并在发生各种事件时回调侦听器。(翻译的)

就是说它的个接口,用来个spark集群通信的,它能接收master 的url、还有applicationDescription、还有一个集群事件的监听器,以及各种事件发生时监听器的回调函数
说白了就是个application和集群之间负责通信的组件

进入client.start()中看看

在这里插入图片描述
只需启动一个 rpcEndpoint;它将回调到侦听器

其中的ClientEndpoint,进去看看
在这里插入图片描述
它是AppClient的一个内部类
其中有两个方法

  • tryRegisterAllMasters():异步注册所有 master 并返回一个数组“Future”以进行取消
    在这里插入图片描述

  • registerWithMaster:异步注册所有的master。它将每隔 REGISTRATION_TIMEOUT_SECONDS 秒调用 registerWithMaster,直到超过 REGISTRATION_RETRIES 次。一旦成功连接到 master,所有调度工作和 Futures 将被取消。 nthRetry 表示这是第 n 次尝试向 master 注册。
    在这里插入图片描述

至此,代码逻辑和开头的图就对应上了。

DAGScheduler

回到SparkContext的createTaskScheduler中,在TashScheduler启动完了,会紧接着启动DAGScheduler
在这里插入图片描述
看到其中的DAGScheduler

进来看看

一大段描述
在这里插入图片描述
翻译一下:

实现面向stage调度的高级调度层。它为每个作业计算stage的 DAG,跟踪实现了哪些 RDD 和stage输出,并找到运行作业的最小时间表。然后它将stage作为taskset提交给在集群上运行它们的底层 TaskScheduler 实现。 TaskSet 包含完全独立的任务,可以根据集群上已有的数据立即运行(例如,前一阶段的地图输出文件),但如果这些数据不可用,它可能会失败。
Spark stage是通过在 shuffle 边界打破 RDD 图来创建的。具有“窄”依赖关系的 RDD 操作,如 map() 和 filter(),在每个阶段都被流水线化为一组任务,但具有 shuffle 依赖关系的操作需要多个阶段(一个阶段编写一组映射输出文件,另一个阶段)在shuffle后读取这些文件)。最终,每个阶段将只对其他阶段有 shuffle 依赖,并且可能在其中计算多个操作。这些操作的实际流水线发生在各种 RDD(MappedRDD、FilteredRDD 等)的 RDD.compute() 函数中。
除了提出阶段的 DAG 之外,DAGScheduler 还根据当前缓存状态确定运行每个任务的首选位置,并将这些传递给低级 TaskScheduler。此外,它处理由于 shuffle 输出文件丢失而导致的失败,在这种情况下,可能需要重新提交旧阶段。不是由 shuffle 文件丢失引起的 stage 中的故障由 TaskScheduler 处理,它会在取消整个 stage 之前重试每个任务少量次。

至此,大概的内容差不多,具体细节等到具体情况分析,代码太多也不适合全看。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-13 12:53:31  更:2021-12-13 12:54:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 5:44:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码