6.kafka的Log存储原理解析
从「kafka源码分析」kafka基础知识介绍 中,我们知道了kafka是如何实现批量发送消息的,但是发送的消息之后到了哪里,怎么存储,怎么读取我们还是一头雾水,这个章节我们从消息发送到Broker开始出发。
1.获取topic的etadata信息,检测topic是否可用 2.key、value序列化 3.获取partition的值(可以重写partition()方法,默认采用hash取余的方式获取) 4.达到batch.size大小,唤起sender线程去发送RecordBatch 5.发送RecordBatch
1.解读阶段
1.Producer发送ProduceRequest请求
i.produceRequest结构
{
"apiKey":0,
"type":"request",
"listeners":["zkBroker","broker"],
"name":"ProduceRequest",
"validVersions":"0-9",
"flexibleVersions":"9+",
"fields":[
{
"name":"TransactionalId",
"type":"string",
"versions":"3+",
"nullableVersions":"3+",
"default":"null",
"entityType":"transactionalId",
"about":"The transactional ID, or null if the producer is not transactional."
},
{
"name":"Acks",
"type":"int16",
"versions":"0+",
"about":"The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR."
},
{
"name":"TimeoutMs",
"type":"int32",
"versions":"0+",
"about":"The timeout to await a response in miliseconds."
},
{
"name":"TopicData",
"type":"[]TopicProduceData",
"versions":"0+",
"about":"Each topic to produce to.",
"fields":[
{
"name":"Name",
"type":"string",
"versions":"0+",
"entityType":"topicName",
"mapKey":true,
"about":"The topic name."
},
{
"name":"PartitionData",
"type":"[]PartitionProduceData",
"versions":"0+",
"about":"Each partition to produce to.",
"fields":[
{
"name":"Index",
"type":"int32",
"versions":"0+",
"about":"The partition index."
},
{
"name":"Records",
"type":"records",
"versions":"0+",
"nullableVersions":"0+",
"about":"The record data to be produced."
}
]
}
]
}
]
}
根据需要发送的内容构建请求格式
ii.构建ProduceRequest

我们可以看到真正要发送的内容是在 TopicData.PartitionData.Records里,这里的处理其实很简单。
2.broker收到Produce请求

我们看一下服务端收到请求后,是怎么处理的
i.处理ProduceRequest请求
def handleProduceRequest(request: RequestChannel.Request): Unit = {
val produceRequest = request.body[ProduceRequest]
val requestSize = request.sizeInBytes
val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC,
produceRequest.data().topicData().asScala)(_.name())
produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
val topicPartition = new TopicPartition(topic.name, partition.index)
val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
if (!authorizedTopics.contains(topicPartition.topic))
unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
try {
ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
authorizedRequestInfo += (topicPartition -> memoryRecords)
} catch {
case e: ApiException =>
invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e))
}
})
@nowarn("cat=deprecation")
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
var errorInResponse = false
mergedResponseStatus.forKeyValue { (topicPartition, status) =>
if (status.error != Errors.NONE) {
errorInResponse = true
debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
request.header.correlationId,
request.header.clientId,
topicPartition,
status.error.exceptionName))
}
}
val timeMs = time.milliseconds()
val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, requestSize, timeMs)
val requestThrottleTimeMs =
if (produceRequest.acks == 0) 0
else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
quotas.produce.throttle(request, bandwidthThrottleTimeMs, requestChannel.sendResponse)
} else {
quotas.request.throttle(request, requestThrottleTimeMs, requestChannel.sendResponse)
}
}
if (produceRequest.acks == 0) {
if (errorInResponse) {
val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
topicPartition -> status.error.exceptionName
}.mkString(", ")
info(
s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
s"from client id ${request.header.clientId} with ack=0\n" +
s"Topic and partition to exceptions: $exceptionsSummary"
)
closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
} else {
sendNoOpResponseExemptThrottle(request)
}
} else {
sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
}
}
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
processingStats.forKeyValue { (tp, info) =>
updateRecordConversionStats(request, tp, info)
}
}
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {
val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
replicaManager.appendRecords(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.Client,
entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback)
produceRequest.clearPartitionRecords()
}
}
以上内容看不懂没有关系,大概内容就是校验
1.是否有topic权限
2.版本apiVersion是否>=3,并且Record是Records子类,magic==2,apiVersion<7 日志压缩方式不能是ZSTD
如果条件满足,这里会调用replicaManager#appendRecords()方法。
ii.追加日志到文件管道
replicaManager.appendRecords(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.Client,
entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback)

现在是不是觉得可以开始进入正题了呢?


minIsr来自配置min.insync.replicas ,如果想要了解kafka最新配置脑图可以查看kafka配置脑图章节
这里有一点需要注意的是:当Producer将ack设置为-1,min.insync.replicas指定了被认为写入成功的最小副本数时,min.insync.replicas和acks允许最小值的acks表示成功,这可以提高效率以及可靠性
下面开始append到segment文件中



这里我们看到了kafka写入到主存利用的是NIO的channel(FileChannelImpl)把ByteBuffer写入到内存中
iii.管道内容刷盘


此时,所有channel内的内容都刷到磁盘里了,这时才是真正的持久化到磁盘。
这里有一点要注意,log的flush是利用FileChannelImpl实现的,index相关的是利用mmap(内存映射)实现的

2.原理阶段
kafka的Message是以topic为基本单位组织的,不同的topic之间是相互独立的,每个topic分成多个partiton,每个peitition利用副本机制实现高可用,每个partition存储一部分Message,每个partition的底层存储是多个segment,如下图所示:

1.segment结构
每个partition都会有多个segment结构,每个segment都包含log,index,timeindex三个文件

- .log存储消息
- .index存储消息的索引
- .timeIndex,时间索引文件,通过时间戳做索引
每一个topic都会有这三个文件,1958的意思是offset从1958开始。这样设计查找会很方便根据log文件的内容
i.log文件结构

ii.查看log内容
我们可以通过指令kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-log/#{topic}/000000000000000000.log --print-data-log

iii.查看index内容

2.如何读取到某个message日志

1.index文件里存储的是offset的区间范围,首先根据offset找到确定索引文件
2.索引文件中根据相对的offset找到对应的最近的索引记录,这个记录中有position(message的物理地址)
3.找到offset最近的log文件找到position与步骤2相同的position,顺序的往下扫描文件,找到offset=xxx的消息内容
注:索引文件是采用稀疏索引的方式,即加快了消息查找的速度,也顾及了存储的开销。
|