1. 需求
每一个消费者消费消息后,异步调用任务时需要对任务的并发数量进行控制
2. 实现思路
1. 消费者消费消息,发送异步任务
2. 添加线程安全的计数器控制并发数量
3. 消费者A并发阻塞后,需要让A进行"下线", 使得其他消费者空闲的情况下可以消费A对应分区的消息(ps: 每个分区只会对应一个消费者,不下线,其他消费者无法消费该分区会形成消息积压)
3. 代码实现
4. 验证
4.1 创建topic
分区数设置为3??--partitions 3
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 3 --topic msg
4.2 查看Topic
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic msg
4.3 消费者消费分区查看
windows命令(linux跟换.bat为.sh)如下
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group jsGroup --describe

?可以看到3个消费者分别对应3个不同的分区
4.4 代码运行结果查看
4.4.1 3个消费者(每个并发为2)开始依次执行

4.4.2 第一次全部下线后,查看消费组

?
内容待整理:
参考内容:?https://blog.csdn.net/yy756127197/article/details/103961012
|