The core sending method of Kafka's Sender thread is sendProducerData.
sendProducerData: reading the metadata cache
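Before sending, the Sender must know the destination broker for each message and which messages in the RecordAccumulator are ready to go. This takes two steps:
step1: fetch the locally cached metadata; step2: call accumulator.ready to find out which queues can be sent.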
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
sendProducerData: getting readiness information from accumulator.ready
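readyNodes stores the nodes that batches can be sent to.
unknownLeaderTopics stores the topics whose partition leader is unknown.
exhausted indicates whether any thread is queued in the memory pool (BufferPool) waiting for memory. The waiting queue holds Condition objects, as covered in the memory pool article. If memory has run out, this is flagged so that batches are sent right away and their memory freed.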
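To make the exhausted flag concrete, here is a minimal sketch of a memory pool that tracks blocked threads with a Deque<Condition>. It assumes a far simpler design than Kafka's BufferPool (a plain byte budget, no pooled ByteBuffers, no timeouts); MiniBufferPool and its methods are hypothetical, and only queued() plays the role of free.queued() in ready().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, heavily simplified memory pool: a byte budget plus the
// Condition objects of threads that are blocked waiting for memory.
class MiniBufferPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available;

    MiniBufferPool(long totalMemory) {
        this.available = totalMemory;
    }

    // Blocks until `size` bytes are available, then takes them.
    void allocate(long size) throws InterruptedException {
        lock.lock();
        try {
            if (available < size) {
                Condition moreMemory = lock.newCondition();
                waiters.addLast(moreMemory); // now visible through queued()
                try {
                    while (available < size) {
                        moreMemory.await(); // releases the lock while waiting
                    }
                } finally {
                    waiters.remove(moreMemory);
                }
            }
            available -= size;
            // Leftover memory: let the next waiter try as well.
            if (available > 0 && !waiters.isEmpty()) {
                waiters.peekFirst().signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // Returns memory and wakes the longest-waiting thread, if any.
    void deallocate(long size) {
        lock.lock();
        try {
            available += size;
            Condition first = waiters.peekFirst();
            if (first != null) {
                first.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // Number of threads currently blocked in allocate().
    int queued() {
        lock.lock();
        try {
            return waiters.size();
        } finally {
            lock.unlock();
        }
    }
}
```

With such a pool, ready() would compute boolean exhausted = pool.queued() > 0: a nonzero count means some producer thread is stuck in allocate(), so batches should be sent (and their memory released) without waiting for linger.ms.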
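Iterate over the per-partition deques (dq) in batches, and look up each partition's leader in the metadata: Node leader = cluster.leaderFor(part);
Inspecting a deque's contents requires locking it with synchronized (deque), because producer threads may be appending to it at the same time.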
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();
    // true if any thread is blocked waiting for memory in the BufferPool
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            // per-deque checks, shown in the next snippet
        }
    }
    // ... building and returning the ReadyCheckResult is omitted here
}
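Now the checks themselves. If the leader is null (and the deque is not empty), the client has to fetch fresh metadata from the server, so the partition's topic is added to unknownLeaderTopics.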
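After looking at the first batch in the deque (deque.peekFirst(), which does not remove it):
backingOff relates to retries. It is true when the batch has already been attempted and is still inside its retry backoff window; in other words, it cannot be sent yet: boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
expired: for a batch that is not backing off, it checks whether the batch has waited at least lingerMs (linger.ms, how long a batch waits after it starts receiving records; if nonzero, the producer waits a little longer so that more records can go out together). For a retried batch, it checks whether the wait has reached retryBackoffMs. waitedTimeMs counts from the batch's creation, or from its last attempt if it has been retried.
full: true if the first batch in the deque is full, or if the deque holds more than one batch (a newer batch already exists behind the first one).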
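A batch is sendable if any of the following holds: full, expired, exhausted, closed (the producer is being closed), or flushInProgress(). flushInProgress() is true while a KafkaProducer.flush() call is waiting for outstanding batches to complete, so a flush makes every batch sendable immediately.
If the batch is sendable and not backing off, its leader node is added to readyNodes.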
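The else branch covers a batch that is not sendable yet or is still backing off. timeLeftMs is how long until it could become sendable, and the minimum timeLeftMs across all such deques becomes nextReadyCheckDelayMs, the delay before the next readiness check.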
if (leader == null && !deque.isEmpty()) {
    // leader unknown: metadata must be refreshed for this topic
    unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
    ProducerBatch batch = deque.peekFirst(); // oldest batch, not removed
    if (batch != null) {
        long waitedTimeMs = batch.waitedTimeMs(nowMs);
        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean full = deque.size() > 1 || batch.isFull();
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress();
        if (sendable && !backingOff) {
            readyNodes.add(leader);
        } else {
            // not ready yet: remember the earliest time any batch could become ready
            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
        }
    }
}
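The per-batch decision in the snippet above can be isolated into a pure function, which makes the interplay of the flags easy to test. ReadyCheck is a hypothetical helper, not a Kafka class: values that Kafka reads from the batch, the deque and the accumulator are passed in explicitly, and -1 stands for "ready now".

```java
// Hypothetical stand-in for the per-deque checks in ready().
// All times are in milliseconds.
final class ReadyCheck {
    private ReadyCheck() {}

    // Returns -1 if the batch can be sent now, otherwise the time left
    // before it could become sendable.
    static long check(long waitedTimeMs, int attempts, int dequeSize, boolean batchIsFull,
                      long lingerMs, long retryBackoffMs,
                      boolean exhausted, boolean closed, boolean flushInProgress) {
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean full = dequeSize > 1 || batchIsFull;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress;
        if (sendable && !backingOff) {
            return -1;
        }
        return Math.max(timeToWaitMs - waitedTimeMs, 0);
    }

    // Combines per-partition results the way ready() does: the smallest
    // remaining wait becomes nextReadyCheckDelayMs.
    static long nextReadyCheckDelayMs(long... results) {
        long next = Long.MAX_VALUE;
        for (long r : results) {
            if (r >= 0) {
                next = Math.min(next, r);
            }
        }
        return next;
    }
}
```

Note that backingOff wins over everything else: even exhausted, closed or a flush in progress cannot make a batch inside its retry backoff window sendable.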
sendProducerData: handling unknown leaders
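The result returned by accumulator.ready carries unknownLeaderTopics. If it is non-empty, each of those topics is added to metadata.
Recall from the earlier article: after metadata.add, metadata.requestUpdate() sets the update-requested flag to true. This method only flips the flag; it does not update the metadata itself.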
if (!result.unknownLeaderTopics.isEmpty()) {
    for (String topic : result.unknownLeaderTopics)
        this.metadata.add(topic);
    log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
    this.metadata.requestUpdate();
}
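The "flag only" behavior of requestUpdate() can be illustrated with a hypothetical miniature metadata class. MiniMetadata and completeUpdate() are illustrative names, not Kafka's Metadata API; completeUpdate() stands in for the later step that actually fetches fresh metadata (in Kafka, that happens on the NetworkClient poll path).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical miniature of producer-side metadata: requestUpdate() only
// flips a flag, and a separate step performs the actual refresh.
class MiniMetadata {
    private final Set<String> topics = new HashSet<>();
    private boolean needUpdate = false;

    // Registers a topic so that the next metadata request will include it.
    synchronized void add(String topic) {
        topics.add(topic);
    }

    // Marks metadata as stale; no network I/O happens here.
    synchronized void requestUpdate() {
        needUpdate = true;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    // Stand-in for the later step that fetches metadata from a broker.
    synchronized void completeUpdate() {
        needUpdate = false;
    }

    synchronized Set<String> topics() {
        return new HashSet<>(topics);
    }
}
```

The point of the split is that sendProducerData never blocks on a metadata round trip: it only records what is missing, and the refresh is handled later by the I/O loop.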
sendProducerData: checking the network readiness of leaders
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}
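client.ready keeps only the nodes that can accept a request right now (canSendRequest), and starts connecting to a node that has no connection yet. Nodes that are not ready are removed from readyNodes, and notReadyTimeout records the shortest pollDelayMs among them. The article on networking will go into detail.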
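The filtering loop above uses a common Java pattern: remove elements through the Iterator while iterating, and track the minimum delay of the removed ones. Below is a sketch with a hypothetical NodeStatus type standing in for the connection state that NetworkClient keeps; canSend and pollDelayMs are assumptions that mirror canSendRequest and pollDelayMs, and node ids are plain integers.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "drop nodes that are not ready" loop.
class ReadyNodeFilter {
    static final class NodeStatus {
        final boolean canSend;  // connected and able to take another request
        final long pollDelayMs; // how long until it is worth checking again

        NodeStatus(boolean canSend, long pollDelayMs) {
            this.canSend = canSend;
            this.pollDelayMs = pollDelayMs;
        }
    }

    // Unknown nodes are treated as not connected, to be retried immediately.
    private static final NodeStatus NOT_CONNECTED = new NodeStatus(false, 0);

    // Removes not-ready nodes from readyNodes in place; returns notReadyTimeout.
    static long filter(Set<Integer> readyNodes, Map<Integer, NodeStatus> status) {
        long notReadyTimeout = Long.MAX_VALUE;
        Iterator<Integer> iter = readyNodes.iterator();
        while (iter.hasNext()) {
            Integer node = iter.next();
            NodeStatus s = status.getOrDefault(node, NOT_CONNECTED);
            if (!s.canSend) {
                iter.remove(); // safe removal during iteration
                notReadyTimeout = Math.min(notReadyTimeout, s.pollDelayMs);
            }
        }
        return notReadyTimeout;
    }
}
```

Calling readyNodes.remove(node) inside a for-each loop instead would throw ConcurrentModificationException on HashSet, which is why the original code goes through iter.remove().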
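sendProducerData: building network requests
First, recordAccumulator.drain collects the sendable batches and groups them by leader node, so that batches bound for the same leader go out together. The result maps each node id to a list of batches whose combined size is limited by maxRequestSize.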
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
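A rough sketch of the grouping drain performs, under simplifying assumptions: batches are plain objects with a partition name and a size, leaders come from a map instead of Cluster, and Kafka's fairness rotation (drainIndex), backoff and muting checks, and deque locking are left out. MiniDrain is hypothetical; it takes at most one batch per partition per call and stops adding to a node once maxRequestSize would be exceeded, while always allowing that node's first batch.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified drain: groups batches by leader node id.
class MiniDrain {
    static final class Batch {
        final String partition;
        final int sizeBytes;

        Batch(String partition, int sizeBytes) {
            this.partition = partition;
            this.sizeBytes = sizeBytes;
        }
    }

    static Map<Integer, List<Batch>> drain(Map<String, Deque<Batch>> deques,
                                           Map<String, Integer> leaders,
                                           Set<Integer> readyNodes,
                                           int maxRequestSize) {
        Map<Integer, List<Batch>> result = new HashMap<>();
        for (Integer node : readyNodes) {
            List<Batch> ready = new ArrayList<>();
            int size = 0;
            for (Map.Entry<String, Deque<Batch>> e : deques.entrySet()) {
                if (!node.equals(leaders.get(e.getKey()))) {
                    continue; // partition is led by another node
                }
                Batch first = e.getValue().peekFirst();
                if (first == null) {
                    continue;
                }
                // Respect the size cap, but never leave a node with nothing.
                if (size + first.sizeBytes > maxRequestSize && !ready.isEmpty()) {
                    break;
                }
                ready.add(e.getValue().pollFirst()); // one batch per partition
                size += first.sizeBytes;
            }
            result.put(node, ready);
        }
        return result;
    }
}
```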
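sendProducerData then calls addToInflightBatches to register these batches as in-flight.
The addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) overload receives the map keyed by leader node id and passes each list to the private overload below. Note that inFlightBatches itself is keyed by batch.topicPartition, so each partition (not each leader) gets its own inflightBatchList.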
private void addToInflightBatches(List<ProducerBatch> batches) {
    for (ProducerBatch batch : batches) {
        List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
        if (inflightBatchList == null) {
            inflightBatchList = new ArrayList<>();
            inFlightBatches.put(batch.topicPartition, inflightBatchList);
        }
        inflightBatchList.add(batch);
    }
}
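The two overloads can be sketched as follows. The Map overload (taking drain()'s node id to batches result) simply flattens the lists, and the List overload uses Map.computeIfAbsent, which behaves like the get, null check and put sequence in the original loop. InflightTracker and its Batch type are hypothetical, with partitions modeled as strings instead of TopicPartition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two addToInflightBatches overloads.
class InflightTracker {
    static final class Batch {
        final String topicPartition;

        Batch(String topicPartition) {
            this.topicPartition = topicPartition;
        }
    }

    // Keyed by partition, not by leader node.
    final Map<String, List<Batch>> inFlightBatches = new HashMap<>();

    // Accepts the drain() result (node id -> batches) and flattens it.
    void addToInflightBatches(Map<Integer, List<Batch>> batchesByNode) {
        for (List<Batch> batchList : batchesByNode.values()) {
            addToInflightBatches(batchList);
        }
    }

    // Creates the per-partition list on first use, then appends the batch.
    private void addToInflightBatches(List<Batch> batches) {
        for (Batch batch : batches) {
            inFlightBatches.computeIfAbsent(batch.topicPartition, k -> new ArrayList<>()).add(batch);
        }
    }
}
```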
Finally, sendProduceRequests builds one produce request per node, and client.send(clientRequest, now) sends it out.