1. The entry point of the scheduled task
Here we follow the scheduled tasks as started in KRaft mode, so the entry point is in BrokerServer.scala; if the broker runs in ZooKeeper mode, the entry point is in KafkaServer.scala.
def startup(): Unit = {
  if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
  try {
    info("Starting broker")
    // ... (unrelated startup steps omitted)

    kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
    kafkaScheduler.startup()

    logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
      brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
  } catch {
    case e: Throwable =>
      maybeChangeStatus(STARTING, STARTED)
      fatal("Fatal error during broker startup. Prepare to shutdown", e)
      shutdown()
      throw e
  }
}
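The retention job is registered on this kafkaScheduler, a background thread pool sized by config.backgroundThreads (the background.threads broker config). As a minimal sketch of how a periodic task gets registered on it, assuming the Scala KafkaScheduler in kafka.utils that this code path uses (the task name, delay and period below are invented for illustration):

import java.util.concurrent.TimeUnit
import kafka.utils.KafkaScheduler

// sketch only: a periodic background task on KafkaScheduler
val scheduler = new KafkaScheduler(threads = 10)
scheduler.startup()

scheduler.schedule("my-background-task",      // name used for logging / the worker thread
  () => println("periodic work runs here"),   // the function to run
  delay = 30 * 1000L,                         // initial delay, ms
  period = 5 * 60 * 1000L,                    // repeat every 5 minutes afterwards
  unit = TimeUnit.MILLISECONDS)

LogManager.startupWithConfigOverrides (shown in the next section) registers the kafka-log-retention task in exactly this way.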
2. LogManager (this is the log abstraction layer; the actual cleanup logic is not here)
object LogManager {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000

  def apply(config: KafkaConfig,
            initialOfflineDirs: Seq[String],
            configRepository: ConfigRepository,
            kafkaScheduler: KafkaScheduler,
            time: Time,
            brokerTopicStats: BrokerTopicStats,
            logDirFailureChannel: LogDirFailureChannel,
            keepPartitionMetadataFile: Boolean): LogManager = {
    val defaultProps = LogConfig.extractLogConfigMap(config)
    LogConfig.validateValues(defaultProps)
    val defaultLogConfig = LogConfig(defaultProps)

    val cleanerConfig = LogCleaner.cleanerConfig(config)

    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
      configRepository = configRepository,
      initialDefaultConfig = defaultLogConfig,
      cleanerConfig = cleanerConfig,
      recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
      flushCheckMs = config.logFlushSchedulerIntervalMs,
      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
      retentionCheckMs = config.logCleanupIntervalMs,
      maxPidExpirationMs = config.transactionalIdExpirationMs,
      scheduler = kafkaScheduler,
      brokerTopicStats = brokerTopicStats,
      logDirFailureChannel = logDirFailureChannel,
      time = time,
      keepPartitionMetadataFile = keepPartitionMetadataFile,
      interBrokerProtocolVersion = config.interBrokerProtocolVersion)
  }
}
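For orientation, the parameters that drive the scheduled tasks map to broker configs roughly as follows: retentionCheckMs comes from log.retention.check.interval.ms (how often the retention check runs, 5 minutes by default), flushCheckMs from log.flush.scheduler.interval.ms, flushRecoveryOffsetCheckpointMs from log.flush.offset.checkpoint.interval.ms, and flushStartOffsetCheckpointMs from log.flush.start.offset.checkpoint.interval.ms.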
The new LogManager call above constructs the class shown below.
(1) Registering log cleanup as a scheduled task
@threadsafe
class LogManager(logDirs: Seq[File],
                 initialOfflineDirs: Seq[File],
                 configRepository: ConfigRepository,
                 val initialDefaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 recoveryThreadsPerDataDir: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 interBrokerProtocolVersion: ApiVersion,
                 scheduler: Scheduler,
                 brokerTopicStats: BrokerTopicStats,
                 logDirFailureChannel: LogDirFailureChannel,
                 time: Time,
                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {

  import LogManager._

  def startup(topicNames: Set[String]): Unit = {
    val defaultConfig = currentDefaultConfig
    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
  }

  private[log] def startupWithConfigOverrides(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = {
    loadLogs(defaultConfig, topicConfigOverrides)

    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      // periodically delete segments that breach retention.ms / retention.bytes
      scheduler.schedule("kafka-log-retention",
        cleanupLogs _,
        delay = InitialTaskDelayMs,
        period = retentionCheckMs,
        TimeUnit.MILLISECONDS)
    }

    if (cleanerConfig.enableCleaner) {
      // log compaction for topics with cleanup.policy=compact
      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
      _cleaner.startup()
    }
  }
}
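Two things are worth noting here. The retention task runs for the first time after InitialTaskDelayMs (30 seconds in the LogManager versions I have read) and then every retentionCheckMs, i.e. log.retention.check.interval.ms (5 minutes by default). The LogCleaner started at the bottom is a separate mechanism: it compacts topics whose cleanup.policy includes compact, while the kafka-log-retention task only deletes whole segments.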
3. Cleaning up logs that meet the deletion criteria
def cleanupLogs(): Unit = {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds

  // deletableLogs (definition omitted in this excerpt): the current logs eligible for
  // retention-based deletion; when the cleaner is enabled, their cleaning is paused first
  // and resumed in the finally block below
  try {
    deletableLogs.foreach {
      case (topicPartition, log) =>
        debug(s"Garbage collecting '${log.name}'")
        total += log.deleteOldSegments()

        val futureLog = futureLogs.get(topicPartition)
        if (futureLog != null) {
          // also clean the future replica log that is being moved to another log dir
          debug(s"Garbage collecting future log '${futureLog.name}'")
          total += futureLog.deleteOldSegments()
        }
    }
  } finally {
    if (cleaner != null) {
      cleaner.resumeCleaning(deletableLogs.map(_._1))
    }
  }

  debug(s"Log cleanup completed. $total files deleted in " +
    (time.milliseconds - startMs) / 1000 + " seconds")
}
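Note the try/finally structure: whatever was paused for the cleaner before the loop is resumed afterwards even if deleteOldSegments throws, so a failure on one partition does not leave cleaning paused for the others.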
The log.deleteOldSegments() call above lands in deleteOldSegments in Log.scala:
def deleteOldSegments(): Int = {
  if (config.delete) {
    // cleanup.policy includes "delete": apply all three retention rules
    deleteLogStartOffsetBreachedSegments() +
      deleteRetentionSizeBreachedSegments() +
      deleteRetentionMsBreachedSegments()
  } else {
    // compact-only topics: only segments entirely below the log start offset are deleted
    deleteLogStartOffsetBreachedSegments()
  }
}
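// Note: config.retentionMs / config.retentionSize come from the topic configs retention.ms and
// retention.bytes (broker defaults log.retention.hours = 168 and log.retention.bytes = -1, i.e. no
// size limit). The log start offset check is independent of retention: the start offset can move
// forward, for example through a DeleteRecords request, and segments fully below it become deletable.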
private def deleteRetentionMsBreachedSegments(): Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds

  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
    startMs - segment.largestTimestamp > config.retentionMs
  }

  deleteOldSegments(shouldDelete, RetentionMsBreach)
}
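// Note: segment.largestTimestamp is the largest record timestamp in the segment (LogSegment falls
// back to the file's last-modified time when no record timestamp is available), so a segment only
// breaches retention.ms once every record in it is older than the retention window.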
private def deleteRetentionSizeBreachedSegments(): Int = {
  if (config.retentionSize < 0 || size < config.retentionSize) return 0
  var diff = size - config.retentionSize

  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }

  deleteOldSegments(shouldDelete, RetentionSizeBreach)
}
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                              reason: SegmentDeletionReason): Int = {
  lock synchronized {
    val deletable = deletableSegments(predicate)
    if (deletable.nonEmpty)
      deleteSegments(deletable, reason)
    else
      0
  }
}
As the code above shows, if you configure both a retention time and a retention size, both checks are executed on every pass of the scheduled task, and a segment is deleted as soon as either rule (retention.ms or retention.bytes) flags it.
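To make the size-based rule in deleteRetentionSizeBreachedSegments concrete, here is a minimal standalone sketch (not Kafka code; the retention value and segment sizes are invented for illustration):

// standalone illustration: retention.bytes = 1024, four segments totalling 2500 bytes
val retentionSize = 1024L
val segmentSizes  = Seq(600L, 600L, 600L, 700L)   // oldest -> newest
var diff = segmentSizes.sum - retentionSize        // 2500 - 1024 = 1476

// same shape as shouldDelete above: delete oldest-first while a whole segment still fits into the excess
val deletable = segmentSizes.takeWhile { s =>
  if (diff - s >= 0) { diff -= s; true } else false
}
// deletable == Seq(600L, 600L): the two oldest segments are removed, leaving 1300 bytes on disk,
// which is still above retention.bytes until another whole segment's worth of excess builds up

Only whole segments that fit entirely into the excess are deleted, so a log can sit slightly above retention.bytes until enough new data accumulates.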
(1) deletableSegments (collecting the segments to delete)
private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
  if (segments.isEmpty) {
    Seq.empty
  } else {
    val deletable = ArrayBuffer.empty[LogSegment]
    val segmentsIterator = segments.values.iterator
    var segmentOpt = nextOption(segmentsIterator)
    while (segmentOpt.isDefined) {
      val segment = segmentOpt.get
      val nextSegmentOpt = nextOption(segmentsIterator)
      val (upperBoundOffset: Long, isLastSegmentAndEmpty: Boolean) =
        nextSegmentOpt.map {
          nextSegment => (nextSegment.baseOffset, false)
        }.getOrElse {
          (logEndOffset, segment.size == 0)
        }
      if (highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
        deletable += segment
        segmentOpt = nextSegmentOpt
      } else {
        segmentOpt = Option.empty
      }
    }
    deletable
  }
}
Note the isLastSegmentAndEmpty check before a segment joins the deletable set. A segment is the last one only because it has not yet been rolled (typically because it has not reached segment.bytes, 1 GB by default), and isLastSegmentAndEmpty is true when that last segment is also empty; such a segment is never added to the deletable set, so retention never deletes an empty active segment. Two more guards are visible in the condition: highWatermark must have reached the segment's upper-bound offset, so data that is not yet fully replicated is never deleted, and the scan stops at the first segment that fails the check, so only a contiguous prefix of the log can be deleted. A non-empty active segment that breaches retention can still be deleted; in that case deleteSegments first rolls a new empty segment, as shown in (2) below.
For what each segment contains, see the earlier article "Kafka的Log存储原理再析" (the figures there, not reproduced here, show the segment layout). In short, each segment is made up of three kinds of files: .log (the records themselves), .index (the offset index), and .timeindex (the time index), with the offset index mapping relative offsets to physical positions in the .log file.
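As a rough illustration (the topic name and offsets are made up), a partition directory on disk typically looks like this, with each segment's files named after its 20-digit zero-padded base offset:

my-topic-0/
  00000000000000000000.log        # record data for the first segment
  00000000000000000000.index      # offset index
  00000000000000000000.timeindex  # time index
  00000000000005368769.log        # a later segment, base offset 5368769
  00000000000005368769.index
  00000000000005368769.timeindex
  leader-epoch-checkpoint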
(2) deleteSegments (deleting the collected segments)
private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
  maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // if every segment is deletable, roll a new empty segment first so the log always keeps at least one segment
      if (numberOfSegments == numToDelete)
        roll()
      lock synchronized {
        checkIfMemoryMappedBufferClosed()
        // remove the segments from the in-memory map and delete their files (asynchronously)
        removeAndDeleteSegments(deletable, asyncDelete = true, reason)
        maybeIncrementLogStartOffset(segments.firstSegment.get.baseOffset, SegmentDeletion)
      }
    }
    numToDelete
  }
}
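// Note: with asyncDelete = true the files are not removed on the spot; removeAndDeleteSegments first
// renames them with a .deleted suffix and schedules a "delete-file" task that performs the real
// deletion after file.delete.delay.ms (60 seconds by default), yet another scheduled task on this path.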
def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
  maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
    val start = time.hiResClockMs()
    lock synchronized {
      checkIfMemoryMappedBufferClosed()
      val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
      val logFile = Log.logFile(dir, newOffset)

      if (segments.contains(newOffset)) {
        if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
          warn(s"Trying to roll a new log segment with start offset $newOffset " +
            s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
            s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
            s" size of offset index: ${activeSegment.offsetIndex.entries}.")
          removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, LogRoll)
        } else {
          throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
            s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
            s"segment is ${segments.get(newOffset)}.")
        }
      } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
        throw new KafkaException(
          s"Trying to roll a new log segment for topic partition $topicPartition with " +
          s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
      } else {
        val offsetIdxFile = offsetIndexFile(dir, newOffset)
        val timeIdxFile = timeIndexFile(dir, newOffset)
        val txnIdxFile = transactionIndexFile(dir, newOffset)

        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
          warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
          Files.delete(file.toPath)
        }

        segments.lastSegment.foreach(_.onBecomeInactiveSegment())
      }

      producerStateManager.updateMapEndOffset(newOffset)
      producerStateManager.takeSnapshot()

      val segment = LogSegment.open(dir,
        baseOffset = newOffset,
        config,
        time = time,
        initFileSize = config.initFileSize,
        preallocate = config.preallocate)
      addSegment(segment)

      updateLogEndOffset(nextOffsetMetadata.messageOffset)

      scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

      info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")

      segment
    }
  }
}
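In the retention path, roll() is reached when every segment in the log is deletable: a fresh, empty active segment is created at the current log end offset, the old active segment is flushed by the flush-log task scheduled above, and the old segments are then removed asynchronously. Under normal writes a roll is instead triggered when the active segment reaches segment.bytes (1 GB by default) or segment.ms.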