IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 山东大学软件工程应用与实践: Spark(七) 代码分析 -> 正文阅读

[大数据]山东大学软件工程应用与实践: Spark(七) 代码分析

2021SC@SDUSC


目录

SparkUI

1.listenerBus

2.构造JobProgressListener


SparkUI

1.listenerBus

listenerBus的类型是LiveListenerBus。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监控器监听状态信息的修改,达到Ui界面的数据刷新的效果。它由以下几部分组成:

  • 事件阻塞队列,类型为LinkedBlockingQueue[SparkListenerEvent]
  • 监听器数组,类型为ArrayBuffer[SparkListener]
  • 事件匹配监听器的线程

listenerBus的代码如下:

private val EVENT_QUEUE_CAPACITY = 10000
    private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent] (EVENT_QUEUE_CAPACITY)
    private var queueFullErrorMessageLogged = false
    private var started = false
    // A counter that represents the number of events produced and consumed in the queue
    private val eventLock = new Semaphore(0)

    private val listenerThread = new Thread("SparkListenerBus") {
        setDaemon(true)
        override def run(): Unit = Utils.logUncaughtExeceptions {
            while(true) {
                eventLock.acquire()
                // Atomically remove and process this event
                LiveListenerBus.this.synchronized {
                    val event = eventQueue.poll
                    if (event == Spark.ListenerShutdown) {
                        // Get out of the while loop and shutdown the daemon thread return
                    }
                    Option(event).foreach(postToAll)
                }
            }
        }
    }
    

    def start() {
        if (started) {
            throw new IllegalStateException("Listener bus already started!")
        }
        listenerThread.start()
        started = true
        }
    def post(event: SparkListenerEvent) {
        val eventAdded = eventQueue.offer(event)
        if (eventAdded) {
            eventLock.release()
        } else {
            logQueueFullErrorMessage()
        }
    }

    def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
    
    def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty}

    def stop() {
        if (!started) {
            throw new IllegalStateException("Listener bus already started!")
        }
        listenerThread.start()
        started = true
        }
    def post (event: SparkListenerEvent) {
        val eventAdded = eventQueue.offer(event)
        if (eventAdded) {
            eventLock.release()
        } else {
            logQueueFullErrorMessage()
        }
    }
    
    def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

    def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

    def stop() {
        if (!started) {
            throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
        }
        post(SparkListenerShutdown)
        listenerThread.join()
    }

LiveListenerBus中调用的postToAll方法实际定义在父类SparkListenerBus中,代码如下:

protected val sparkListeners = new ArrayBuffer[SparkListener]
    with mutable.SynchronizedBuffer[SparkListener]

def addListener(listener; SparkListener) {
    sparkListeners += listener
}

def postToAll(event: SparkListenerEvent) {
    event match {
        case stageSubmitted: SparkListenerStageSubmitted => 
            foreachListener(_.onStageSubmitted(stageSubmitted))
        case stageCompleted: SparkListenerStageCompleted => 
            foreachListener(_.onStageCompleted(stageCompleted))
        case jobStart: SparkListenerJobStart =>
            foreachListener(_.onJobStart(jobStart)) 
        case jobEnd: SparkListenerJobEnd => 
            foreachListener(_.onJobEnd(jobEnd)) 
        case taskStart: SparkListenerTaskStart => 
            foreachListener(_.onTaskStart(taskStart))
        case taskGettingResult: SparkListenerTaskGettingResult => 
            foreachListener(_.onTaskGettingResult(taskGettingResult)) 
        case taskEnd: SparkListenerTaskEnd => 
            foreachListener(_.onTaskEnd (taskEnd)) 
        case environmentUpdate: SparkListenerEnvironmentUpdate => 
            foreachListener(_.onEnvironmentUpdate(environmentUpdate)) 
        case blockManagerAdded: SparkListenerBlockManagerAdded => 
            foreachListener(_.onBlockManagerAdded(blockManagerAdded)) 
        case blockManagerRemoved: SparkListenerBlockManagerRemoved => 
            foreachListener(_.onBlockManagerRemoved(blockManagerRemoved)) 
        case unpersistRDD: SparkListenerUnpersistROD =>
            foreachListener(_.onUnpersistRDD(unpersistRDD)) 
        case applicationStart: SparkListenerApplicationStart => 
            foreachListener(_.onApplicationStart(applicationStart))
        case applicationEnd: SparkListenerApplicationEnd => 
            foreachListener(_.onApplicationEnd(applica七ionEnd))
        case metricsUpdate: SparkListener:ExecutorMetricsUpdate => 
            foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))             
        case SparkListenerShutdown =>
    }
}

private def foreachListener(f: SparkListener => Unit): Unit = {
    sparkListeners.foreach { listener =>
        try{
            f(listener)
        } catch {
            case e: Exception =>
            logError(s"Listener ${Utils.getFormattedClassName(Listener)} threw an                     
                exception", e)
        }
    }
}

2.构造JobProgressListener

JobProgressListener是SparkContext中重要的组成部分,通过监听listenerBus中的事件更新任务进度。创建JobProgressListener的代码如下:

private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)

val statusTracker = new SparkStatusTracker(this)

JobProgressListener的作用是通过HashMap、ListBuffer等数据结构存储JobId及对应的JobUIdata信息,并按照激活、完成、失败等job状态统计。对于StageId、StageInfo等信息按照激活、完成、忽略、失败等Stage状态统计,并且存储StageId与JobId的一对多关系。JobProgressListener的数据结构代码如下:

class JobProgressListener(conf: SparkConf) extends SparkListener with Logging{
    
    import JobProgressListener._
        
    type JobId = Int
    type StageId = Int
    type StageAttemptId = Int
    type PoolName = String
    type ExecutorId = String
    
    //Jobs: 
    val activeJobs = new HashMap[JobId, JobUIData]
    val completedJobs = ListBuffer[JobUIData]()
    val skippedStages = ListBuffer[StageInfo]()
    val failedStages = ListBuffer[StageInfo]()
    val stageIdToData = new HashMap[ (StageId, StageAttemptId), StageUiData]
    val stageIdToInfo = new HashMap[StageId, StageInfo]
    val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
    val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]] ()
    val numCompletedStages = 0
    val numFailedStages = 0
    
    //Misc:
    val executorIdToBlockManagerId = HashMap[executorId, BlockManagerId] ()
    def blockManagerIds = executorIdToBlockManagerId.values.toSeq
    
    var schedulingMode: Option[SchedulingMode] = None
    
    val retainedStages = conf.getint("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
    
    val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

JobProgressListener实现了onJobStart、 onJobEnd、 onStageCompleted、 onStageSubmitted、 onTaskStart、 onTask.End等方法.

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-04 13:30:42  更:2021-12-04 13:32:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 10:37:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码