IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark源码——注册机制 -> 正文阅读

[大数据]Spark源码——注册机制

worker向master注册

  • worker在启动之后,就会向master进行注册

  • 对于worker状态为DEAD的,过滤掉。在compeleRecover()中,master在一定时间期限已经完成了恢复,但是发现其中的worker为UNKNOWN的情况下,对worker进行remove,将worker的状态设置为DEAD,如果过了很长时间worker又莫名其妙的向master进行注册的情况下,直接过滤掉。

  • 对于worker状态为UNKNOWN,master会将旧的worker信息给清理掉,替换成新的worker信息。比如说master刚启动的时候,会向worker发送新的地址的时候,master会将该worker状态设置为unknown,worker向master返回注册信息的时候,master会将旧的worker信息给清理掉,替换成新的worker信息

  • 将worker加入内存缓存中(HashMap),用持久化引擎将worker信息持久化,可能是文件系统,可能是zookeeper

  • 调用schedule()方法进行调度

进去源码看看

找到register相关的代码

  private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    } //过滤状态为DEAD的worker

    val workerAddress = worker.endpoint.address
    if (addressToWorker.contains(workerAddress)) {
    //如果worker地址存在,则说明该worker是oldworker
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        //如果worker状态为UNKNOWN,说明在恢复过程中被重启了,则该worker要被杀死
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }

    workers += worker //将新的worker添加到hashset中
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker//地址也添加
    if (reverseProxy) {
       webUi.addProxyTargets(worker.id, worker.webUiAddress)
    }
    true
  }

driver向master注册

  • 用spark-submit提交sparkApplication的时候,dirver首先就会向master进行注册,将driver信息放入到内存缓存中,也就是hashmap中
  • 加入等待调度队列,也就是ArrayBuffer
  • 用持久化引擎将driver信息持久化,可能是文件系统,可能是zookeeper
  • 调用schedule()方法进行调度
case RequestSubmitDriver(description) =>
      if (state != RecoveryState.ALIVE) {
      //如果状态不为Alive,则提交失败,只能提交到Alive的Master
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        //调用createDriver方法,创建Driver
        val driver = createDriver(description)
        //使用持久化引擎将Driver持久化
       
        persistenceEngine.addDriver(driver)
       //加入内存结构
        waitingDrivers += driver
        drivers.add(driver)
		//调度
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

		//成功提交
        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }


 private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    new DriverInfo(now, newDriverId(date), desc, date)
  }

application向master进行注册(registerApplication()方法)

  • Driver启动好之后,会执行我们的application代码,执行sparkContext的初始化,底层的SparkDeploySchedulerBackend,会通过AppClient内部的线程,ClientEndPoint发送RegisterAppliction,到master进行Application进行注册。
  • 在这里插入图片描述
 /**
     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self)) 
            //这里调用了RegisterApplication
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

在Master中找到RegisterApplication这个东西

 case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
      //如果master不是alive,则不理这个application
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        //用ApplicationDescription信息,创建ApplicationInfo
        registerApplication(app)//注册application
        //将Application加入缓存,将Application加入等待调度的队列
        logInfo("Registered app " + description.name + " with ID " + app.id)
        //持久化
        persistenceEngine.addApplication(app)
        //反向,向SparkDeploySchedulerBackend的AppClient的ClientEndPoint发送消息,也就是registeredApplication,而不是registerApplication

        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
  • 将application信息放入到内存缓存中,也就是hashmap中
  • 将application加入等待的调度队列,也就是ArrayBuffer
  • 用持久化引擎将application信息持久化,可能是文件系统,可能是zookeeper

进到createApplication

private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
      ApplicationInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)//创建一个时间戳
    val appId = newApplicationId(date)//创建ApplicaitonId,传入时间戳
    new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
  }

看看registerApplication

private def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.address
    if (addressToApp.contains(appAddress)) {
    // 如果driver的地址存在的情况下,就直接返回,就相当于对driver进行重复注册
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }
	//将Application的信息加入到内存缓存中
    applicationMetricsSystem.registerSource(app.appSource)
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(appAddress) = app
     //将Application的信息加入到等待调度的队列中,调度的算法为FIFO
    waitingApps += app
    //private val waitingApps = new ArrayBuffer[ApplicationInfo]
    if (reverseProxy) {
      webUi.addProxyTargets(app.id, app.desc.appUiUrl)
    }
  }
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2021-12-14 16:00:53  更:2021-12-14 16:03:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:30:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码