2021SC@SDUSC
Contents
SparkUI
1.listenerBus
2.Constructing JobProgressListener
SparkUI
1.listenerBus
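listenerBus is of type LiveListenerBus. LiveListenerBus implements the listener pattern: each posted event triggers updates to the state kept by the registered listeners, which is how the data shown in the UI gets refreshed. It consists of the following parts: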
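- An event blocking queue, of type LinkedBlockingQueue[SparkListenerEvent]
- An array of listeners, of type ArrayBuffer[SparkListener] (inherited from the parent SparkListenerBus)
- A thread that matches each event to the listeners and dispatches it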
The code of listenerBus (LiveListenerBus) is as follows:
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
private val listenerThread = new Thread("SparkListenerBus") {
  setDaemon(true)
  override def run(): Unit = Utils.logUncaughtExceptions {
    while (true) {
      // Block until post() has released a permit for a queued event
      eventLock.acquire()
      // Atomically remove and process this event
      LiveListenerBus.this.synchronized {
        val event = eventQueue.poll
        if (event == SparkListenerShutdown) {
          // Get out of the while loop and shutdown the daemon thread
          return
        }
        Option(event).foreach(postToAll)
      }
    }
  }
}
def start() {
  if (started) {
    throw new IllegalStateException("Listener bus already started!")
  }
  listenerThread.start()
  started = true
}
def post(event: SparkListenerEvent) {
  // offer() does not block: if the queue is full, the event is dropped and an error is logged
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    eventLock.release()
  } else {
    logQueueFullErrorMessage()
  }
}
def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
def stop() {
  if (!started) {
    throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
  }
  // Enqueue the shutdown sentinel behind pending events, then wait for the thread to exit
  post(SparkListenerShutdown)
  listenerThread.join()
}
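The mechanism above is a bounded producer/consumer queue. post() offers an event and releases one semaphore permit, and the daemon thread acquires a permit before polling, so it sleeps while the queue is empty. A shutdown sentinel ends the loop. Below is a minimal, self-contained sketch of the same pattern in plain Scala, without Spark. All names (MiniListenerBus, Event, JobStart, JobEnd, Shutdown, Listener) are hypothetical stand-ins. Unlike Spark, post() here returns false on a full queue instead of logging an error.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import scala.collection.mutable.ArrayBuffer

// Hypothetical event types standing in for SparkListenerEvent and its subclasses.
sealed trait Event
final case class JobStart(jobId: Int) extends Event
final case class JobEnd(jobId: Int) extends Event
case object Shutdown extends Event // sentinel, plays the role of SparkListenerShutdown

// Hypothetical listener with no-op defaults, in the spirit of SparkListener.
trait Listener {
  def onJobStart(e: JobStart): Unit = ()
  def onJobEnd(e: JobEnd): Unit = ()
}

class MiniListenerBus(capacity: Int = 10000) {
  private val eventQueue = new LinkedBlockingQueue[Event](capacity)
  // One permit per queued event: the consumer blocks in acquire() while the queue is empty.
  private val eventLock = new Semaphore(0)
  private val listeners = ArrayBuffer[Listener]()
  @volatile private var started = false

  private val listenerThread = new Thread("MiniListenerBus") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        eventLock.acquire()
        // Remove and dispatch the event while holding the bus lock, as LiveListenerBus does.
        MiniListenerBus.this.synchronized {
          eventQueue.poll() match {
            case Shutdown    => running = false // leave the loop; the thread then exits
            case e: JobStart => listeners.foreach(_.onJobStart(e))
            case e: JobEnd   => listeners.foreach(_.onJobEnd(e))
            case null        => () // defensive: nothing was queued
          }
        }
      }
    }
  }

  def addListener(listener: Listener): Unit = synchronized { listeners += listener; () }

  def start(): Unit = synchronized {
    if (started) throw new IllegalStateException("Listener bus already started!")
    listenerThread.start()
    started = true
  }

  // Non-blocking: returns false (and drops the event) when the queue is full.
  def post(event: Event): Boolean = {
    val added = eventQueue.offer(event)
    if (added) eventLock.release()
    added
  }

  def stop(): Unit = {
    if (!started) throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    post(Shutdown)        // enqueue the sentinel behind any pending events
    listenerThread.join() // wait until every earlier event has been dispatched
  }
}
```

In the Spark code above, polling and dispatching happen under the same lock that queueIsEmpty takes, so once the queue reports empty, no event is still being processed.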
The postToAll method called in LiveListenerBus is actually defined in its parent, SparkListenerBus. The code is as follows:
protected val sparkListeners = new ArrayBuffer[SparkListener]
  with mutable.SynchronizedBuffer[SparkListener]
def addListener(listener: SparkListener) {
  sparkListeners += listener
}
def postToAll(event: SparkListenerEvent) {
  event match {
    case stageSubmitted: SparkListenerStageSubmitted =>
      foreachListener(_.onStageSubmitted(stageSubmitted))
    case stageCompleted: SparkListenerStageCompleted =>
      foreachListener(_.onStageCompleted(stageCompleted))
    case jobStart: SparkListenerJobStart =>
      foreachListener(_.onJobStart(jobStart))
    case jobEnd: SparkListenerJobEnd =>
      foreachListener(_.onJobEnd(jobEnd))
    case taskStart: SparkListenerTaskStart =>
      foreachListener(_.onTaskStart(taskStart))
    case taskGettingResult: SparkListenerTaskGettingResult =>
      foreachListener(_.onTaskGettingResult(taskGettingResult))
    case taskEnd: SparkListenerTaskEnd =>
      foreachListener(_.onTaskEnd(taskEnd))
    case environmentUpdate: SparkListenerEnvironmentUpdate =>
      foreachListener(_.onEnvironmentUpdate(environmentUpdate))
    case blockManagerAdded: SparkListenerBlockManagerAdded =>
      foreachListener(_.onBlockManagerAdded(blockManagerAdded))
    case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
      foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
    case unpersistRDD: SparkListenerUnpersistRDD =>
      foreachListener(_.onUnpersistRDD(unpersistRDD))
    case applicationStart: SparkListenerApplicationStart =>
      foreachListener(_.onApplicationStart(applicationStart))
    case applicationEnd: SparkListenerApplicationEnd =>
      foreachListener(_.onApplicationEnd(applicationEnd))
    case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
      foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
    case SparkListenerShutdown =>
      // Shutdown is handled by the listener thread; there is nothing to dispatch
  }
}
private def foreachListener(f: SparkListener => Unit): Unit = {
  sparkListeners.foreach { listener =>
    try {
      f(listener)
    } catch {
      // A failing listener is logged and skipped; the remaining listeners still run
      case e: Exception =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    }
  }
}
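postToAll is a type-directed dispatch, and foreachListener wraps every callback in try/catch so that one misbehaving listener cannot stop the others from receiving the event. Here is a minimal sketch of that idea with hypothetical types (BusEvent, StageSubmitted, StageCompleted, BusShutdown, StageListener, DispatchingBus). Errors are collected in a buffer as a stand-in for logError.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical events standing in for SparkListenerStageSubmitted,
// SparkListenerStageCompleted and SparkListenerShutdown.
sealed trait BusEvent
final case class StageSubmitted(stageId: Int) extends BusEvent
final case class StageCompleted(stageId: Int) extends BusEvent
case object BusShutdown extends BusEvent

// Hypothetical listener with no-op defaults, so implementations override only what they need.
trait StageListener {
  def onStageSubmitted(e: StageSubmitted): Unit = ()
  def onStageCompleted(e: StageCompleted): Unit = ()
}

class DispatchingBus {
  private val listeners = ArrayBuffer[StageListener]()
  // Stands in for logError: failures are recorded instead of logged.
  val errors = ArrayBuffer[String]()

  def addListener(listener: StageListener): Unit = { listeners += listener; () }

  // Route each event type to its callback, mirroring the structure of postToAll.
  def postToAll(event: BusEvent): Unit = event match {
    case e: StageSubmitted => foreachListener(_.onStageSubmitted(e))
    case e: StageCompleted => foreachListener(_.onStageCompleted(e))
    case BusShutdown       => () // the sentinel has no callback
  }

  // Each callback runs in its own try/catch, so one failing listener does not
  // prevent the remaining listeners from seeing the event.
  private def foreachListener(f: StageListener => Unit): Unit =
    listeners.foreach { listener =>
      try f(listener)
      catch { case e: Exception => errors += s"listener threw: ${e.getMessage}" }
    }
}
```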
2.Constructing JobProgressListener
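JobProgressListener is an important component of SparkContext. It listens to the events on listenerBus and updates task progress accordingly. The code that creates and registers JobProgressListener is as follows: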
private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)
val statusTracker = new SparkStatusTracker(this)
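JobProgressListener uses data structures such as HashMap and ListBuffer to store each JobId together with its JobUIData, grouped by job state (active, completed, failed). StageId and StageInfo entries are likewise grouped by stage state (active, completed, skipped, failed), and the one-to-many relationship between a StageId and the JobIds that use it is stored as well. The data-structure part of JobProgressListener is as follows: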
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
  import JobProgressListener._
  type JobId = Int
  type StageId = Int
  type StageAttemptId = Int
  type PoolName = String
  type ExecutorId = String
  // Jobs:
  val activeJobs = new HashMap[JobId, JobUIData]
  val completedJobs = ListBuffer[JobUIData]()
  val failedJobs = ListBuffer[JobUIData]()
  val jobIdToData = new HashMap[JobId, JobUIData]
  // Stages:
  val activeStages = new HashMap[StageId, StageInfo]
  val completedStages = ListBuffer[StageInfo]()
  val skippedStages = ListBuffer[StageInfo]()
  val failedStages = ListBuffer[StageInfo]()
  val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
  val stageIdToInfo = new HashMap[StageId, StageInfo]
  // A stage can be shared by several active jobs
  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
  // Totals of completed and failed stages ever run; incremented by the handlers, so they are vars
  var numCompletedStages = 0
  var numFailedStages = 0
  // Misc:
  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
  var schedulingMode: Option[SchedulingMode] = None
  // How many finished stages and jobs the UI retains
  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
  // ... event-handling methods omitted ...
}
JobProgressListener implements onJobStart, onJobEnd, onStageCompleted, onStageSubmitted, onTaskStart, onTaskEnd and other callbacks, which update these structures as the corresponding events arrive.
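As a rough illustration of the bookkeeping such callbacks perform, here is a hypothetical, heavily simplified tracker. onJobStart records a job as active and links each of its stages to it. onJobEnd moves the job to the completed or failed list and trims that list to a retention limit, in the spirit of spark.ui.retainedJobs. The class and event names (MiniJobProgressTracker, JobStarted, JobFinished, JobRecord) are invented for this sketch. Spark's real handlers track much more state, and their trimming details are not reproduced here.

```scala
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}

// Hypothetical, simplified events and job record; Spark's SparkListenerJobStart/JobEnd
// and JobUIData carry much more information.
final case class JobStarted(jobId: Int, stageIds: Seq[Int])
final case class JobFinished(jobId: Int, succeeded: Boolean)
final case class JobRecord(jobId: Int, stageIds: Seq[Int])

class MiniJobProgressTracker(retainedJobs: Int) {
  val activeJobs = new HashMap[Int, JobRecord]
  val completedJobs = ListBuffer[JobRecord]()
  val failedJobs = ListBuffer[JobRecord]()
  // A stage can be shared by several active jobs, hence a set of job IDs per stage.
  val stageIdToActiveJobIds = new HashMap[Int, HashSet[Int]]

  def onJobStart(e: JobStarted): Unit = {
    activeJobs(e.jobId) = JobRecord(e.jobId, e.stageIds)
    for (stageId <- e.stageIds)
      stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[Int]) += e.jobId
  }

  def onJobEnd(e: JobFinished): Unit =
    activeJobs.remove(e.jobId).foreach { job =>
      // Unlink the job from its stages and drop stages with no active job left.
      for (stageId <- job.stageIds; jobIds <- stageIdToActiveJobIds.get(stageId)) {
        jobIds -= e.jobId
        if (jobIds.isEmpty) stageIdToActiveJobIds.remove(stageId)
      }
      val finished = if (e.succeeded) completedJobs else failedJobs
      finished += job
      // Keep only the newest `retainedJobs` entries (cf. spark.ui.retainedJobs).
      if (finished.size > retainedJobs) finished.remove(0, finished.size - retainedJobs)
    }
}
```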