2.9 Starting the metrics system (MetricsSystem)
Spark's MetricsSystem is built on the third-party Codahale Metrics library. It involves three concepts: instance (who is using the metrics), source (where the data is collected from), and sink (where the data is output).
By instance there are master, worker, application, driver, and executor; by sink there are ConsoleSink, CsvSink, JmxSink, MetricsServlet, GraphiteSink, and so on, with MetricsServlet as the default sink. The startup code in SparkContext is:
_env.metricsSystem.start()
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
def start(registerStaticSources: Boolean = true) {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
if (registerStaticSources) {
StaticSources.allSources.foreach(registerSource)
registerSources()
}
registerSinks()
sinks.foreach(_.start)
}
The corresponding creation code in SparkEnv.scala is as follows:
val metricsSystem = if (isDriver) {
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
Note that for the driver the MetricsSystem is only created here, not started, because the application ID is not yet known; it is started later from SparkContext via _env.metricsSystem.start(). The startup sequence is: register sources -> register sinks -> attach Jetty ServletContextHandlers for the sinks. The call chain is SparkEnv.metricsSystem -> MetricsSystem.createMetricsSystem (a factory method on the MetricsSystem companion object) -> new MetricsSystem. The constructor in MetricsSystem.scala looks like this:
private[spark] class MetricsSystem private (
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {
private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()
private var running: Boolean = false
private var metricsServlet: Option[MetricsServlet] = None
The MetricsSystem is started via _env.metricsSystem.start(), and its servlet handlers are attached to the web UI:
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
The handler chain is: attachHandler -> createServletHandler(servletPath, new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr, conf) -> getMetricsSnapshot.
2.9.1 Registering sources
registerSources registers the sources, telling the metrics system where to collect data from:
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Utils.classForName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
It fetches this instance's configuration from MetricsConfig, uses MetricsSystem.SOURCE_REGEX to select the properties whose keys start with "source.", instantiates each configured class by reflection, and registers every source's MetricRegistry into the system-wide registry (backed by a ConcurrentMap).
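To make the key-grouping step concrete, here is a small self-contained sketch, not the Spark source itself, of how properties such as source.jvm.class are split per source name; the regex below is equivalent to what MetricsSystem.SOURCE_REGEX matches, and the property values are illustrative:
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex

object SubPropertiesDemo {
  // Equivalent to MetricsSystem.SOURCE_REGEX: "source.<name>.<option>"
  val SourceRegex: Regex = "^source\\.(.+)\\.(.+)".r

  // Groups matching keys by source name, mirroring what MetricsConfig.subProperties does.
  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
    val result = new mutable.HashMap[String, Properties]
    prop.asScala.foreach { case (key, value) =>
      if (regex.findPrefixOf(key).isDefined) {
        val regex(prefix, suffix) = key
        result.getOrElseUpdate(prefix, new Properties).setProperty(suffix, value)
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val instConfig = new Properties
    instConfig.setProperty("source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
    // Prints something like Map(jvm -> {class=org.apache.spark.metrics.source.JvmSource})
    println(subProperties(instConfig, SourceRegex))
  }
}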
2.9.2 Registering sinks
registerSinks works much like registerSources:
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
throw e
}
}
}
}
It first fetches the instance configuration, then uses the SINK_REGEX regular expression to pick out the sink settings and instantiates each sink class by reflection. If the property key is "servlet", the instance is kept as the MetricsServlet; otherwise it is appended to the sinks buffer.
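As a hedged example of how a non-servlet sink gets picked up, the following SparkConf settings (using the spark.metrics.conf.* key convention; the values are illustrative) would let registerSinks instantiate a ConsoleSink for the driver instance:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // class instantiated via reflection, as in registerSinks() above
  .set("spark.metrics.conf.driver.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.driver.sink.console.period", "10")
  .set("spark.metrics.conf.driver.sink.console.unit", "seconds")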
2.9.3 Attaching Jetty ServletContextHandlers for the sinks
To make the metrics reachable from the Spark web UI, the sinks need Jetty ServletContextHandlers:
def getServletHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers(conf)).getOrElse(Array())
}
def getHandlers(conf: SparkConf): Array[ServletContextHandler] = {
Array[ServletContextHandler](
createServletHandler(servletPath,
new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr, conf)
)
}
This produces the ServletContextHandler that serves the metrics snapshot.
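What getMetricsSnapshot returns is essentially a JSON rendering of the MetricRegistry. The following self-contained sketch uses the Codahale metrics-json module directly to show the same kind of serialization outside Spark; the registry contents and output are illustrative:
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.json.MetricsModule
import com.fasterxml.jackson.databind.ObjectMapper

object MetricsSnapshotDemo {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    registry.counter("demo.requests").inc(3)

    val mapper = new ObjectMapper()
      .registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, false))

    // Prints something like {"counters":{"demo.requests":{"count":3}}, ...}
    println(mapper.writeValueAsString(registry))
  }
}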
2.10 Creating and starting the ExecutorAllocationManager
ExecutorAllocationManager manages the executors that have been allocated. It is created and started in SparkContext.scala:
private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] = _executorAllocationManager
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
By default no ExecutorAllocationManager is created; it is only built when dynamic allocation is enabled (Utils.isDynamicAllocationEnabled). The related properties control the minimum and maximum number of dynamically allocated executors, how many tasks each executor can run, and so on (a configuration sketch follows the code below). The start method registers an ExecutorAllocationListener with the listenerBus; by listening to the bus events it adds and removes executors dynamically, while a scheduled thread keeps requesting executors and kills and removes those that have timed out. The relevant parts of ExecutorAllocationManager are:
private val intervalMillis: Long = if (Utils.isTesting) {
conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
} else {
100
}
private var clock: Clock = new SystemClock()
val listener = new ExecutorAllocationListener
def start(): Unit = {
listenerBus.addToManagementQueue(listener)
val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
schedule()
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
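For reference, here is a configuration sketch that turns on dynamic allocation so the manager above gets created; the property names come from the standard Spark configuration, and the values are illustrative:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")       // enables ExecutorAllocationManager
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.shuffle.service.enabled", "true")          // typically required for dynamic allocation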
2.11 Creating and starting the ContextCleaner
ContextCleaner cleans up objects that have gone out of application scope: RDDs, ShuffleDependencys, broadcasts, and so on. Because the configuration property spark.cleaner.referenceTracking defaults to true, the ContextCleaner is constructed and started:
private[spark] def cleaner: Option[ContextCleaner] = _cleaner
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
Tracing into ContextCleaner:
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val referenceBuffer =
Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
private val referenceQueue = new ReferenceQueue[AnyRef]
private val listeners = new ConcurrentLinkedQueue[CleanerListener]()
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
private val periodicGCService: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
...
referenceBuffer caches the weak references (CleanupTaskWeakReference) to the tracked objects; referenceQueue is the queue on which those references are enqueued once their referents are garbage-collected; listeners holds the CleanerListeners; cleaningThread is the thread that does the actual cleanup work.
Like the listenerBus, ContextCleaner follows the listener pattern and uses a dedicated thread: cleaningThread runs keepCleaning, which repeatedly takes references off the queue and performs the corresponding cleanup.
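The cleanup is driven by the JVM's weak-reference machinery: once a tracked object becomes unreachable and is garbage-collected, its reference shows up on referenceQueue and keepCleaning handles it. A self-contained sketch of that mechanism (plain JVM code, not Spark; all names are illustrative):
import java.lang.ref.{ReferenceQueue, WeakReference}

object WeakRefDemo {
  // Stands in for CleanupTaskWeakReference: a weak reference carrying cleanup metadata.
  final class TrackedRef(obj: AnyRef, val name: String, queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](obj, queue)

  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]
    var payload: AnyRef = new Array[Byte](1024)      // stands in for an RDD/broadcast
    val ref = new TrackedRef(payload, "rdd-42", queue)

    payload = null                                   // drop the only strong reference
    System.gc()                                      // request a collection (best effort)

    // Blocks until the reference is enqueued, or gives up after one second.
    Option(queue.remove(1000)) match {
      case Some(r: TrackedRef) => println(s"cleanup triggered for ${r.name}")
      case _                   => println(s"nothing enqueued yet (still tracking $ref)")
    }
  }
}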
2.12 Updating the Spark environment
SparkContext initialization can affect the environment, so the environment needs to be refreshed by calling two methods: postEnvironmentUpdate() and postApplicationStart().
private def postApplicationStart() {
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
}
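The posted SparkListenerApplicationStart event is delivered to every registered SparkListener. A hypothetical listener that consumes it could look like this (class name and message are illustrative):
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

class AppStartLogger extends SparkListener {
  // Invoked when the SparkListenerApplicationStart event posted above reaches the bus.
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    println(s"application ${event.appName} started at ${event.time} by ${event.sparkUser}")
  }
}
// It could be registered with sc.addSparkListener(new AppStartLogger())
// or via the spark.extraListeners configuration property.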
The jars and files involved are added as follows:
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
->
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
->
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
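As a usage sketch, jars and files added through the SparkContext API end up in addedJars/addedFiles and are therefore reported by postEnvironmentUpdate; the paths below are hypothetical:
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("env-demo"))
sc.addJar("/path/to/extra.jar")      // recorded in addedJars
sc.addFile("/path/to/lookup.csv")    // recorded in addedFiles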
Tracing back, the implementation of postEnvironmentUpdate is:
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
addedFilePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
}
...
The implementation of SparkEnv.environmentDetails (in SparkEnv.scala) is as follows:
def environmentDetails(
conf: SparkConf,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
import Properties._
val jvmInformation = Seq(
("Java Version", s"$javaVersion ($javaVendor)"),
("Java Home", javaHome),
("Scala Version", versionString)
).sorted
val schedulerMode =
if (!conf.contains("spark.scheduler.mode")) {
Seq(("spark.scheduler.mode", schedulingMode))
} else {
Seq.empty[(String, String)]
}
val sparkProperties = (conf.getAll ++ schedulerMode).sorted
The method goes on to collect the system properties and classpath entries and returns them, together with the JVM information and Spark properties above, as a Map keyed by "JVM Information", "Spark Properties", "System Properties", and "Classpath Entries".
2.13 Creating and starting DAGSchedulerSource and BlockManagerSource
Before DAGSchedulerSource and BlockManagerSource are created, taskScheduler.postStartHook() is called first in order to wait for the backend to be ready:
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource) -> private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
In newer versions initialDriverMetrics seems to have been removed, and the sources are simply registered via registerSource. In TaskScheduler.scala the postStartHook implementation is empty; the real implementation lives in TaskSchedulerImpl.scala:
override def postStartHook() {
waitBackendReady()
}
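waitBackendReady simply polls until the scheduler backend reports ready, bailing out if the context has been stopped. A self-contained sketch of that polling pattern (plain Scala, not the Spark source; the flags and timings are illustrative):
import java.util.concurrent.atomic.AtomicBoolean

object WaitBackendReadyDemo {
  private val backendReady = new AtomicBoolean(false)
  private val contextStopped = new AtomicBoolean(false)

  // Mirrors the idea of TaskSchedulerImpl.waitBackendReady: loop until ready or stopped.
  private def waitBackendReady(): Unit = {
    while (!backendReady.get()) {
      if (contextStopped.get()) {
        throw new IllegalStateException("context stopped while waiting for backend")
      }
      Thread.sleep(100)   // the real code waits on a monitor with a timeout
    }
  }

  def main(args: Array[String]): Unit = {
    // Flip the flag from another thread to simulate the backend coming up.
    new Thread(() => { Thread.sleep(300); backendReady.set(true) }).start()
    waitBackendReady()
    println("backend ready, postStartHook returns")
  }
}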
From here the call chain can be traced down layer by layer.
2.14 Marking the SparkContext as active
At the very end of SparkContext initialization, the state is switched from contextBeingConstructed to activeContext:
SparkContext.setActiveContext(this, allowMultipleContexts)
private[spark] def setActiveContext(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
contextBeingConstructed = None
activeContext.set(sc)
}
}
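Because the active context is tracked in the companion object, SparkContext.getOrCreate returns the already-active instance instead of constructing a second one. A small usage sketch (master and app name are illustrative):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("reuse-demo")
val sc1 = SparkContext.getOrCreate(conf)
val sc2 = SparkContext.getOrCreate(conf)
assert(sc1 eq sc2)   // both refer to the same active SparkContext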
2.15 Summary
The construction of SparkContext follows Driver -> Conf -> Context. First SparkEnv is created to provide the execution environment; then, to keep track of all persisted RDDs, the metadata cleaner is set up; then the SparkUI is built. Pay attention to the Hadoop and executor configuration and environment variables.
Next the task scheduler TaskScheduler is created, the DAGScheduler (the directed-acyclic-graph scheduler) is created and started, the TaskScheduler is started, and the metrics system MetricsSystem is started.
With most of the work done, the ExecutorAllocationManager is then created to allocate and manage executors, the ContextCleaner is created for cleanup, the Spark environment is updated with the given parameters, DAGSchedulerSource and BlockManagerSource are created, and finally the SparkContext is marked as active.