4个分区,1个Consumer | 4个分区,2个Consumer | | | 4个分区,4个Consumer | 4个分区,5个Consumer | | |
?
?1,当多个Consumer订阅同一个主题并属于同一个Consumer Groups时,该组中的每个Consumer 将从该主题中的不同分区中接收消息。
2,1个partition只能被同组的一个consumer消费
3,自动提交消费偏移量(在拉取消息后就提交)
4,手动提交偏移量(注意红色部分)
@KafkaListener(topics = {"xxx"}, groupId = "xxx")
public void vinQueueListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
????????dosomething();
????????// 手动提交offset
????????ack.acknowledge();
}
对应配置:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
auto-commit-interval: 1000
enable-auto-commit: false
group-id: xxx
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
concurrency: 4
ack-mode: MANUAL_IMMEDIATE
5,对于未commit偏移量的消息,消费者会在rebalance(比如重启消费者服务)后重新从上一次? ? ? ? ? 提交的offset+1出开始重新消费。? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?(自己验证的时候发现一个问题:比如,消息1未提交offset,消息2未提交offset,然后消息3提交了offset,消息4未提交offset,那么在重启消费者服务后,只会从消息4继续开始消费,消息1和2就会被丢弃,消费不到)
|