Every Kafka partition has its own offset; after the consumer finishes processing a message, it commits the offset back to the Kafka broker. The spring-kafka library offers the following AckMode commit modes:
| Mode | Description |
| --- | --- |
| MANUAL | poll() fetches a batch of messages; after the business logic finishes, calling Acknowledgment.acknowledge() first stores the offset in a map cache, and the cached offsets are committed in one batch before the next poll() |
| MANUAL_IMMEDIATE | The offset is committed immediately each time Acknowledgment.acknowledge() is called after the business logic finishes |
| RECORD | Commit after each record has been processed by the listener (ListenerConsumer) |
| BATCH | Commit after each batch returned by poll() has been processed by the listener (ListenerConsumer) |
| TIME | Commit after a poll() batch has been processed and more than TIME has elapsed since the last commit |
| COUNT | Commit after a poll() batch has been processed and at least COUNT records have been processed since the last commit |
| COUNT_TIME | Commit when either the TIME or the COUNT condition is met |
The MANUAL and MANUAL_IMMEDIATE modes require spring.kafka.consumer.enable-auto-commit to be set to false.
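The ack mode itself is configured on the listener container factory. A minimal sketch, assuming Spring Boot with spring-kafka on the classpath (the bean wiring is illustrative; in recent spring-kafka versions the enum lives on ContainerProperties.AckMode):

```java
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // MANUAL: acknowledge() caches the offset; it is committed before the next poll()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
```

With Spring Boot this can also be set through the property spring.kafka.listener.ack-mode=manual.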
The @KafkaListener annotation:

```java
@KafkaListener(
    topics = "test",
    groupId = "testGroup"
)
public void listener(ConsumerRecord<String, String> record, Acknowledgment ack) {
    // business logic
    ack.acknowledge();
}
```
If spring.kafka.consumer.enable-auto-commit=true, the method signature must not include an Acknowledgment ack parameter, otherwise an error is thrown.
```java
@KafkaListener(
    topics = "test",
    groupId = "testGroup"
)
public void listener(ConsumerRecord<String, String> record) {
    // business logic
    // no manual commit needed
}
```
After the application starts, the run method of org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer keeps polling messages from the Kafka broker and consuming them:
```java
while (isRunning()) {
    try {
        if (!this.autoCommit && !this.isRecordAck) {
            processCommits();
        }
        processSeeks();
        ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
        this.lastPoll = System.currentTimeMillis();
        if (records != null && this.logger.isDebugEnabled()) {
            this.logger.debug("Received: " + records.count() + " records");
        }
        if (records != null && records.count() > 0) {
            if (this.containerProperties.getIdleEventInterval() != null) {
                lastReceive = System.currentTimeMillis();
            }
            invokeListener(records);
        }
        // ... (rest of the loop omitted)
```
Here this.consumer.poll() fetches the messages, and invokeListener() executes the message-handling logic:
```java
private void invokeListener(final ConsumerRecords<K, V> records) {
    if (this.isBatchListener) {
        invokeBatchListener(records);
    }
    else {
        invokeRecordListener(records);
    }
}

private void invokeRecordListener(final ConsumerRecords<K, V> records) {
    if (this.transactionTemplate != null) {
        invokeRecordListenerInTx(records);
    }
    else {
        doInvokeWithRecords(records);
    }
}

private void doInvokeWithRecords(final ConsumerRecords<K, V> records) throws Error {
    Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
    while (iterator.hasNext()) {
        final ConsumerRecord<K, V> record = iterator.next();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Processing " + record);
        }
        doInvokeRecordListener(record, null);
    }
}
```
Execution eventually reaches doInvokeRecordListener:
```java
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
        @SuppressWarnings("rawtypes") Producer producer) throws Error {
    try {
        if (this.acknowledgingMessageListener != null) {
            this.acknowledgingMessageListener.onMessage(record,
                    this.isAnyManualAck
                            ? new ConsumerAcknowledgment(record)
                            : null);
        }
        else {
            this.listener.onMessage(record);
        }
        ackCurrent(record, producer);
    }
    // ... (error handling omitted)
```
Ultimately onMessage is called; its second argument is the Acknowledgment, which may be null depending on whether a manual ack mode is configured (isAnyManualAck). After the listener returns, ackCurrent handles the commit:
```java
public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
    if (this.isRecordAck) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        if (producer == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + offsetsToCommit);
            }
            if (this.containerProperties.isSyncCommits()) {
                this.consumer.commitSync(offsetsToCommit);
            }
            else {
                this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
            }
        }
        else {
            this.acks.add(record);
        }
    }
    else if (!this.isAnyManualAck && !this.autoCommit) {
        this.acks.add(record);
    }
    if (producer != null) {
        try {
            sendOffsetsToTransaction(producer);
        }
        catch (Exception e) {
            this.logger.error("Send offsets to transaction failed", e);
        }
    }
}
```
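Note the new OffsetAndMetadata(record.offset() + 1) above: Kafka commits the offset of the next record to read, not of the record just processed. A small plain-Java illustration (the topic-partition key and offsets are made up):

```java
import java.util.Collections;
import java.util.Map;

public class CommitOffsetDemo {
    // Kafka commits the offset of the NEXT record to read, hence offset + 1.
    static Map<String, Long> offsetsToCommit(String topicPartition, long lastProcessedOffset) {
        return Collections.singletonMap(topicPartition, lastProcessedOffset + 1);
    }

    public static void main(String[] args) {
        // After processing the record at offset 41 of partition test-0, the
        // committed offset is 42: where consumption resumes after a restart.
        System.out.println(offsetsToCommit("test-0", 41L));
    }
}
```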
When manual commit is configured (MANUAL or MANUAL_IMMEDIATE), the commit is triggered by calling ack.acknowledge():
```java
@Override
public void acknowledge() {
    Assert.state(ListenerConsumer.this.isAnyManualAck,
            "A manual ackmode is required for an acknowledging listener");
    for (ConsumerRecord<K, V> record : getHighestOffsetRecords(this.records)) {
        processAck(record);
    }
}

private void processAck(ConsumerRecord<K, V> record) {
    if (!Thread.currentThread().equals(this.consumerThread)) {
        try {
            this.acks.put(record);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaException("Interrupted while storing ack", e);
        }
    }
    else {
        if (this.isManualImmediateAck) {
            try {
                ackImmediate(record);
            }
            catch (WakeupException e) {
                // ignore - not polling
            }
        }
        else {
            addOffset(record);
        }
    }
}
```
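processAck first checks whether acknowledge() was called on the consumer thread. KafkaConsumer is not thread-safe, so an ack from another thread cannot commit directly; instead the record is handed over via a blocking queue, and the consumer thread commits it later. A minimal plain-Java sketch of that hand-off (names are illustrative, not spring-kafka's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class CrossThreadAckDemo {
    // Offsets acknowledged off the consumer thread are parked here, mirroring
    // the acks queue in ListenerConsumer.
    static List<Long> ackFromWorkerThread(long offset) throws InterruptedException {
        LinkedBlockingQueue<Long> acks = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> acks.add(offset)); // worker only enqueues
        worker.start();
        worker.join();
        List<Long> pending = new ArrayList<>();
        acks.drainTo(pending); // the consumer thread drains and commits from here
        return pending;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(ackFromWorkerThread(7L));
    }
}
```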
In MANUAL_IMMEDIATE mode the offset is committed right away (ackImmediate); in MANUAL mode the offset to commit is first stored in a map (addOffset) and the method returns.

Looking back at the run method at the top: processCommits() takes the pending offsets out of that map and commits them in one batch. In other words, the commit happens just before the next poll(), after all messages from the previous poll() have been processed.

The difference between them: MANUAL_IMMEDIATE commits after each message is acknowledged, while MANUAL commits the whole batch just before the next fetch. The maximum number of records fetched per poll is controlled by max.poll.records (default 500); an actual poll only reaches that maximum if enough data is available within the fetch size limits.
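The timing difference can be simulated in a few lines of plain Java (the event strings and offsets are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class AckModeTimingDemo {
    // Simulates when commits happen for the two manual modes.
    static List<String> run(boolean immediate) {
        List<String> events = new ArrayList<>();
        List<Integer> pending = new ArrayList<>();
        int[] batch = {1, 2, 3}; // one poll() worth of record offsets
        for (int offset : batch) {
            events.add("process " + offset);
            if (immediate) {
                events.add("commit " + (offset + 1)); // MANUAL_IMMEDIATE: commit per ack
            } else {
                pending.add(offset + 1);              // MANUAL: cache until next poll
            }
        }
        if (!pending.isEmpty()) {
            // one batch commit just before the next poll()
            events.add("commit " + pending.get(pending.size() - 1));
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```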