IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spring-kafka几种AckMode模式介绍,MANUAL_IMMEDIATE和MANUAL区别 -> 正文阅读

[大数据]spring-kafka几种AckMode模式介绍,MANUAL_IMMEDIATE和MANUAL区别

kafka每个partition都有自己的offset,消费端处理完后要向kafka服务器提交offset。
spring-kafka组件有下面几种AckMode提交模式:

模式描述
MANUALpoll()拉取一批消息,在处理完业务手动调用Acknowledgment.acknowledge()方法先放到map缓存,在下一次poll之前从缓存拿出来批量提交
MANUAL_IMMEDIATE每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
RECORD当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
BATCH当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
TIME当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
COUNT当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
COUNT_TIMETIME或COUNT满足其中一个时提交

MANUAL和MANUAL_IMMEDIATE模式要将spring.kafka.consumer.enable-auto-commit设置为false

@KafkaListener注解

@KafkaListener(
            topics = "test",
            groupId = "testGroup"
    )
    public void listener(ConsumerRecord<String, String> record, Acknowledgment ack) {
    // 处理业务
    ack.acknowledge();
    }

如果spring.kafka.consumer.enable-auto-commit=true,方法参数不能包含Acknowledgment ack,否则会报错。

@KafkaListener(
            topics = "test",
            groupId = "testGroup"
    )
    public void listener(ConsumerRecord<String, String> record) {
    // 处理业务
    // 不用手动提交
    }

org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#run
项目启动后这个run方法会不停从kafka服务器broke拉取(poll)消息回来消费

while (isRunning()) {
				try {
					if (!this.autoCommit && !this.isRecordAck) {
						processCommits();
					}
					processSeeks();
					ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
					this.lastPoll = System.currentTimeMillis();

					if (records != null && this.logger.isDebugEnabled()) {
						this.logger.debug("Received: " + records.count() + " records");
					}
					if (records != null && records.count() > 0) {
						if (this.containerProperties.getIdleEventInterval() != null) {
							lastReceive = System.currentTimeMillis();
						}
						invokeListener(records);
					}

其中this.consumer.poll()方法拉取消息,invokeListener方法是执行处理消息逻辑。

private void invokeListener(final ConsumerRecords<K, V> records) {
			if (this.isBatchListener) {
				invokeBatchListener(records);
			}
			else {
				invokeRecordListener(records);
			}
		}
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
			if (this.transactionTemplate != null) {
				innvokeRecordListenerInTx(records);
			}
			else {
				doInvokeWithRecords(records);
			}
		}
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) throws Error {
			Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
			while (iterator.hasNext()) {
				final ConsumerRecord<K, V> record = iterator.next();
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("Processing " + record);
				}
				doInvokeRecordListener(record, null);
			}
		}

最终会走到doInvokeRecordListener方法

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
				@SuppressWarnings("rawtypes") Producer producer) throws Error {
			try {
				if (this.acknowledgingMessageListener != null) {
					this.acknowledgingMessageListener.onMessage(record,
							this.isAnyManualAck
									? new ConsumerAcknowledgment(record)
									: null);
				}
				else {
					this.listener.onMessage(record);
				}
				ackCurrent(record, producer);
			}

最终会调用onMessage方法,其中第二个参数就是Acknowledgment,有可能是null,看设置的auto.commit值决定。

public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
			if (this.isRecordAck) {
				Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
						Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
								new OffsetAndMetadata(record.offset() + 1));
				if (producer == null) {
					if (this.logger.isDebugEnabled()) {
						this.logger.debug("Committing: " + offsetsToCommit);
					}
					if (this.containerProperties.isSyncCommits()) {
						this.consumer.commitSync(offsetsToCommit);
					}
					else {
						this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
					}
				}
				else {
					this.acks.add(record);
				}
			}
			else if (!this.isAnyManualAck && !this.autoCommit) {
				this.acks.add(record);
			}
			if (producer != null) {
				try {
					sendOffsetsToTransaction(producer);
				}
				catch (Exception e) {
					this.logger.error("Send offsets to transaction failed", e);
				}
			}
		}

当我们设置了手动提交,也就是MANUAL或者MANUAL_IMMEDIATE,是通过调用ack.acknowledge()方法

@Override
			public void acknowledge() {
				Assert.state(ListenerConsumer.this.isAnyManualAck,
						"A manual ackmode is required for an acknowledging listener");
				for (ConsumerRecord<K, V> record : getHighestOffsetRecords(this.records)) {
					processAck(record);
				}
			}
private void processAck(ConsumerRecord<K, V> record) {
			if (!Thread.currentThread().equals(this.consumerThread)) {
				try {
					this.acks.put(record);
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new KafkaException("Interrupted while storing ack", e);
				}
			}
			else {
				if (this.isManualImmediateAck) {
					try {
						ackImmediate(record);
					}
					catch (WakeupException e) {
						// ignore - not polling
					}
				}
				else {
					addOffset(record);
				}
			}
		}
private void processAck(ConsumerRecord<K, V> record) {
			if (!Thread.currentThread().equals(this.consumerThread)) {
				try {
					this.acks.put(record);
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new KafkaException("Interrupted while storing ack", e);
				}
			}
			else {
				if (this.isManualImmediateAck) {
					try {
						ackImmediate(record);
					}
					catch (WakeupException e) {
						// ignore - not polling
					}
				}
				else {
					addOffset(record);
				}
			}
		}

MANUAL_IMMEDIATE模式直接提交offset,MANUAL模式则先把要提交的offset放到map中,然后返回。

再看回最上面的run方法,其中processCommits()就是从map中拿出要提交的offset,然后批量提交。也就是在下一次poll之前做了提交,相当于处理完了上一次poll回来的所有消息后,然后再一起提交。

他们区别:MANUAL_IMMEDIATE是消费完一个消息就提交,MANUAL是处理完一批消息,在下一次拉取消息之前批量提交。拉取批量消息可以通过max.poll.record设置最大,默认是500条。前提是消息大小满足最大限制,否则一批也拉取不到最大的500条。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-25 11:45:16  更:2021-07-25 11:46:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/30 16:40:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码