[Big Data] Spark Source Code Analysis: Chapter 2 (4)

2.9 Starting the metrics system: MetricsSystem

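Spark builds its metrics system on Metrics, a third-party metrics library from Codahale.
MetricsSystem has three concepts: instance (who uses the metrics), source (where the metrics are collected from), and sink (where the metrics are written to).

Instances: master, worker, application, driver, executor
Sinks: ConsoleSink, CsvSink, JmxSink, MetricsServlet, GraphiteSink, and others
MetricsServlet is the default sink.
The startup code in SparkContext: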

// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
// MetricsSystem.start (MetricsSystem.scala)
def start(registerStaticSources: Boolean = true) {
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
    if (registerStaticSources) {
      StaticSources.allSources.foreach(registerSource)
      registerSources()
    }
    registerSinks()
    sinks.foreach(_.start)
  }
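
The ordering inside start (guard against a second start, register sources, register sinks, then start every sink) can be illustrated with a small stand-alone model. This is only a sketch: DemoSource, DemoSink, DemoMetricsSystem and the events buffer are hypothetical names invented for illustration and are not part of Spark.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's Source and Sink traits.
trait DemoSource { def sourceName: String }
trait DemoSink { def start(): Unit }

final class DemoMetricsSystem(val instance: String) {
  private val sources = mutable.ArrayBuffer.empty[DemoSource]
  private val sinks = mutable.ArrayBuffer.empty[DemoSink]
  private var running = false

  // Records every step in order so the startup sequence can be inspected.
  val events = mutable.ArrayBuffer.empty[String]

  def registerSource(source: DemoSource): Unit = {
    sources += source
    events += s"source:${source.sourceName}"
  }

  def registerSink(name: String): Unit = {
    sinks += new DemoSink {
      def start(): Unit = events += s"start:$name"
    }
    events += s"sink:$name"
  }

  // Same ordering as the excerpt above: sources first, then sinks, then sink startup.
  def start(staticSources: Seq[DemoSource], sinkNames: Seq[String]): Unit = {
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
    staticSources.foreach(registerSource)
    sinkNames.foreach(registerSink)
    sinks.foreach(_.start())
  }
}
```

Calling start a second time on the same DemoMetricsSystem fails the require check with an IllegalArgumentException, which mirrors the guard in the excerpt.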

The startup code in SparkEnv.scala is as follows:

val metricsSystem = if (isDriver) {
    // Don't start metrics system right now for Driver.
    // We need to wait for the task scheduler to give us an app ID.
    // Then we can start the metrics system.
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
  } else {
    // We need to set the executor ID before the MetricsSystem is created because sources and
    // sinks specified in the metrics configuration file will want to incorporate this executor's
    // ID into the metrics they report.
    conf.set("spark.executor.id", executorId)
    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
    ms.start()
    ms
  }

The MetricsSystem startup process is:
register sources -> register sinks -> attach Jetty ServletContextHandlers for the sinks
Call chain: SparkEnv.metricsSystem -> val ms = MetricsSystem.createMetricsSystem -> the MetricsSystem companion object ->
MetricsSystem.createMetricsSystem calls new MetricsSystem -> the MetricsSystem class in MetricsSystem.scala
The class is declared as follows:

private[spark] class MetricsSystem private (
    val instance: String,
    conf: SparkConf,
    securityMgr: SecurityManager)
  extends Logging {

  private[this] val metricsConfig = new MetricsConfig(conf)

  private val sinks = new mutable.ArrayBuffer[Sink]
  private val sources = new mutable.ArrayBuffer[Source]
  private val registry = new MetricRegistry()

  private var running: Boolean = false

  // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
  private var metricsServlet: Option[MetricsServlet] = None

The metrics system is started by the call _env.metricsSystem.start().

_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) // attaches the handlers to the web UI

attachHandler -> createServletHandler(servletPath, new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr, conf) -> getMetricsSnapshot

2.9.1 Registering sources
registerSources registers the sources, which tells the metrics system where to collect data from.

private def registerSources() {
  val instConfig = metricsConfig.getInstance(instance)
  val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
  // Register all the sources related to instance
  sourceConfigs.foreach { kv =>
    val classPath = kv._2.getProperty("class")
    try {
      val source = Utils.classForName(classPath).newInstance()
      registerSource(source.asInstanceOf[Source])
    } catch {
      case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
    }
  }
}

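The settings are read from MetricsConfig; for the driver, the instance properties whose keys start with source. are matched.
Each source's MetricRegistry is then registered into a ConcurrentMap.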
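
To make the "keys starting with source." step concrete, here is a minimal sketch of grouping properties by source name with a regular expression. It assumes the pattern has the shape source.<name>.<property>; SubPropertiesSketch and its subProperties method are hypothetical helpers written for this illustration, not Spark's own implementation.

```scala
import java.util.Properties
import scala.collection.mutable

object SubPropertiesSketch {
  // Assumed key shape: "source.<name>.<property>".
  private val SourceRegex = "^source\\.(.+)\\.(.+)".r

  // Groups matching keys by <name>, keeping <property> -> value pairs per group.
  def subProperties(props: Properties): Map[String, Properties] = {
    val grouped = mutable.LinkedHashMap.empty[String, Properties]
    val names = props.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      key match {
        case SourceRegex(name, property) =>
          grouped.getOrElseUpdate(name, new Properties())
            .setProperty(property, props.getProperty(key))
        case _ => // not a source entry, skip it
      }
    }
    grouped.toMap
  }
}
```

With a key such as source.jvm.class, the sketch yields one group named jvm whose class property holds the class name to instantiate, which is the value registerSources reads through getProperty("class").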

2.9.2 Registering sinks
registerSinks works much like registerSources.

private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          val sink = Utils.classForName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)
          if (kv._1 == "servlet") {
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception =>
            logError("Sink class " + classPath + " cannot be instantiated")
            throw e
        }
      }
    }
  }

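First the instance configuration is read, then the SINK_REGEX regular expression selects the sink entries.
Each sink class is instantiated by reflection. If the property key is servlet, the instance is stored in metricsServlet; otherwise it is appended to the sinks buffer (ArrayBuffer[Sink]).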
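
The reflective construction used by registerSinks (look up a constructor by parameter types, then call newInstance) can be tried in isolation. The sketch below uses a hypothetical two-argument constructor and hypothetical classes (DemoSink, ConsoleDemoSink); Spark's real sinks take different constructor arguments, as shown in the excerpt.

```scala
import java.util.Properties

object SinkReflectionSketch {
  trait DemoSink { def describe: String }

  // Hypothetical sink exposing the (Properties, String) constructor looked up below.
  class ConsoleDemoSink(props: Properties, owner: String) extends DemoSink {
    def describe: String = {
      val period = props.getProperty("period", "10")
      s"console sink for $owner, period=$period"
    }
  }

  // Loads the class by name and invokes the matching public constructor.
  def instantiate(className: String, props: Properties, owner: String): DemoSink =
    Class.forName(className)
      .getConstructor(classOf[Properties], classOf[String])
      .newInstance(props, owner)
      .asInstanceOf[DemoSink]
}
```

If the named class has no constructor with exactly these parameter types, getConstructor throws NoSuchMethodException, which is the kind of failure the catch block in registerSinks logs and rethrows.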

2.9.3 Attaching Jetty ServletContextHandlers for the sinks

To make the metrics reachable from the Spark UI, a Jetty ServletContextHandler has to be attached for the servlet sink.

// getServletHandlers in MetricsSystem
def getServletHandlers: Array[ServletContextHandler] = {
  require(running, "Can only call getServletHandlers on a running MetricsSystem")
  metricsServlet.map(_.getHandlers(conf)).getOrElse(Array())
}
// This calls MetricsServlet.getHandlers, which is implemented as follows
def getHandlers(conf: SparkConf): Array[ServletContextHandler] = {
  Array[ServletContextHandler](
    createServletHandler(servletPath,
      new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr, conf)
  )
}

This produces the ServletContextHandler.

2.10 Creating and starting ExecutorAllocationManager

ExecutorAllocationManager manages the executors that have been allocated. It is created and started in SparkContext.scala:

private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =_executorAllocationManager
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          _env.blockManager.master))
      case _ =>
        None
    }
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())

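By default no ExecutorAllocationManager is created; to enable it, set the property spark.dynamicAllocation.enabled, which Utils.isDynamicAllocationEnabled reads.
The minimum and maximum number of dynamically allocated executors, the number of tasks each executor can run, and similar settings can be configured.
The start method adds the ExecutorAllocationListener to the listenerBus; by listening to listenerBus events, it drives the dynamic addition and removal of executors.
A scheduled thread keeps requesting executors and scanning the existing ones, killing and removing those that have timed out.
ExecutorAllocationManager is implemented as follows: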

// Polling loop interval (ms)
private val intervalMillis: Long = if (Utils.isTesting) {
    conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
  } else {
    100
  }
//clock
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new SystemClock()
//listener
// Listener for Spark events that impact the allocation policy
val listener = new ExecutorAllocationListener
//start
def start(): Unit = {
  listenerBus.addToManagementQueue(listener)
  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
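
The timeout-based removal that each polling tick performs can be modelled without threads by passing the current time in explicitly. This is a simplified sketch under stated assumptions: IdleExecutorTracker, onExecutorIdle and onExecutorBusy are hypothetical names, and the real manager also handles adding executors, which is left out here.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of timeout-based executor removal.
final class IdleExecutorTracker(idleTimeoutMs: Long) {
  // executorId -> time (ms) at which the executor becomes removable
  private val removeTimes = mutable.Map.empty[String, Long]

  // Starts the idle countdown unless one is already running for this executor.
  def onExecutorIdle(id: String, nowMs: Long): Unit =
    if (!removeTimes.contains(id)) removeTimes(id) = nowMs + idleTimeoutMs

  // A busy executor is no longer a removal candidate.
  def onExecutorBusy(id: String): Unit = {
    removeTimes.remove(id)
    ()
  }

  // One polling tick: returns and forgets the executors whose deadline has passed.
  def schedule(nowMs: Long): Seq[String] = {
    val expired = removeTimes.collect { case (id, t) if nowMs >= t => id }.toSeq.sorted
    expired.foreach(id => removeTimes.remove(id))
    expired
  }
}
```

In a real loop, schedule would be invoked from the task passed to scheduleWithFixedDelay with the clock's current time; taking the time as a parameter keeps the sketch deterministic.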

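2.11 Creating and starting ContextCleaner

ContextCleaner cleans up RDDs, ShuffleDependencies, broadcasts, and other objects that have gone out of the application's scope.
The configuration property spark.cleaner.referenceTracking defaults to true, so a ContextCleaner is constructed and started.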

private[spark] def cleaner: Option[ContextCleaner] = _cleaner
_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
_cleaner.foreach(_.start())

Following the call into ContextCleaner:

private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

  /**
   * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
   * have not been handled by the reference queue.
   */
  private val referenceBuffer =
    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

  private val referenceQueue = new ReferenceQueue[AnyRef]

  private val listeners = new ConcurrentLinkedQueue[CleanerListener]()

  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

  private val periodicGCService: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
    ...

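referenceBuffer: holds the CleanupTaskWeakReference objects (weak references) so that they are not garbage collected before they have been handled
referenceQueue: the reference queue that receives those weak references once their referents have been garbage collected
listeners: the collection of registered listeners
cleaningThread: the thread that performs the actual cleanup

Like the listenerBus, ContextCleaner follows the listener pattern.
The work runs on a dedicated thread that calls ContextCleaner.keepCleaning, which repeatedly removes references from the queue and runs the matching cleanup.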
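
The interplay between a weak reference, its reference queue, and the buffer that keeps the reference object alive can be shown with plain java.lang.ref classes. This is a minimal sketch, not Spark's code: CleanerSketch, CleanTask, TaskWeakReference, register and drain are hypothetical names, and drain handles one batch where keepCleaning would loop.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object CleanerSketch {
  // Hypothetical cleanup task: just names the resource to release.
  final case class CleanTask(description: String)

  // Weak reference that remembers which cleanup to run for its referent.
  final class TaskWeakReference(
      val task: CleanTask,
      referent: AnyRef,
      queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, queue)

  val referenceQueue = new ReferenceQueue[AnyRef]

  // Strong references to the weak-reference objects, so they survive until handled.
  val referenceBuffer: java.util.Set[TaskWeakReference] =
    Collections.newSetFromMap[TaskWeakReference](
      new ConcurrentHashMap[TaskWeakReference, java.lang.Boolean]())

  def register(obj: AnyRef, task: CleanTask): TaskWeakReference = {
    val ref = new TaskWeakReference(task, obj, referenceQueue)
    referenceBuffer.add(ref)
    ref
  }

  // Drains the queue once and returns the cleanup tasks that became due.
  def drain(): List[CleanTask] = {
    val due = mutable.ListBuffer.empty[CleanTask]
    var next = referenceQueue.poll()
    while (next != null) {
      next match {
        case ref: TaskWeakReference =>
          referenceBuffer.remove(ref)
          due += ref.task
        case _ => // unknown reference type, ignore
      }
      next = referenceQueue.poll()
    }
    due.toList
  }
}
```

Normally the garbage collector enqueues the reference after the referent becomes unreachable; for experimenting, calling enqueue() on the returned reference simulates that event without waiting for a collection.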

2.12 Updating the Spark environment
Initializing SparkContext may change the environment, so the environment is refreshed by calling these two methods:
postEnvironmentUpdate()
postApplicationStart()

private def postApplicationStart() {
    // Note: this code assumes that the task scheduler has been initialized and has contacted
    // the cluster manager to get an application ID (in case the cluster manager provides one).
    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
  }

The JARs and files reported in the environment update are tracked as follows:

val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
->
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
->
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
// addFile and addJar both seem to differ from what the book describes
// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}

if (files != null) {
  files.foreach(addFile)
}

Tracing back, postEnvironmentUpdate is implemented as follows:

private def postEnvironmentUpdate() {
    if (taskScheduler != null) {
      val schedulingMode = getSchedulingMode.toString
      val addedJarPaths = addedJars.keys.toSeq
      val addedFilePaths = addedFiles.keys.toSeq
      val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
        addedFilePaths)
      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
      listenerBus.post(environmentUpdate)
    }
  }
  ...

SparkEnv.environmentDetails is implemented in SparkEnv.scala as follows:

def environmentDetails(
      conf: SparkConf,
      schedulingMode: String,
      addedJars: Seq[String],
      addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

    import Properties._
    val jvmInformation = Seq(
      ("Java Version", s"$javaVersion ($javaVendor)"),
      ("Java Home", javaHome),
      ("Scala Version", versionString)
    ).sorted

    // Spark properties
    // This includes the scheduling mode whether or not it is configured (used by SparkUI)
    val schedulerMode =
      if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
      } else {
        Seq.empty[(String, String)]
      }
    val sparkProperties = (conf.getAll ++ schedulerMode).sorted

2.13 Creating and starting DAGSchedulerSource and BlockManagerSource

Before DAGSchedulerSource and BlockManagerSource are created, taskScheduler.postStartHook is called first, in order to wait for the backend to be ready.

// Post init
_taskScheduler.postStartHook()
// _dagScheduler.metricsSource is declared as:
//   private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}

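In newer versions initialDriverMetrics seems to have been removed, and the sources are registered directly with registerSource.
In TaskScheduler.scala, postStartHook has an empty implementation.
The actual implementation is in TaskSchedulerImpl.scala: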

override def postStartHook() {
    waitBackendReady()
  }
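
What "waiting for the backend" amounts to can be sketched as a polling loop. This assumes that waitBackendReady repeatedly checks a readiness flag on the backend with a short pause between checks; DemoBackend, BackendReadySketch and the pollMs parameter are hypothetical, and the real method may differ in detail.

```scala
object BackendReadySketch {
  // Hypothetical minimal backend interface.
  trait DemoBackend { def isReady: Boolean }

  // Polls until the backend reports ready; returns the number of waits performed.
  def waitBackendReady(backend: DemoBackend, pollMs: Long = 100L): Int = {
    var waits = 0
    while (!backend.isReady) {
      Thread.sleep(pollMs)
      waits += 1
    }
    waits
  }
}
```

The returned count exists only to make the sketch observable; a production version would also want a way to abort the wait if the context is stopped.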

From there the call chain can be traced level by level.
2.14 Marking SparkContext as active
The last step of SparkContext initialization moves the context from contextBeingConstructed to activeContext.

  // In order to prevent multiple SparkContexts from being active at the same time, mark this
  // context as having finished construction.
  // NOTE: this must be placed at the end of the SparkContext constructor.
  SparkContext.setActiveContext(this, allowMultipleContexts)

  // setActiveContext, defined in the SparkContext companion object
  private[spark] def setActiveContext(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = None
      activeContext.set(sc)
    }
  }
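
The state transition from "being constructed" to "active" under a single lock can be modelled in a few lines. This is a sketch under assumptions: ActiveContextSketch and DemoContext are hypothetical, and the behavior of assertNoOtherContextIsRunning (throw unless multiple contexts are allowed) is a simplification chosen for illustration.

```scala
import java.util.concurrent.atomic.AtomicReference

object ActiveContextSketch {
  // Hypothetical stand-in for SparkContext; only the name matters here.
  final class DemoContext(val name: String)

  private val constructorLock = new Object
  private val activeContext = new AtomicReference[DemoContext](null)
  private var contextBeingConstructed: Option[DemoContext] = None

  // Called at the start of construction.
  def markPartiallyConstructed(ctx: DemoContext, allowMultiple: Boolean): Unit =
    constructorLock.synchronized {
      assertNoOtherContextIsRunning(ctx, allowMultiple)
      contextBeingConstructed = Some(ctx)
    }

  // Called at the very end of construction.
  def setActiveContext(ctx: DemoContext, allowMultiple: Boolean): Unit =
    constructorLock.synchronized {
      assertNoOtherContextIsRunning(ctx, allowMultiple)
      contextBeingConstructed = None
      activeContext.set(ctx)
    }

  def active: Option[DemoContext] = Option(activeContext.get())

  // Simplified check: fail if a different context is already active.
  private def assertNoOtherContextIsRunning(ctx: DemoContext, allowMultiple: Boolean): Unit =
    Option(activeContext.get()).filter(_ ne ctx).foreach { other =>
      if (!allowMultiple) {
        throw new IllegalStateException(s"Context '${other.name}' is already active")
      }
    }
}
```

Holding the same lock in both methods is what prevents two contexts from finishing construction at the same time.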

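2.15 Summary
Building a SparkContext
Driver -> Conf -> Context
First comes SparkEnv, which creates the execution environment.
Then MetadataCleaner is used to keep track of all persisted RDDs.
Then the SparkUI is built.
Pay attention to the Hadoop and executor configuration and environment variables.

Next the task scheduler, TaskScheduler, is created.
DAGScheduler, the scheduler for the directed acyclic graph, is created and started.
TaskScheduler is started.
The metrics system, MetricsSystem, is started.

At that point most of the work is done.
Then ExecutorAllocationManager is created to allocate and manage executors.
ContextCleaner, the cleaner, is created.
The Spark environment is updated to include the supplied parameters.
DAGSchedulerSource and BlockManagerSource are created.
Finally, SparkContext is marked as active.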

Added: 2021-07-05 20:22:40  Updated: 2021-07-05 20:23:31
 