IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka源码解析(5)消息发送 -> 正文阅读

[大数据]kafka源码解析(5)消息发送

Kafka的sender线程核心发送方法就是sendProducerData

sendProducerData读源数据缓存

发送前需要获取消息的目的地broker节点和RecordAccumulator中要发送的消息,分为两步

step1 先拉取本地缓存的元信息
step2 accumulator.ready查看哪个队列可以发送

		Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

sendProducerData调用accumulator.ready方法得到信息

readyNodes储存可以发送的节点

unknownLeaderTopics 存储未知leader的topic

exhausted 标记了线程池的内存不足等待队列是否有东西,里面是condition,内存池那篇讲过。内存都不够用了,赶紧标记起来后面优先安排。

遍历batches中的队列dq,从元数据找leader
Node leader = cluster.leaderFor(part);

要查看dq里面的东西做判断了,锁上

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<ProducerBatch> deque = entry.getValue();

            Node leader = cluster.leaderFor(part);
            synchronized (deque) {
                //做判断,下面展开
            }
        }

判断内容,leader是空,那得拉取服务端元数据看看了,unknownLeaderTopics加进去相应topics

dq拿出第一个batch后:

backingOff是重试相关,下面是true代表还在重试时间中,换句话说,还不能发送
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;

expired判断没重试dq等待时间超过lingerMs(设置的dq接受信息后等多久,不为0的话多等一会,可能有更多内容一起走)
expired判断重试的dq等待是否超过预设

full判断dq的第一个batch是否满了,或者是dq中有新的batch

如何判断是否可发送呢,full,expired,exhausted,生产者 关了,flushInProgress(这块没明白)

可以发送的leader节点加入readyNodes

else分支中,batch是空,那就取所有batch为空dq的最小timeLeftMs,作为下一次查看是否准备好的时间

				if (leader == null && !deque.isEmpty()) {
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                    ProducerBatch batch = deque.peekFirst();
                    if (batch != null) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }

sendProducerData处理无信息的leader

accumulator.ready返回的信息封装了unknownLeaderTopics,如果里面确实有东西,就加到metadata里面

还记得之前的文章吗,metadata add后会把请求更新的标志设为true,metadata.requestUpdate(),这个方法更改了标志位但是并没更新

if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

sendProducerData检查leader网络情况

		Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

client.ready方法筛选canSendRequest的node,后面关于网络的文章会展开

sendProducerData制作网络请求

首先让recordAccumulator 调用drain一下,把发往相同leader的batch放一起

		Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, 				this.maxRequestSize, now);

sendProducerData 使用addToInflightBatches方法将batches放入在途批次

addToInflightBatches(Map<Integer, List> batches) 方法,batches 是map,k是leader,因此每个leader都有个自己的inflightBatchList

private void addToInflightBatches(List<ProducerBatch> batches) {
        for (ProducerBatch batch : batches) {
            List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
            if (inflightBatchList == null) {
                inflightBatchList = new ArrayList<>();
                inFlightBatches.put(batch.topicPartition, inflightBatchList);
            }
            inflightBatchList.add(batch);
        }
    }

最后是client.send(clientRequest, now)发送出去

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-20 15:51:25  更:2021-09-20 15:53:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 11:45:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码