Spark Event Log Start
上一节已经看过,操作 Spark Event Log 的类是 org.apache.spark.scheduler.EventLoggingListener,现在来分析一下它的 start() 方法:
/**
 * Creates the in-progress event log file and prepares the writer used for later events.
 *
 * Steps, in order: check that `logBaseDir` is a directory, optionally delete an existing
 * in-progress file, open the raw output stream, wrap it with the optional compression
 * codec and a buffer, write the log-start header through
 * `EventLoggingListener.initEventLog`, set the file permissions, and store the
 * resulting `PrintWriter` in `writer`.
 *
 * If any `Exception` is raised after the raw stream has been opened, the raw stream is
 * closed and the exception is rethrown.
 *
 * @throws IllegalArgumentException if `logBaseDir` is not a directory
 */
def start(): Unit = {
  if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
    throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
  }

  // Events are written to the log path with the IN_PROGRESS suffix appended.
  val workingPath = logPath + IN_PROGRESS
  val path = new Path(workingPath)
  val uri = path.toUri
  val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
  val isDefaultLocal = defaultFs == null || defaultFs == "file"

  // delete() is attempted only when overwriting is enabled; the warning is logged only
  // when delete() reports true.
  if (shouldOverwrite && fileSystem.delete(path, true)) {
    logWarning(s"Event log $path already exists. Overwriting...")
  }

  // Local paths are opened with a plain FileOutputStream; everything else goes through
  // the Hadoop FileSystem, and that stream is also remembered in hadoopDataStream.
  // NOTE(review): the reason for bypassing the Hadoop API for local files is not visible
  // in this excerpt - confirm against the upstream source.
  val dstream =
    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
      new FileOutputStream(uri.getPath)
    } else {
      val hadoopStream = fileSystem.create(path)
      hadoopDataStream = Some(hadoopStream)
      hadoopStream
    }

  try {
    val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
    val bstream = new BufferedOutputStream(cstream, outputBufferSize)

    EventLoggingListener.initEventLog(bstream, testing, loggedEvents)
    fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
    // initEventLog writes the header as UTF-8 bytes, so the writer must use UTF-8 too.
    // PrintWriter(OutputStream) would fall back to the platform default charset and could
    // produce a file with mixed encodings.
    writer = Some(
      new PrintWriter(
        new java.io.OutputStreamWriter(bstream, java.nio.charset.StandardCharsets.UTF_8)))
    logInfo("Logging events to %s".format(logPath))
  } catch {
    case e: Exception =>
      // Do not leak the underlying file handle when setup fails part-way.
      dstream.close()
      throw e
  }
}
接着到EventLoggingListener.initEventLog(bstream, testing, loggedEvents)看看怎么初始化的。
/**
 * Writes the log-start metadata line to the beginning of an event log stream.
 *
 * The line is the compact JSON rendering of a `SparkListenerLogStart` built from
 * `SPARK_VERSION`, followed by a newline, encoded as UTF-8.
 *
 * @param logStream    stream that receives the metadata line
 * @param testing      when true, the JSON value is also recorded in `loggedEvents`
 * @param loggedEvents buffer that collects the JSON value in testing mode; may be null,
 *                     in which case nothing is recorded
 */
def initEventLog(
    logStream: OutputStream,
    testing: Boolean,
    loggedEvents: ArrayBuffer[JValue]): Unit = {
  val startEventJson = JsonProtocol.logStartToJson(SparkListenerLogStart(SPARK_VERSION))
  val headerBytes = (compact(startEventJson) + "\n").getBytes(StandardCharsets.UTF_8)
  logStream.write(headerBytes)

  // Record the event only after the write succeeded, and only in testing mode.
  if (testing) {
    Option(loggedEvents).foreach(_ += startEventJson)
  }
}
再看一下 val eventJson = JsonProtocol.logStartToJson(metadata) 这一行调用的 logStartToJson 方法:
/**
 * Builds the JSON object for a log-start event.
 *
 * The object has two fields: "Event", taken from
 * `SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart`, and "Spark Version", taken
 * from the `SPARK_VERSION` constant.
 *
 * @param logStart the log-start event; NOTE(review): this argument is not read here,
 *                 the version always comes from `SPARK_VERSION`
 * @return the JSON value with the two fields above
 */
def logStartToJson(logStart: SparkListenerLogStart): JValue = {
  val eventField = "Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart
  val versionField = "Spark Version" -> SPARK_VERSION
  eventField ~ versionField
}
对比一下上一篇Event Log的第一行:
{"Event":"SparkListenerLogStart","Spark Version":"2.4.4"}
是不是很熟悉,能对的上吧。 那么问题来了,start()方法是什么时候被调用的?
什么时候调用 start() 方法
明天再续
|