The KafkaApis module implements the concrete handling logic for each kind of request in Kafka. This article walks through how KafkaApis processes a ProducerRequest.
Such a request arrives either when a producer sends messages to be persisted in the Kafka cluster, or when a high-level consumer commits offsets to the cluster; the latter can be viewed as a special kind of message whose payload is the offsets of a particular topic. When a Broker Server receives a ProducerRequest it does two things: 1) persist the messages, and 2) assemble the response. The detailed implementation is as follows:
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  val (produceRequest, offsetCommitRequestOpt) =
    // If this was sent by a high-level consumer, wrap it into a ProducerRequest
    if (request.requestId == RequestKeys.OffsetCommitKey) {
      val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
      OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest)
      (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
    } else {
      (request.requestObj.asInstanceOf[ProducerRequest], None)
    }

  // Validate requiredAcks; the only valid ack values are -1, 0 and 1
  if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) {
    warn(("Client %s from %s sent a produce request with request.required.acks of %d, which is now deprecated and will " +
      "be removed in next release. Valid values are -1, 0 or 1. Please consult Kafka documentation for supported " +
      "and recommended configuration.").format(produceRequest.clientId, request.remoteAddress, produceRequest.requiredAcks))
  }

  // Persist the messages to the local log
  val sTime = SystemTime.milliseconds
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)

  // Pick out the first error code, if any
  val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
  // Count the partitions that failed
  val numPartitionsInError = localProduceResults.count(_.error.isDefined)

  if (produceRequest.requiredAcks == 0) {
    if (numPartitionsInError != 0) {
      // With ack=0, if persisting the messages failed, proactively close the connection
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request)
    } else {
      if (firstErrorCode == ErrorMapping.NoError)
        // No errors: store the offsets committed by the high-level consumer in the offsetManager's memory
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
      if (offsetCommitRequestOpt.isDefined) {
        // For a high-level consumer, the persistence result is returned even though ack=0
        val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
      } else
        /*
         * ack=0 means the client does not care about the outcome on the server side,
         * only that the server received the request, so no detailed result is returned
         */
        requestChannel.noOperation(request.processor, request)
    }
  } else if (produceRequest.requiredAcks == 1 ||
             produceRequest.numPartitions <= 0 ||
             numPartitionsInError == produceRequest.numPartitions) {
    // ack=1, or no valid target partitions, or every partition failed: return the result right away
    if (firstErrorCode == ErrorMapping.NoError) {
      // No errors: store the offsets committed by the high-level consumer in the offsetManager's memory
      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
    }
    // Return the per-partition result to the client
    val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
      .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
  } else {
    // ack=-1: the response is delayed until enough replicas (per min.insync.replicas) have synced the data
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map(r =>
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
    val delayedRequest = new DelayedProduce(
      producerRequestKeys,
      request,
      produceRequest.ackTimeoutMs.toLong,
      produceRequest,
      statuses,
      offsetCommitRequestOpt)
    // If the request is already satisfiable, respond now; otherwise the response is delayed
    val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
    if (satisfiedByMe)
      producerRequestPurgatory.respond(delayedRequest)
  }

  // we do not need the data anymore
  produceRequest.emptyData()
}
Here produceRequest.requiredAcks, i.e. the ack setting, takes one of three values: 1, 0, or -1.
1) ack=1: the producer treats a push as successful as soon as one partition replica acknowledges the write. One point deserves attention: that replica must be the leader replica; only after the leader replica has written the message successfully does the producer consider the send successful. Note that 1 is also the default value, which is essentially a compromise between throughput and reliability; in production it can be tuned to the actual workload, e.g. if you are chasing maximum throughput you give up some reliability.
2) ack=0: the producer does not care whether the Broker Server persisted the message successfully; it is enough that the Broker received it.
3) ack=-1: the producer treats a push as successful only once every replica of the partition has confirmed the write. A client-side sketch showing how this is configured follows this list.
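On the client side this is the request.required.acks property. Below is a minimal, hedged sketch using the old Scala producer API that this Kafka line ships with; the broker address and topic name are placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object AcksDemo extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "localhost:9092")             // placeholder broker
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  // -1 waits for the full ISR, 1 (default) for the leader only, 0 for no acknowledgement
  props.put("request.required.acks", "-1")

  val producer = new Producer[String, String](new ProducerConfig(props))
  producer.send(new KeyedMessage[String, String]("my-topic", "hello"))
  producer.close()
}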
ack=0 and ack=1 are easy to understand; but when ack=-1, how does Kafka delay the answer to the request? It uses a strategy called Purgatory, whose implementation principle is introduced below.
DelayedProduce is defined as follows:
class DelayedProduce(override val keys: Seq[TopicAndPartition],
                     override val request: RequestChannel.Request,
                     override val delayMs: Long,
                     val produce: ProducerRequest,
                     val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
                     val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
  extends DelayedRequest(keys, request, delayMs) with Logging {
DelayedProduce carries a keys member that indexes the DelayedProduce by the different TopicAndPartitions in the current ProducerRequest.
Now let us look at the definition of RequestPurgatory:
abstract class RequestPurgatory[T <: DelayedRequest](
    brokerId: Int = 0,
    purgeInterval: Int = 1000)
  extends Logging with KafkaMetricsGroup {

  // One Watchers instance is created per TopicAndPartition seen inside a DelayedProduce;
  // each Watchers holds the corresponding DelayedProduce instances
  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))

  /* Expiration thread: if a request is not satisfied within its timeout,
     the response is returned to the client regardless */
  private val expiredRequestReaper = new ExpiredRequestReaper
  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
So for each distinct TopicAndPartition, RequestPurgatory creates a corresponding Watchers, and the requests inside each Watchers hold all the DelayedProduce instances registered under that TopicAndPartition. The sketch below illustrates their relationship.
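A minimal, self-contained sketch of this key-to-watchers mapping (the types here are simplified stand-ins, not the real Kafka classes):

import scala.collection.mutable

object WatchersSketch extends App {
  case class TopicAndPartition(topic: String, partition: Int)
  class Watchers { val requests = mutable.ListBuffer.empty[String] }

  val watchersForKey = mutable.Map.empty[TopicAndPartition, Watchers]

  // Register one delayed request under every key it covers.
  def watch(keys: Seq[TopicAndPartition], delayed: String): Unit =
    keys.foreach { key =>
      watchersForKey.getOrElseUpdate(key, new Watchers).requests += delayed
    }

  // A DelayedProduce spanning two partitions ends up in both Watchers.
  watch(Seq(TopicAndPartition("t", 0), TopicAndPartition("t", 1)), "delayedProduce-1")
  watch(Seq(TopicAndPartition("t", 0)), "delayedProduce-2")

  watchersForKey.foreach { case (k, w) => println(s"$k -> ${w.requests.mkString(", ")}") }
}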
Next, when is the Broker Server triggered to answer a DelayedProduce? The check only runs when the HighWaterMark of the corresponding partition on that Broker Server changes; if the condition is then met the response is returned, otherwise the request keeps waiting until it times out. The satisfaction check is implemented as follows:
def isSatisfied(replicaManager: ReplicaManager) = {
  partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
    // Is this partition still waiting for other Broker Servers to fetch its data?
    if (fetchPartitionStatus.acksPending) {
      // Look up the Partition object
      val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
      // Check whether enough replicas of this partition have reached the required offset
      val (hasEnough, errorCode) = partitionOpt match {
        case Some(partition) =>
          partition.checkEnoughReplicasReachOffset(
            fetchPartitionStatus.requiredOffset,
            produce.requiredAcks)
        case None =>
          (false, ErrorMapping.UnknownTopicOrPartitionCode)
      }
      // On error, record the error code and stop waiting for this partition
      if (errorCode != ErrorMapping.NoError) {
        fetchPartitionStatus.acksPending = false
        fetchPartitionStatus.responseStatus.error = errorCode
      } else if (hasEnough) {
        // Enough replicas have caught up, so clear acksPending
        fetchPartitionStatus.acksPending = false
        fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
      }
    }
  }
  // The DelayedProduce is satisfied only when every one of its keys is satisfied
  val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
  satisfied
}
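As noted above, this check is driven by HighWaterMark movement. The following toy model (hypothetical and heavily simplified, not the real ReplicaManager/RequestPurgatory code) shows the trigger path once a follower fetch advances a partition's HW:

import scala.collection.mutable

object PurgatoryTriggerSketch extends App {
  case class TopicAndPartition(topic: String, partition: Int)

  case class Delayed(id: String, requiredOffset: Long, keys: Set[TopicAndPartition]) {
    def isSatisfied(hw: Map[TopicAndPartition, Long]): Boolean =
      keys.forall(k => hw.getOrElse(k, 0L) >= requiredOffset)
  }

  val highWatermark = mutable.Map.empty[TopicAndPartition, Long]
  val watching = mutable.ListBuffer.empty[Delayed]

  // Called whenever a partition's HighWaterMark advances.
  def onHighWatermarkAdvanced(key: TopicAndPartition, newHw: Long): Unit = {
    highWatermark(key) = newHw
    val (done, waiting) = watching.partition(_.isSatisfied(highWatermark.toMap))
    watching.clear(); watching ++= waiting
    done.foreach(d => println(s"respond(${d.id})")) // send the delayed response now
  }

  watching += Delayed("delayedProduce-1", requiredOffset = 100L, Set(TopicAndPartition("t", 0)))
  onHighWatermarkAdvanced(TopicAndPartition("t", 0), 99L)  // not yet satisfied, keep waiting
  onHighWatermarkAdvanced(TopicAndPartition("t", 0), 100L) // prints respond(delayedProduce-1)
}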
Only when every TopicAndPartition behind the DelayedProduce's keys is satisfied is the request considered ready to answer. The per-TopicAndPartition check is implemented as follows:
def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
  leaderReplicaIfLocal() match {
    case Some(leaderReplica) =>
      // Snapshot the ISR
      val curInSyncReplicas = inSyncReplicas
      // Count the replicas in the ISR whose log end offset has reached the required offset
      val numAcks = curInSyncReplicas.count(r => {
        if (!r.isLocal)
          r.logEndOffset.messageOffset >= requiredOffset
        else
          true /* also count the local (leader) replica */
      })
      // Read min.insync.replicas
      val minIsr = leaderReplica.log.get.config.minInSyncReplicas

      // ack=-1: satisfied once the HighWaterMark has reached the required offset
      if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        // Check whether the current ISR size still meets minIsr
        if (minIsr <= curInSyncReplicas.size) {
          (true, ErrorMapping.NoError)
        } else {
          (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
        }
      } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
        (true, ErrorMapping.NoError)
      } else
        (false, ErrorMapping.NoError)
    case None =>
      (false, ErrorMapping.NotLeaderForPartitionCode)
  }
}
In short, Partition checks whether the leader replica's HighWaterMark has reached the required offset, which means every replica in the ISR has fetched enough data, and whether the ISR size is still at least minIsr; once both hold, the Broker Server can answer the delayed request. A worked sketch of this decision follows.
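As a concrete illustration, here is a toy version of just the ack=-1 branch above (inputs are hypothetical and error codes are reduced to strings):

object AckMinusOneSketch extends App {
  def check(highWatermark: Long, requiredOffset: Long, isrSize: Int, minIsr: Int): (Boolean, String) =
    if (highWatermark >= requiredOffset) {
      if (minIsr <= isrSize) (true, "NoError")
      else (true, "NotEnoughReplicasAfterAppend") // satisfied, but flagged with an error
    } else {
      (false, "NoError") // not satisfied yet: stay in purgatory until satisfied or timed out
    }

  // A 3-replica partition with min.insync.replicas = 2:
  println(check(highWatermark = 99L,  requiredOffset = 100L, isrSize = 3, minIsr = 2)) // (false, NoError)
  println(check(highWatermark = 105L, requiredOffset = 100L, isrSize = 3, minIsr = 2)) // (true, NoError)
  println(check(highWatermark = 105L, requiredOffset = 100L, isrSize = 1, minIsr = 2)) // (true, NotEnoughReplicasAfterAppend)
}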