图片来源
三个重点:
- TaskScheduler,如何注册的Application、Executor如何反注册
- DAGScheduler
- SparkUI
TaskScheduler
SparkContext.createTaskScheduler返回一个tuple。 追踪进createTaskScheduler: 可以看到三个形参,其中master是启动模式,针对不同的master会match不同的启动逻辑 看一下standalone模式 可以看到,首先创建了TaskSchedulerImpl 追踪进入,看到TaskSchedulerImpl的描述: 翻译一下: 通过 SchedulerBackend 为多种类型的集群安排任务。它还可以通过使用 [[LocalSchedulerBackend]] 并将 isLocal 设置为 true 来处理本地设置。它处理通用逻辑,例如确定跨作业的调度顺序、唤醒以启动推测性任务等。 客户端应该首先调用 initialize() 和 start(),然后通过 runTasks 方法提交任务集。
OK,源码先不看,看看创建完TaskSchedulerImpl后干了啥
创建了StacnaloneSchedulerBackend
完了后scheduler调用initialize方法
追踪进入initialize方法
def initialize(backend: SchedulerBackend) {
this.backend = backend
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
}
}
schedulableBuilder.buildPools()
}
可以看到,进来就创建了一个pool,会根据具体的调度算法来创建。
至此,createTaskScheduler就完了
完了之后看TaskSchedulerImpl的start()方法
进来就调用了backend.start()方法
进来backend的start方法
这里standalone模式的backend是StandaloneSchedulerBackend
override def start() {
super.start()
launcherBackend.connect()
val driverUrl = RpcEndpointAddress(
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
}
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
val initialExecutorLimit =
if (Utils.isDynamicAllocationEnabled(conf)) {
Some(0)
} else {
None
}
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
进来首先是参数准备,然后一个重要方法
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
这是创建了application的描述 进入ApplicationDescription看看
private[spark] case class ApplicationDescription(
name: String,
maxCores: Option[Int],
memoryPerExecutorMB: Int,
command: Command,
appUiUrl: String,
eventLogDir: Option[URI] = None,
eventLogCodec: Option[String] = None,
coresPerExecutor: Option[Int] = None,
initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
override def toString: String = "ApplicationDescription(" + name + ")"
}
主要就是描述了当前application的一些情况,比如名字、内存、分配cores等等
完了之后创建AppClient
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
进去看看 一段描述
允许应用程序与 Spark 独立集群管理器对话的接口。获取集群事件的主 URL、应用描述和侦听器,并在发生各种事件时回调侦听器。(翻译的)
就是说它的个接口,用来个spark集群通信的,它能接收master 的url、还有applicationDescription、还有一个集群事件的监听器,以及各种事件发生时监听器的回调函数 说白了就是个application和集群之间负责通信的组件
进入client.start()中看看
只需启动一个 rpcEndpoint;它将回调到侦听器
其中的ClientEndpoint,进去看看 它是AppClient的一个内部类 其中有两个方法
-
tryRegisterAllMasters():异步注册所有 master 并返回一个数组“Future”以进行取消 -
registerWithMaster:异步注册所有的master。它将每隔 REGISTRATION_TIMEOUT_SECONDS 秒调用 registerWithMaster ,直到超过 REGISTRATION_RETRIES 次。一旦成功连接到 master,所有调度工作和 Futures 将被取消。 nthRetry 表示这是第 n 次尝试向 master 注册。
至此,代码逻辑和开头的图就对应上了。
DAGScheduler
回到SparkContext的createTaskScheduler中,在TashScheduler启动完了,会紧接着启动DAGScheduler 看到其中的DAGScheduler
进来看看
一大段描述 翻译一下:
实现面向stage调度的高级调度层。它为每个作业计算stage的 DAG,跟踪实现了哪些 RDD 和stage输出,并找到运行作业的最小时间表。然后它将stage作为taskset提交给在集群上运行它们的底层 TaskScheduler 实现。 TaskSet 包含完全独立的任务,可以根据集群上已有的数据立即运行(例如,前一阶段的地图输出文件),但如果这些数据不可用,它可能会失败。 Spark stage是通过在 shuffle 边界打破 RDD 图来创建的。具有“窄”依赖关系的 RDD 操作,如 map() 和 filter(),在每个阶段都被流水线化为一组任务,但具有 shuffle 依赖关系的操作需要多个阶段(一个阶段编写一组映射输出文件,另一个阶段)在shuffle后读取这些文件)。最终,每个阶段将只对其他阶段有 shuffle 依赖,并且可能在其中计算多个操作。这些操作的实际流水线发生在各种 RDD(MappedRDD、FilteredRDD 等)的 RDD.compute() 函数中。 除了提出阶段的 DAG 之外,DAGScheduler 还根据当前缓存状态确定运行每个任务的首选位置,并将这些传递给低级 TaskScheduler。此外,它处理由于 shuffle 输出文件丢失而导致的失败,在这种情况下,可能需要重新提交旧阶段。不是由 shuffle 文件丢失引起的 stage 中的故障由 TaskScheduler 处理,它会在取消整个 stage 之前重试每个任务少量次。
至此,大概的内容差不多,具体细节等到具体情况分析,代码太多也不适合全看。
|