IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark分析(十一)Spark Streaming运行流程详解(6) -> 正文阅读

[大数据]Spark分析(十一)Spark Streaming运行流程详解(6)

2021SC@SDUSC

前言

上一篇博客讲到了receiverTracker和inputInfoTracker,接下来继续讲解

DStreamGraph

先看ssc.graph.clearMetadata(time)代码

// DStreamGraph.clearMetadata

def clearMetadata(time: Time) {
	logDebug("Clearing metadata for time " + time)
	this.synchronized {
		outputStreams.foreach(_.clearMetadata(time))
	}
	logDebug("Cleared old metadata for time " + time)

}

其中清理了所有OutputDStream的一些相关元数据。
DStream.clearMetadata会清理掉当前DStream的rememberDuration之前的元数据。DStream的子类会覆写此方法。rememberDuration是DStream的成员。
spark.streaming.unpersist(默认是true)的配置可以用来设置是否自动非持久化。这可以显著的减少Spark在RDD上的内存使用,同时也可以改善GC行为。
rememberDuration在启动DStreamGraph时被设置,可参考DStream.initialize,这里不做更细的分析了。源码中的设置大体是sildeDuration,如果设置了checkpointDuration则是2*checkpointDuration.还可以通过DStreamGraph.rememberDuration设置:如果想自行设置,可以在应用程序中使用StreamingContext.remember方法,不过自行设置的值要大于内部计算得到的值时才会生效。
另外,后面的DStream会调整前面的DStream的rememberDuration,例如,如果用了窗口操作,会有跨若干Batch Duration的情况,则在此之前的DStream的rememberDuration都需要加上windowDuration。
最后把依赖的RDD也清理掉了。这是递归调用。

下面分析其中调用的BlockRDD.removeBlocks:

// BlockRDD.removeBlocks
/**
 * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
 * irreversible operation, as the data in the blocks cannot be recovered back
 * once removed. Use it with caution.
 */
private[spark] def removeBlocks() {
	blockIds.foreach { blockId =>
		sparkContext.env.blockManager.master.removeBlock(blockId)
	}
	 
	_isValid = false

}

利用BlockManagerMaster删除当前RDD相关的所有Block。
如果需要设置检查点,则发送DoCheckpoint消息。
DoCheckpoint消息的发送则暂时不做分析,下面分析不需要设置检查单的情况:

先看jobScheduler.receiverTracker.cleanupOldBlocksAndBatches。

// ReceiverTracker.cleanupOldBlocksAndBatches

/**
 * Clean up the data and metadata of blocks and batches that are strictly
 * older than the threshold time. Note that this does not
 */

def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
	// Clean up old block and batch metadata
	receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)


	// Signal the receivers to delete old block data
	if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
		logInfo (s"Cleanup old received batch data: $cleanupThreshTime")
		synchronized {
			if (isTrackerStarted) {
				endpoint.send(CleanupOldBlocks(cleanpuThreshTime))
				}
				
			}
			
		}
		

ReceiverTracker.cleanupOldBlocksAndBatches调用了ReceivedBlockTracker.cleanupOldBatches:

// ReceivedBlockTracker.cleanupOldBatches
/**
 * Clean up block information of old batches. If waitForCompletion is true, this method
 * returns only after the files are cleaned up.
 */
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
	require(cleanupThreshTime.miliseconds < clock.getTimeMolis())
	val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
	logInfo("Deleting batches " + timesToCleanup)
	if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
		timeToAllocatedBlocks --= timesToCleanup
		writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
	} else {
		logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
	}

}

cleanupOldBatches清理了旧Batch的Block元数据。
最后回到JobGenerator.clearMetadata,看一看jobScheduler.inputInfoTracker.cleanup:

// inputInfoTracker.cleanup
def cleanup(batchThreshTime: Time): Unit = synchronized {
	val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
	logInfo)s"remove old batch metadata: ${timesToCleanup.mkString("")}")
	batchTimeToInputInfos --= timesToCleanup

}

cleanup清理了旧Batch的跟踪输入源的元数据信息。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-10 11:07:42  更:2021-12-10 11:08:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:53:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码