2021SC@SDUSC
Preface
The previous post covered receiverTracker and inputInfoTracker; this post continues the analysis.
DStreamGraph
First, look at the code of ssc.graph.clearMetadata(time):
def clearMetadata(time: Time) {
  logDebug("Clearing metadata for time " + time)
  this.synchronized {
    outputStreams.foreach(_.clearMetadata(time))
  }
  logDebug("Cleared old metadata for time " + time)
}
This clears the relevant metadata of every output DStream. DStream.clearMetadata removes the metadata of the current DStream that is older than rememberDuration; DStream subclasses override this method. rememberDuration is a member of DStream.
The spark.streaming.unpersist configuration (default: true) controls whether old data is automatically unpersisted. This can significantly reduce Spark's memory usage for RDDs and also improve GC behavior.
rememberDuration is set when the DStreamGraph is started; see DStream.initialize, which we will not analyze in detail here. In the source it is roughly slideDuration, or 2 * checkpointDuration if checkpointDuration has been set. It can also be set through DStreamGraph.rememberDuration: to set it yourself, call StreamingContext.remember in your application, but the value you supply only takes effect if it is larger than the internally computed one.
In addition, a downstream DStream adjusts the rememberDuration of the DStreams before it. For example, a window operation spans several batch durations, so the rememberDuration of every upstream DStream must be increased by windowDuration.
Finally, the RDDs of the dependencies are cleaned up as well; this is a recursive call.
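To make these knobs concrete, here is a minimal sketch of setting them from application code (the app name, master, and durations are placeholder values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("CleanupDemo")                // placeholder app name
  .setMaster("local[2]")                    // placeholder master
  .set("spark.streaming.unpersist", "true") // default is already true
val ssc = new StreamingContext(conf, Seconds(2)) // batch duration
// Only takes effect if larger than the internally computed rememberDuration:
ssc.remember(Minutes(1))

To connect the description above with the next step, the body of DStream.clearMetadata looks roughly like the following (a condensed sketch of the Spark source, not a verbatim quote; details vary across versions):

private[streaming] def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  // Forget all generated RDDs older than rememberDuration
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  generatedRDDs --= oldRDDs.keys
  if (unpersistData) {
    oldRDDs.values.foreach { rdd =>
      rdd.unpersist(false)
      rdd match {
        case b: BlockRDD[_] => b.removeBlocks() // drop receiver blocks explicitly
        case _ =>
      }
    }
  }
  dependencies.foreach(_.clearMetadata(time)) // recurse into upstream DStreams
}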
Next, let's analyze BlockRDD.removeBlocks, which is called in this process:
private[spark] def removeBlocks() {
  blockIds.foreach { blockId =>
    sparkContext.env.blockManager.master.removeBlock(blockId)
  }
  _isValid = false
}
This uses BlockManagerMaster to delete all blocks belonging to the current RDD.
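For reference, the block ids held by a streaming BlockRDD are StreamBlockIds, and removal goes through the driver-side BlockManagerMaster. A hypothetical illustration of what happens per block (these are internal private[spark] APIs, so ordinary user code cannot call them directly):

import org.apache.spark.SparkEnv
import org.apache.spark.storage.StreamBlockId

// Hypothetical id; in a real BlockRDD the ids come from the receiver
val blockId = StreamBlockId(streamId = 0, uniqueId = 12345L)
// Ask the driver's BlockManagerMaster to remove the block from all executors
SparkEnv.get.blockManager.master.removeBlock(blockId)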
If a checkpoint needs to be set, a DoCheckpoint message is sent; we will not analyze that path for now. Below we look at the case where no checkpoint is set. First, jobScheduler.receiverTracker.cleanupOldBlocksAndBatches:
def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
  receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
  if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
    logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
    synchronized {
      if (isTrackerStarted) {
        endpoint.send(CleanupOldBlocks(cleanupThreshTime))
      }
    }
  }
}
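Note that the endpoint.send(CleanupOldBlocks(...)) branch only runs when WriteAheadLogUtils.enableReceiverLog returns true, i.e. when the receiver write-ahead log is enabled, for example:

import org.apache.spark.SparkConf

// With the receiver WAL enabled, receivers are also told to delete old block data
val conf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")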
ReceiverTracker.cleanupOldBlocksAndBatches calls ReceivedBlockTracker.cleanupOldBatches:
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}
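The core of this method is a simple eviction pattern: collect the keys older than a threshold, then bulk-remove them from the map. A minimal standalone sketch (hypothetical map contents, with Long timestamps standing in for Time):

import scala.collection.mutable

val timeToAllocatedBlocks = mutable.HashMap(
  1000L -> "blocks@1000", 2000L -> "blocks@2000", 3000L -> "blocks@3000")
val cleanupThreshTime = 2500L
// toSeq materializes the keys before mutating the map
val timesToCleanup = timeToAllocatedBlocks.keys.filter(_ < cleanupThreshTime).toSeq
timeToAllocatedBlocks --= timesToCleanup
// timeToAllocatedBlocks now only contains the entry for 3000L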
cleanupOldBatches thus removes the block metadata of old batches. Finally, back in JobGenerator.clearMetadata, look at jobScheduler.inputInfoTracker.cleanup:
def cleanup(batchThreshTime: Time): Unit = synchronized {
  val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
  logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
  batchTimeToInputInfos --= timesToCleanup
}
cleanup removes the input-source tracking metadata of old batches, using the same threshold-based eviction pattern shown above.