IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 图解kafka消息入批次RecordAccumulate过程 -> 正文阅读

[大数据]图解kafka消息入批次RecordAccumulate过程

如果有哪里描述的有误,敬请指出。谢谢~

前言

kafka消息从producer发送出去时并不是一条一条发送的,而是先发送到一个消息批次(RecordAccumulate)中,然后由sender线程异步的将消息批次中的消息发到broker。这也是kafka吞吐量高的主要原因之一。
那么问题来了,一条消息是怎么被发送到批次中的呢?

正文

RecordAccumulate结构

我们先来看数据结构
在这里插入图片描述
本次不涉及到的字段先忽略,消息批次中包含一个内存池和队列,这个队列就是实际存储消息的地方。我们可以通过下面这张图看一下消息批次的抽象结构。
在这里插入图片描述

  • 相同topic+partition的消息只会被发到相同的Deque中
  • 每个Deque包含多个ProducerBatch,每个ProducerBatch可以存储多条消息
  • 消息是按ProducerBatch的维度发送给broker的,并且是对头的ProducerBatch先出队
  • 内存池默认为32M,每个内存块默认16kb

流程图

下图描述了一条消息被放入批次的整个过程。
在这里插入图片描述

  1. 要把消息放入ProducerBatch,首先得找到对应的deque,如果没有就新建一个,对应代码如下:
    在这里插入图片描述
  2. 拿到deque后就判断队尾的ProducerBatch是否有剩余空间写入这条消息。如果是第一次进来,deque是空的,那么肯定是不满足的。对应代码如下:
    在这里插入图片描述
  3. 走到这里说明deque中没有可用的ProderBatch,此时需要新建。新建ProderBatch第一步就是向内存池申请内存
    3.1 如果申请内存的大小和内存块大小相等并且有空闲内存块则直接返回一个空闲内存块,对应代码如下:
    在这里插入图片描述
    3.2 如果内存池有足够的可用内存,会直接取堆上申请内存,对应代码如下:
    在这里插入图片描述
    3.3 此时内存池可用内存不够,会阻塞等待占用的内存被释放,对应代码如图:
    在这里插入图片描述
  4. 将申请到的内存和新建的ProducerBatch绑定并将消息写入,对应代码如下:
    在这里插入图片描述
  5. 此时消息已写入ProducerBatch,只需要将其入到deque即可,对应代码如下:
    在这里插入图片描述

多说一句

CopyOnWriteMap

通过上面内容我们可以知道,一条消息首先通过topic+partition找到对应的Deque,然后追加到这个Deque队尾的ProducerBatch中。kafka用map存储了topic+partition -> Deque (ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches)的映射逻辑,而在生产环境中肯定是多线程取发消息的,所以这个map必须要保证线程安全。
在这里插入图片描述

kafka自定义了CopyOnWriteMap来实现了ConcurrentMap,重写了put方法。

//基于读写分离,适合读多写少的场景,不保证实时性,只保证最终一致性
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    //map是volatile的,具有线程可见性
    private volatile Map<K, V> map;

    @Override
    //不加锁保证高并发
    public V get(Object k) {
        return map.get(k);
    }
   
    @Override
    //加锁保证线程安全
    public synchronized V put(K k, V v) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.put(k, v);
        //map是volatile的,具有线程可见性
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }
    .......

CopyOnWriteMap采用写时复制的想法,读不需要加锁,适用于读多写少的情况。而kafka只有当某个topic+partition下的第一条消息进行写入时才会写入数据,大部分情况都是读,符合读多写少的情况。

内存池

如果不采用内存池的方法,那么kafka在发消息时会频繁产生内存分配、内存释放,造成频繁GC,从而影响生产者性能。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-15 23:47:47  更:2021-07-15 23:47:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/8 1:07:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码