何时创建连接
??根据我以前的文章,我们知道kafka的java生产者在实例已经创建,就创建了TCP连接。生产者的入口类KafkaProducer在构建实例时,会在后台启动Sender线程,这个线程负责Sockert连接的创建。 ??那java的消费者是什么时候创建的连接呢?是实例创建时吗?并不是,消费者是在调用KafkaConsumer.poll方法时创建的连接。 具体的创建时机有三个:
-
发起FindCoordinator请求时。 ??在前面的的文章"kafka消费者重平衡可以避免吗?"中 ,说过一个协调者(Coordinator),它是驻留在Borker端的内存中的。负责消费者组成员和各个消费者的位移提交管理。当消费者程序首次调用poll方法时,他需要向Kafka集群发送一个FindCoordinator的请求,从而让集群告诉它那个Borker是管理它的协调者。 ??具体发给那个Borker呢?理论上任何一个Borker都可以回答他的维问题。因为集群中的任何一个Borker都有整个集群的信息。不过一般这个请求是发给负载最小的Borker。 -
连接协调者时 ??上一步的请求得到回应之后,消费者就知道了那个Borker是真正的协调者了。之后就会创建于协调者之间的Socket连接了。只有真正连入协调者才能开启正常的组协调操作。如:加入组,位移管理等。 -
消费数据时 ??消费者会为每个要消费的分区创建与该分区领导者副本所在Broker连接的TCP。
创建几个连接
消费者程序会创建3类TCP连接:
- 确定协调者和获取集群元数据的连接
- 连接协调者,令其执行组成员管理操作的连接
- 实际的消息获取的连接。
何时关闭连接
??和生产者端相同,主类有两种:主动关闭连接和kafka自动关闭连接。 ??如果手动的调用KafkaConsumer.close()方法,或者是执行Kill命令。就是主动关闭连接了。kafka的自动关闭是由消费者端参数connection.max.idle.ms控制的。该参数的默认值是9分钟。即如果某个Socker连接上连续9分钟没有请求的话就会自动关闭了。 ??和生产者不同的是:如果在消费者程序中,你使用了循环的方式来调用poll方法消费消息。那上面的请求就会定期发送到Borker,因此Socket上也就可以保证一直由请求。也就实现了长连接。 ??需要注意的是:当上面的第三类TCP连接成功之后,消费者程序会废弃第一类TCP连接,之后的定期请求元数据的请求改为第三类TCP连接。
|