| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> spring-kafka消费者源码分析 -> 正文阅读 |
|
[大数据]spring-kafka消费者源码分析 |
https://blog.csdn.net/qq_26323323/article/details/84938892 这篇文章对spring-kafka消费端源码分析较为详细,可查看其customer初始化的过程。 整个初始化的开始是从@EnableKafka开始讲起的 初始化的工程归纳如下: ? ? ? ?得到一个含有KafkaListener基本信息的Endpoint,最后Endpoint被封装到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,也就是一个list中 在afterPropertiesSet方法中遍历endpointDescriptors,并执行对应的方法。
@KafkaListener注解变为ConcurrentMessageListenerContainer类,这个Container中包含了我们所需要的topic相关信息 ConcurrentMessageListenerContainer的分析该类实现了lifeCycle接口 启动
最终将我们@KafkaListener中的topicPartitions ?@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0","1","2" })}) 并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者
ListenerConsumer的工作
总结:通过构造方法,ListenerConsumer完成了Consumer的创建以及topic和partition的监听 轮询拉取的业务逻辑
提交偏移量:那么消费者是如何提交偏移量的呢?消费者往一个 叫作 _consumer_offset 的特殊主题发送 消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有 什么用处,因为每个消费者都会在内存中记录自己消费到哪里了。不过,如果悄费者发生崩溃或者有新 的消费者加入群组,就会触发再均衡,完 成再均衡之后,每个消费者可能分配到新 的分区,而不是之前处理的那个。为了能够继续 之前的工作,消费者需要读取每个分区最后一次提交 的偏移量,然后从偏移量指定的地方 继续处理。 提交偏移量有很多种方式 kafka自动提交 最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为 true,那么每过5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。所以这里auto.commit.interval.ms参数是两次提交偏移量的最小时间间隔,因为提交偏移量也是在poll中实现的,是每次poll的时候判断当前时间和上次提交的时间差是否大于等于5s,是,则需要提交偏移量,否则,本次轮询不需要提交偏移量。源码如下: 不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。 假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。 在使用自动提交时 ,每次调用轮询方法都会把上一次调用返 回的偏移量提交上去,它并不 知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回 的消息 都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。 一般情况下不会有什么问 题,不过在处理异常或提前退出轮询时要格外小心 提交偏移量除了自动提交,还有手动提交,AckMode中是所有的提交模式枚举 拉取消息的过程这篇博文写消费者分析很不错?https://blog.csdn.net/liyiming2017/article/details/89187474 KafkaConsumer有几个主要的组件:1、消费者要自己记录消费的位置(但也需要提交到服务端保存,为了rebalance后的消费能衔接上),所以我们需要SubScriptionState来保存消费的状态。 2、ConsumerCoordinator负责和GroupCoordinator通讯,例如在leader选举,入组,分区分配等过程。 3、ConsumerNetworkClient是对NetworkClient的封装,他对nio的组件进行封装,实现网络IO。 4、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。 5、Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。 可参考的文献:源码分析Kafka 消息拉取流程 https://cloud.tencent.com/developer/article/1551705 https://blog.csdn.net/evasnowind/article/details/108534598 kafka消费者--加入consumergroup流程 https://blog.csdn.net/asdfsadfasdfsa/article/details/104883173 自动提交是调用 https://zhuanlan.zhihu.com/p/112745985 Kafka consumer消息的拉取及偏移的管理 https://blog.csdn.net/E_Possible/article/details/109564700 Kafka消费者源码解析之二Fetcher https://blog.csdn.net/lt793843439/article/details/89634643 concurrency问题 https://blog.csdn.net/u010634066/article/details/109778491 https://hengheng.blog.csdn.net/article/details/107468648 https://blog.csdn.net/weixin_39672680/article/details/111744351 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年4日历 | -2025/4/22 7:26:09- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |