如果有哪里描述的有误,敬请指出。谢谢~
前言
kafka消息从producer发送出去时并不是一条一条发送的,而是先发送到一个消息批次(RecordAccumulate)中,然后由sender线程异步的将消息批次中的消息发到broker。这也是kafka吞吐量高的主要原因之一。 那么问题来了,一条消息是怎么被发送到批次中的呢?
正文
RecordAccumulate结构
我们先来看数据结构  本次不涉及到的字段先忽略,消息批次中包含一个内存池和队列,这个队列就是实际存储消息的地方。我们可以通过下面这张图看一下消息批次的抽象结构。 
- 相同topic+partition的消息只会被发到相同的Deque中
- 每个Deque包含多个ProducerBatch,每个ProducerBatch可以存储多条消息
- 消息是按ProducerBatch的维度发送给broker的,并且是对头的ProducerBatch先出队
- 内存池默认为32M,每个内存块默认16kb
流程图
下图描述了一条消息被放入批次的整个过程。 
- 要把消息放入ProducerBatch,首先得找到对应的deque,如果没有就新建一个,对应代码如下:
 - 拿到deque后就判断队尾的ProducerBatch是否有剩余空间写入这条消息。如果是第一次进来,deque是空的,那么肯定是不满足的。对应代码如下:
 - 走到这里说明deque中没有可用的ProderBatch,此时需要新建。新建ProderBatch第一步就是向内存池申请内存
3.1 如果申请内存的大小和内存块大小相等并且有空闲内存块则直接返回一个空闲内存块,对应代码如下:  3.2 如果内存池有足够的可用内存,会直接取堆上申请内存,对应代码如下:  3.3 此时内存池可用内存不够,会阻塞等待占用的内存被释放,对应代码如图:  - 将申请到的内存和新建的ProducerBatch绑定并将消息写入,对应代码如下:
 - 此时消息已写入ProducerBatch,只需要将其入到deque即可,对应代码如下:

多说一句
CopyOnWriteMap
通过上面内容我们可以知道,一条消息首先通过topic+partition找到对应的Deque,然后追加到这个Deque队尾的ProducerBatch中。kafka用map存储了topic+partition -> Deque (ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches) 的映射逻辑,而在生产环境中肯定是多线程取发消息的,所以这个map必须要保证线程安全。 
kafka自定义了CopyOnWriteMap来实现了ConcurrentMap,重写了put方法。
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
private volatile Map<K, V> map;
@Override
public V get(Object k) {
return map.get(k);
}
@Override
public synchronized V put(K k, V v) {
Map<K, V> copy = new HashMap<K, V>(this.map);
V prev = copy.put(k, v);
this.map = Collections.unmodifiableMap(copy);
return prev;
}
.......
CopyOnWriteMap采用写时复制的想法,读不需要加锁,适用于读多写少的情况。而kafka只有当某个topic+partition下的第一条消息进行写入时才会写入数据,大部分情况都是读,符合读多写少的情况。
内存池
如果不采用内存池的方法,那么kafka在发消息时会频繁产生内存分配、内存释放,造成频繁GC,从而影响生产者性能。
|