一、梳理重要知识点
本文内容主要是说明kafka如何保证消息的有序性?首先我们需要先回顾若干个重要的概念
- kafka生产者缓冲区包含若干个缓冲队列,每一个缓冲队列对应kafka服务端的一个主题的一个分区。
- 缓冲队列的数据结构是Deque,是一个双端队列,一端放入数据,一端取出数据。
二、如何实现消息的有序性?
其实看了上面的这个图,kafka如何保证消息的有序性就呼之欲出了
- 在生产者中的双端缓冲队列中,消息是可以保证顺序的,一端进一端出。
- 每一个双端队列对应kafka服务端的一个主题的分区,所以kafka是可以保证消息数据在一个分区内的有序性。
如果我们希望消息是有序被发送的并且是有序被处理的,需要满足以下场景中的一个:
- 某个主题只有一个分区,那么这个主题的所有数据"发送"和“消费”都是有序的。这种方式应用场景比较窄,针对数据量比较小的主题可以考虑创建主题时只包含一个分区。
- 某个主题有多个分区也可以通过自定义分区器,来实现需要有序的消息被发往同一个分区。自定义分区器会在后面的文章中讲解。
- 某个主题有多个分区,可以在发送消息的时候为消息指定key值,具有相同key值的消息,会被发送到同一个分区。因此具有相同key值的消息数据可以保证有序性。如下所示:生产者发送数据,下文ProducerRecord构造函数的第二个参数是key。
producer.send(
new ProducerRecord<>(
"producer_test",
"producer_key",
"noCallback value:" + i)
);
三、如何避免重试机制导致的消息顺序错乱?
在本专栏上一篇文章中已经说明,kafka生产者提供了消息发送的重试机制,也就是说消息发送失败后,kafka生产者会重新发送消息。
我们来看可能出现的下面的这样一种状况:
- 第一个批次的消息发送后,因为某种特殊原因(如主题分区正在重新选举Leader)导致数据发送失败了
- 第二个批次的消息发送,服务端数据保存成功了。
- 因为第一个批次消息发送失败,kafka重新尝试发送第一个批次的数据,这次成功了
这就会导致了发往kafka分区的数据出现了顺序上的错乱。该如何避免这个问题?需要设置下面的这样一个参数。这个配置参数仍然是apache kafka生产者客户端的配置,不是kafka 服务端配置。
max.in.flight.requests.per.connection=1
这个参数的作用是:对于一个kafka客户端请求连接(可以认为是一个生产者),一旦出现1个批次的消息发送失败,在该批次的数据重试(重新发送)成功之前,下一个批次的消息数据发送处于阻塞状态。上一个批次不成功,下一个批次就永远发不出去。
虽然这种方式保证消息的顺序性方面是有效的,但是修改配置之前:一定要问自己一个问题,是不是真的要保证消息的有序性,因为配置该参数,会导致生产者发送数据的效率降低。
|