1.生产者发送消息
1.1 找到kafka生产者入口–这里通过KafkaTemplate发送消息
点击send()方法,进入doSend()
ProducerRecord主要是创建要发送到 Kafka 的记录,topic需要指定,其他参数可自配 KafkaTemplate通过工厂模式创建kafkaProducer
找到ProducerFactory的实现类进入DefaultKafkaProducerFactory的createProducer方法 找到创建kafkaProducer的位置
在kafkaProducer里面,创建kafkaProduce实例,并且创建了一个sender对象,启动了一个io线程 这个io线程就是kafka发送线程,生产者发送消息时,这个线程会被唤醒 kafkaProducer创建完成之后,在回到消息发送的位置
调用send()方法发送消息 在kafkaProducer sender发送线程被唤醒之前,kafkaProducer的main线程会进行几个逻辑处理
1.拦截器逻辑
走代码中可以看出,生产者可以实现你多个拦截器,形成一个拦截链
发送消息测试
2.用指定的序列化器分别对key,value进行序列化
3.指定分区器
kafka的消息会发送到partiton中,但是具体到哪个分区器,可以自己指定,没有指定的话,kafka已经实现自动分配,分4中情况 1.在指定partition情况下,直接将指定的partition作为partiton值 2.没有指定partiton但是自定义了分区器 3.没有指定partiton但是存在key的情况下,会使用默认的分区器DefaultPartitioner 4.在既没有指定partiton也没有指定key的情况下,第一次调用时,会随机生成一个整数(之后每次调用在这个基础上自增)然后将这个值对当前topic可用的partiton取余(轮询round-robin) 4.消息累加器 分区选择完成后,并没有立即将消息发送出去,而是把消息放进了一个累加器(ConcurrentMap)缓存起来,什么时候发送跟batch-size有关 kafka尽可能的保证消息先往一个分区里发送,当这个分区的batch-size已经不足以在继续追加存放消息,kafka会考虑更换一个pattition分区继续往该分区的的累加器里累加消息,如果再次追加失败,这个时候kafka会新建batch,不会在尝试了,新建与否通过这个参数控制abortForNewBatch
最后唤醒sender线程发送消息
|