1. Kafka 简介
Kafka 是一种分布式的,基于发布 / 订阅的消息系统。主要设计目标如下:
- 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
- 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展。
2. 架构分析
在学习Kafka之前,首先给出下图,相信大家肯定看的很懵逼,问题不大,我们继续往下看,等了解相关组件之后我们在回过头重新看该图。
2.1 Broker
Kafka作为一个中间件,是帮我们存储和转发消息的,它做的事情有点像中介,所以我们把Kafka的服务叫做Broker,默认是9092的端口。生产者和消费者都需要跟这个Broker建立一个连接,才可以实现消息的收发。
2.2 消息
客户端之间传输的数据叫做消息,或者叫做记录。
在客户端的代码中,记录可以是一个KV键值对。
生产者对应的封装类是ProducerRecord,消费者对应的封装类是ConsumerRecord。
消息在传输的过程中需要序列化,所以代码里面需要指定序列化工具。
消息在服务端的存储格式为RecordBatch和Record。
2.3 生产者
发送消息的一方叫做生产者,接受消息的一方叫做消费者。
为了提升消息发送速率,Kafka中的生产者不是逐条发送消息给Broker,而是批量发送的。
每次发送多少条由参数决定:
pros.put("batch.size",16384);
2.4 消费者
一般来说消费者获取消息有两种模式,一种是pull模式,一种是push模式。
Pull模式就是消费放在Broker,消费者自己决定什么时候去获取。Push模式是消息放在Consumer,只要有消息到达Broker,都直接推给消费者。
RabbitMQ中两者都支持,一般使用push。但是Kafka只有pull模式。
为什么消费者用pull模式呢?因为kafka经常用在海量数据的环境中,所以如果在push模式下,消息产生速度远远大于消费者消费信息的速率,那消费者就会不堪重负,直接挂掉。
消费者可以控制自己一次性获取多少消息:
max.poll.records
默认500.在pull方法里面可以直接指定。
2.5 Topic
生产者和消费者是怎么关联起来的呢?或者说,生产者发送的消息,怎么才能到达某个指定的消费者?他们需要通过队列关联起来,也就是说,生产者发送消息,要指定发送给哪一个队列;消费者接受消息,需要指定从哪一个队列接受。
在Kafka里面,这个队列叫做Topic,可以理解为一组消息的集合。
生产者和Topic以及Topi和消费者的关系都是多对多。一个生产者可以发送消息给多个Topic,一个消费者也可以从多个Topic中获取消息。(但是不建议从多个中获取)
注意,生产者发送消息的时候,如果Topic不存在,会自动创建,通过以下参数控制:
auto.create.topics.enable
默认为true。如果要彻底删掉一个Topic,这个参数必须改成false,否则只要代码中使用这个Topic,就会自动创建。
2.6 Partition 与 Cluster
如果说一个Topic中的消息太多,会带来两个问题:
- 不方便横向扩展。比如我想要在集群中把数据分布在不同的机器上实现扩展,而不是通过升级硬件做到,如果一个Topic的消息无法在物理上拆到多台机器的时候,这个是做不到的。
- 并发或者负载问题。所有的客户端操作的都是同一个Topic,在高并发的场景下性能会大大下降。
怎么解决这些问题呢?我们想到的就是把Topic进行拆分。(分片思想)
分区在创建Topic的时候指定,每个Topic至少有一个分区。
创建Topic的命令:
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-test-topic
如果没有指定分区数,默认的分区数是一个,这个参数可以修改:
num.partitions=1
partitions是分区数,replication-factor是主题的副本数。
partition思想上有点类似与分库分表,实现的也是横向扩展和负载的目的。
举个例子:Topic有3个分区,生产者依次发送9条消息,对消息进行编号:
第一个分区 1 4 7;第二个分区 2 5 8 ;第三个分区 3 6 9 ;这就实现了负载的功能。
每个partition都有一个物理目录。在配置的数据目录下(日志就是数据):
/tmp/kafka-logs/
跟RabbitMQ不一样的地方在于Partition里面的消息被读取之后不会被删除,所以同一批消息在一个Partition顺序、追加写入。这个也是Kafka吞吐量大的一个很重要的原因。
分区数量怎么选择?分区数量越多越好?不一定,不同的机器网络环境,这个答案是不同的,最后通过性能测试脚本进行验证。
2.7 Partition 副本 Replica 机制
如果partition的数据只存储一份,在发生网络或者硬件故障的时候该分区的数据就无法访问或者无法恢复了。
Kafka在0.8版本之后增加了副本机制。
每个partition可以有若干个副本,副本必须在不同的Broker上面。一般我们说的副本包括其中的主节点。
由replication-factor指定一个Topic的副本数。
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-test-topic
服务端有一个参数控制默认的副本数:
offsets.topic.replication.factor
2.8 Segment
Kafka是数据是放在后缀.log的文件里面的。如果一个partition只有一个log文件,消息不断的追加,这个log文件也会变得越来越大,这个时候检索数据的效率就很低了。
所以干脆把partition在做一个切分,切分出来的单位就叫做段。实际上kafka的存储文件段来存储的。
每个segment都有至少一个数据文件和两个索引文件,这三个文件是成套出现的。
partition一个目录,一个segment一套文件。
segment多大一个呢?默认大小是1G,由以下参数控制:
log.segment.bytes
2.9 Consumer Group
如果生产者生产消息的速率过快,会造成消息在Broker的堆积,影响Broker的性能。怎么提升消息的消费速率呢?我们可以通过增加消费者的数量,但是这么多消费者,怎么知道大家是不是消费的同一个topic呢?
所以引入了Consumer Group这个概念,在代码中通过group id来配置。消费同一个topic的消费者不一定是同一个组,只有group id相同的消费者才是同一个消费者组。
注意:同一个Group中的消费者,不能消费相同的partition——partition在消费者之间分配。
- 如果消费者比partition少,一个消费者可能消费多个partition。
- 如果消费者比partition多,肯定由消费者没有partition可以消费,不会出现一个group里面的消费者消费同一个partition的情况。如果想要同时消费同一个partition的消息,那么需要其他组来消费。
2.10 Consumer Offset
我们前面说过了,partition里面的消息是顺序写入的,被读取之后不会被删除
如果消费者挂了或者下一次读取,想要接着上次的位置读取消息,或者从某个特定的位置来读取消息,怎么办?会不会出现重复消费的情况?
因为消息是有序的,我们可以对消息进行编号,用来标识一条唯一的消息。
这个编号我们就把他叫做offset,偏移量。
offset记录下一条将要发送给consumer的消息的序号。
这个消费者跟partition之间的偏移量没有保存在ZK中,而是直接保存在服务端。
2.11 总结
现在我们学完后,回头看下之前的架构图。
解读:
首先,我们由3台Broker;
由两个Topic:Topic0和Topic1;
Topic0有2个分区:partition0,partition1,每个分区一共3个副本。
Topic1只有1个分区:partition0,每个分区一种3个副本。
图中红色字体的副本代表是leader,黑色字体的副本代表是follower。绿色的线代表数据同步。
蓝色的线代表写消息;橙色的线是读消息;都是针对leader节点;
有两个消费者组,第一个消费者组只有一个消费者它消费了topic0的两个分区;
第二个消费组,既消费了topic0,又消费了topic1。其中有一个消费者,消费topic0的partition0,还消费topic1的partition0;有一个消费者消费topic0的partition1;还有一个消费者没有partition消费。
|