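Chapter 1 Introduction
This article walks through the Kafka producer source code to show how messages are packaged into batches; the details are in the code comments.
Chapter 2 Detailed Steps
2.1 Validating the message size
Before a message is packaged, its serialized size is validated.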
org.apache.kafka.clients.producer.KafkaProducer#doSend
// TODO Validate the serialized message size
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
        compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize
private void ensureValidRecordSize(int size) {
    // A single message may not exceed max.request.size (default 1 MB)
    if (size > maxRequestSize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
    // A single message may not exceed the total buffer memory, buffer.memory (default 32 MB)
    if (size > totalMemorySize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                " configuration.");
}
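To make the two limits concrete, here is a minimal stand-alone sketch of the same two-step check. It assumes the default values quoted in the comments above (1 MB for max.request.size, 32 MB for buffer.memory); the class RecordSizeCheckSketch and its violatedSetting method are hypothetical names used only for illustration and are not part of the Kafka client.

```java
// Hypothetical stand-alone sketch; it does not use the Kafka client classes.
class RecordSizeCheckSketch {
    // Assumed defaults: max.request.size = 1 MB, buffer.memory = 32 MB.
    static final int MAX_REQUEST_SIZE = 1024 * 1024;
    static final long TOTAL_MEMORY_SIZE = 32L * 1024 * 1024;

    /** Returns null when the size is acceptable, otherwise the name of the violated setting. */
    static String violatedSetting(int serializedSize, int maxRequestSize, long totalMemorySize) {
        // Same order as ensureValidRecordSize: request limit first, then total buffer memory.
        if (serializedSize > maxRequestSize)
            return "max.request.size";
        if (serializedSize > totalMemorySize)
            return "buffer.memory";
        return null;
    }

    public static void main(String[] args) {
        // A 512 KB message passes both checks.
        System.out.println(violatedSetting(512 * 1024, MAX_REQUEST_SIZE, TOTAL_MEMORY_SIZE));
        // A 2 MB message exceeds the default max.request.size.
        System.out.println(violatedSetting(2 * 1024 * 1024, MAX_REQUEST_SIZE, TOTAL_MEMORY_SIZE));
        // If max.request.size is raised to 64 MB, buffer.memory becomes the binding limit.
        System.out.println(violatedSetting(40 * 1024 * 1024, 64 * 1024 * 1024, TOTAL_MEMORY_SIZE));
    }
}
```

The third call illustrates why the second check exists: raising max.request.size alone is not enough if the message cannot fit into the producer's total buffer.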
2.2 Message packaging
2.2.1 Overall packaging flow
org.apache.kafka.clients.producer.KafkaProducer#doSend
// TODO Append the message to the RecordAccumulator. Internally it holds multiple Deque<ProducerBatch> queues,
// TODO each queue groups messages into batches, and a BufferPool manages the memory
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
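Here, accumulator is an object that was already initialized in the constructor.
Next, let's step into the RecordAccumulator to see exactly how messages are packaged.
The packaging happens in the org.apache.kafka.clients.producer.internals.RecordAccumulator#append method, which shows the overall flow: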
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // TODO Using the partitioning result, get the queue (a double-ended queue) the message belongs in; each partition has its own queue
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // TODO Try to append to the queue; appendResult == null if the queue has no batch yet (no memory allocated) or the last batch has no room
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null)
                return appendResult;
        }
        // we don't have an in-progress record batch try to allocate a new batch
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        /**
         * TODO Compute the buffer size to allocate: the larger of the configured batch.size and the estimated size of this message.
         * Note: in production, watch the size of the messages you send. If messages are so large that the configured
         * batch.size can hold only one of them, every message is effectively sent on its own,
         * which defeats the Kafka producer's batching design and causes excessive network I/O.
         */
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
        // TODO Allocate memory according to the computed size
        buffer = free.allocate(size, maxTimeToBlock);
        // Update the current time in case the buffer allocation blocked above.
        nowMs = time.milliseconds();
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // TODO Try to append to the queue again; appendResult == null if no usable batch has been created in the meantime
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            // TODO Wrap the allocated buffer in a new batch
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            // TODO Try to append the message to the batch
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, nowMs));
            // TODO Add the batch to the tail of the queue
            dq.addLast(batch);
            incomplete.add(batch);
            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            // TODO The batch now uses the buffer; set buffer to null so the finally block does not release the memory
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        // buffer != null means no new batch was created with it, so the memory allocated in this call can be released
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
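The warning in the comment above about oversized messages can be illustrated with a little arithmetic. The sketch below assumes a batch.size of 16 KB and deliberately ignores batch header overhead and compression, so the counts are rough; BatchSizingSketch and its methods are hypothetical names, not Kafka APIs.

```java
// Hypothetical sketch of the sizing rule used in append(): size = max(batch.size, estimated message size).
class BatchSizingSketch {
    static final int BATCH_SIZE = 16 * 1024; // assumed batch.size (16 KB)

    /** Size of the buffer requested from the pool for a new batch. */
    static int bufferSize(int batchSize, int estimatedMessageSize) {
        return Math.max(batchSize, estimatedMessageSize);
    }

    /**
     * Rough number of equally sized messages that fit into one batch.
     * The first message always goes in, so the result is at least 1.
     * Ignores batch header overhead and compression.
     */
    static int messagesPerBatch(int batchSize, int estimatedMessageSize) {
        return Math.max(1, batchSize / estimatedMessageSize);
    }

    public static void main(String[] args) {
        // Small messages: many share one 16 KB buffer.
        System.out.println(bufferSize(BATCH_SIZE, 200) + " bytes, ~" + messagesPerBatch(BATCH_SIZE, 200) + " messages per batch");
        // Messages larger than batch.size: each one gets its own, larger buffer.
        System.out.println(bufferSize(BATCH_SIZE, 20_000) + " bytes, ~" + messagesPerBatch(BATCH_SIZE, 20_000) + " message per batch");
    }
}
```

Under these assumptions, a 20,000-byte message always lands in a batch of its own, and its buffer is not the pooled batch.size, which is the situation the comment warns about.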
Next, we break down the key methods in this overall flow.
2.2.2 getOrCreateDeque: getting the queue
org.apache.kafka.clients.producer.internals.RecordAccumulator#getOrCreateDeque
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    // TODO The type of batches, ConcurrentMap<TopicPartition, Deque<ProducerBatch>>, shows that each partition maps to one Deque and that the map is thread-safe
    // TODO Note: this is a high-frequency operation!
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    // TODO If the partition has no queue yet, create a new ArrayDeque
    d = new ArrayDeque<>();
    // TODO Put the new queue into batches and return it
    // TODO Note: this is a low-frequency operation!
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
Here, batches is a Map that implements the ConcurrentMap interface:
// TODO Thread-safe Map: partition -> Deque
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
batches is initialized in the constructor as a CopyOnWriteMap:
/**
 * TODO Thread-safe Map
 * TODO write: takes the synchronized lock and copies the map, performs the write on the copy, then assigns the copy back to the original field (declared volatile)
 * TODO read: no locking, reads the value directly from the map
 * TODO Suited to read-heavy, write-light workloads
 */
this.batches = new CopyOnWriteMap<>();
CopyOnWriteMap is a data structure defined by Kafka itself:
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    private volatile Map<K, V> map;
    //...
}
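The copy-on-write idea described in the comments above can be sketched in a few lines. This is a minimal illustration of the technique, not Kafka's CopyOnWriteMap: the class TinyCopyOnWriteMap is a hypothetical name, and it implements only get and putIfAbsent rather than the full ConcurrentMap interface.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal copy-on-write map; only the two operations used by getOrCreateDeque.
class TinyCopyOnWriteMap<K, V> {
    // Readers always see a fully built, never-modified snapshot through this volatile field.
    private volatile Map<K, V> map = Collections.emptyMap();

    /** Lock-free read: cheap, which suits the high-frequency lookup per message. */
    V get(K key) {
        return map.get(key);
    }

    /** Write path: lock, copy, modify the copy, then publish it by replacing the volatile reference. */
    synchronized V putIfAbsent(K key, V value) {
        V existing = map.get(key);
        if (existing != null)
            return existing;
        Map<K, V> copy = new HashMap<>(map);
        copy.put(key, value);
        map = Collections.unmodifiableMap(copy);
        return null;
    }

    int size() {
        return map.size();
    }

    public static void main(String[] args) {
        TinyCopyOnWriteMap<String, Integer> m = new TinyCopyOnWriteMap<>();
        System.out.println(m.putIfAbsent("topic-0", 1)); // first writer wins
        System.out.println(m.putIfAbsent("topic-0", 2)); // later writers get the existing value back
        System.out.println(m.get("topic-0"));
    }
}
```

The trade-off is that every write copies the whole map, which is acceptable here because a new queue is created only once per partition while lookups happen on every send.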
2.2.3 tryAppend: appending data
org.apache.kafka.clients.producer.internals.RecordAccumulator#tryAppend
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    // TODO Get the ProducerBatch at the tail of the queue
    ProducerBatch last = deque.peekLast();
    // TODO If there is a ProducerBatch, append the message to it
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
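The interplay between tryAppend and the creation of a new batch can be mimicked with a toy model. The sketch below is single-threaded and tracks only byte counts; TinyAccumulatorSketch and TinyBatch are hypothetical stand-ins for RecordAccumulator and ProducerBatch, and the real classes do considerably more (locking, memory pooling, futures).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of one partition's queue; no locking and no real buffers.
class TinyAccumulatorSketch {
    static class TinyBatch {
        final int capacity;
        int used;
        boolean closed;

        TinyBatch(int capacity) {
            this.capacity = capacity;
        }

        /** Returns false when the batch is closed or has no room, like a null future in the real code. */
        boolean tryAppend(int messageSize) {
            if (closed || used + messageSize > capacity)
                return false;
            used += messageSize;
            return true;
        }
    }

    final Deque<TinyBatch> deque = new ArrayDeque<>();
    final int batchSize;

    TinyAccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Appends a message and returns true if a new batch had to be created for it. */
    boolean append(int messageSize) {
        TinyBatch last = deque.peekLast();
        if (last != null) {
            if (last.tryAppend(messageSize))
                return false;
            // No room: close the tail batch so nothing else is appended to it.
            last.closed = true;
        }
        // New batch sized like the real code: max(batch.size, message size).
        TinyBatch batch = new TinyBatch(Math.max(batchSize, messageSize));
        batch.tryAppend(messageSize);
        deque.addLast(batch);
        return true;
    }

    public static void main(String[] args) {
        TinyAccumulatorSketch acc = new TinyAccumulatorSketch(100);
        System.out.println(acc.append(60)); // empty queue, so a batch is created
        System.out.println(acc.append(30)); // fits into the tail batch
        System.out.println(acc.append(20)); // does not fit, so a second batch is created
        System.out.println(acc.deque.size());
    }
}
```

Only the tail batch is ever a candidate for appends, which is why the real method looks at deque.peekLast() and nothing else.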
2.2.4 allocate: allocating memory
org.apache.kafka.clients.producer.internals.BufferPool#allocate
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    // TODO If the requested size exceeds the total memory (default 32 MB)
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");
    ByteBuffer buffer = null;
    this.lock.lock();
    if (this.closed) {
        this.lock.unlock();
        throw new KafkaException("Producer closed while allocating memory");
    }
    try {
        // check if we have a free buffer of the right size pooled
        // TODO If the requested size equals poolableSize (the ProducerBatch size) and the Deque<ByteBuffer> free list is not empty, return a ByteBuffer from the head of the list
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();
        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        // TODO Total pooled free memory = number of buffers in Deque<ByteBuffer> free * the default ProducerBatch size (16 KB)
        int freeListSize = freeSize() * this.poolableSize;
        // TODO Available memory covers the request. Note: total available memory = nonPooledAvailableMemory + freeListSize
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            freeUp(size);
            // TODO Deduct the memory
            this.nonPooledAvailableMemory -= size;
        } else {
            // TODO Total available memory is insufficient
            // we are out of memory and will have to block
            // Amount of memory accumulated so far
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                // TODO Join the waiters queue
                this.waiters.addLast(moreMemory);
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                // TODO Loop, waiting for other threads to release memory
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        // TODO Wait until signaled or timed out; the signal comes when memory is released
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        recordWaitTime(timeNs);
                    }
                    if (this.closed)
                        throw new KafkaException("Producer closed while allocating memory");
                    if (waitingTimeElapsed) {
                        this.metrics.sensor("buffer-exhausted-records").record();
                        throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }
                    remainingTimeToBlockNs -= timeNs;
                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    // TODO If the requested size equals poolableSize and the Deque<ByteBuffer> free list is not empty, take a ByteBuffer from the head of the free list
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // TODO Otherwise, claim whatever memory is available, which may still not be enough
                        // we'll need to allocate memory, but we may only get
                        // part of what we need on this iteration
                        freeUp(size - accumulated);
                        // TODO Compute how much memory was obtained in this iteration (got)
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        // TODO Deduct it to reserve the memory
                        this.nonPooledAvailableMemory -= got;
                        // TODO Add it to the accumulated total; loop and wait again if still short
                        accumulated += got;
                    }
                }
                // Don't reclaim memory on throwable since nothing was thrown
                accumulated = 0;
            } finally {
                // When this loop was not able to successfully terminate don't lose available memory
                this.nonPooledAvailableMemory += accumulated;
                // TODO Remove from the waiters queue
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        // signal any additional waiters if there is more memory left
        // over for them
        try {
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            // Another finally... otherwise find bugs complains
            lock.unlock();
        }
    }
    if (buffer == null)
        // TODO Allocate the ByteBuffer
        return safeAllocateByteBuffer(size);
    else
        return buffer;
}
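The blocking branch of allocate is the hardest part to follow, so here is a reduced sketch of just the waiting pattern: one Condition per waiter, a FIFO waiters queue, a bounded wait, and a signal passed to the next waiter on the way out. It is a simplification under stated assumptions: TinyMemoryGate is a hypothetical class, it tracks only a byte counter, it waits for the whole request instead of accumulating memory piece by piece, and it returns false on timeout or interruption instead of throwing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reduced model of the wait/signal pattern; not Kafka's BufferPool.
class TinyMemoryGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available;

    TinyMemoryGate(long totalBytes) {
        this.available = totalBytes;
    }

    /** Reserves size bytes, waiting up to maxTimeToBlockMs; returns false on timeout or interrupt. */
    boolean acquire(long size, long maxTimeToBlockMs) {
        lock.lock();
        try {
            if (available >= size) {
                available -= size;
                return true;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory); // join the queue of waiting threads
            try {
                long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                while (available < size) {
                    if (remainingNs <= 0)
                        return false; // waited long enough
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                }
                available -= size;
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // simplification: report failure instead of rethrowing
                return false;
            } finally {
                waiters.remove(moreMemory);
            }
        } finally {
            // Pass the signal on if memory is left over for the next waiter.
            if (available > 0 && !waiters.isEmpty())
                waiters.peekFirst().signal();
            lock.unlock();
        }
    }

    /** Returns size bytes and wakes the waiter at the head of the queue, if any. */
    void release(long size) {
        lock.lock();
        try {
            available += size;
            Condition first = waiters.peekFirst();
            if (first != null)
                first.signal();
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try {
            return available;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TinyMemoryGate gate = new TinyMemoryGate(10);
        System.out.println(gate.acquire(10, 100)); // takes everything
        System.out.println(gate.acquire(5, 50));   // nothing left, so this times out
        new Thread(() -> gate.release(10)).start();
        System.out.println(gate.acquire(5, 5000)); // succeeds once the other thread releases
    }
}
```

The release method stands in for memory being handed back by another thread; using one Condition per waiter is what lets the pool wake threads in arrival order rather than all at once.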
2.2.5 deallocate: releasing memory
org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int)
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        // TODO The returned size == poolableSize (16 KB)
        if (size == this.poolableSize && size == buffer.capacity()) {
            // TODO Clear the buffer
            buffer.clear();
            // TODO Put the cleared buffer back on the free list
            this.free.add(buffer);
        } else {
            // TODO Otherwise, add the memory back to nonPooledAvailableMemory
            this.nonPooledAvailableMemory += size;
        }
        // TODO Get the Condition at the head of the waiters queue and signal it
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
public void deallocate(ByteBuffer buffer) {
    deallocate(buffer, buffer.capacity());
}
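Putting allocate and deallocate side by side, the pool's bookkeeping comes down to two quantities: the free list of reusable poolableSize buffers and the counter nonPooledAvailableMemory. The sketch below traces only that accounting. It is a single-threaded simplification with hypothetical names (TinyPoolAccounting, tryAllocate, availableMemory): it returns null where the real pool would block or throw, and it has no lock or waiters.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the pool's accounting; not Kafka's BufferPool.
class TinyPoolAccounting {
    final int poolableSize;
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long nonPooledAvailableMemory;

    TinyPoolAccounting(long totalMemory, int poolableSize) {
        this.nonPooledAvailableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    /** Returns a buffer, or null where the real pool would have to block or reject the request. */
    ByteBuffer tryAllocate(int size) {
        // Fast path: reuse a pooled buffer of exactly poolableSize.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        long freeListSize = (long) free.size() * poolableSize;
        if (nonPooledAvailableMemory + freeListSize < size)
            return null;
        // Dissolve pooled buffers into non-pooled memory until the request is covered.
        while (!free.isEmpty() && nonPooledAvailableMemory < size)
            nonPooledAvailableMemory += free.pollLast().capacity();
        nonPooledAvailableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    /** Buffers of exactly poolableSize go back on the free list; anything else is only counted. */
    void deallocate(ByteBuffer buffer) {
        int size = buffer.capacity();
        if (size == poolableSize) {
            buffer.clear();
            free.add(buffer);
        } else {
            nonPooledAvailableMemory += size;
        }
    }

    long availableMemory() {
        return nonPooledAvailableMemory + (long) free.size() * poolableSize;
    }

    public static void main(String[] args) {
        TinyPoolAccounting pool = new TinyPoolAccounting(64, 16);
        ByteBuffer a = pool.tryAllocate(16);
        pool.deallocate(a);                        // pooled for reuse
        System.out.println(pool.tryAllocate(16) == a); // the same buffer object comes back
    }
}
```

This is also why messages larger than batch.size are costly: their buffers are never pooled, so each one is allocated fresh and later left to the garbage collector, with only the byte count returned to the pool.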
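In summary, after the steps above the message has been appended to a ProducerBatch inside the RecordAccumulator, and the RecordAccumulator.RecordAppendResult returned by append is used in the subsequent send step.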