1.定义
1.1传统定义
- Kafka是一个分布式的基于发布订阅模式的消息队列,主要应用于大数据的实时处理领域
- 发布订阅:消息发布者不会直接将消息发送给订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息
1.2最新定义
- Kafka是一个开源的分布式流平台,被数千家公司用于高性能数据管道、流分析、数据集成、和关键任务应用
2.消息队列
2.1常见消息队列
- Kafka
- ActiveMQ
- RabbitMQ
- RocketMQ
2.2应用场景
- Kafka更多应用于大数据场景
- ActiveMQ、RabbitMQ、RocketMQ多应用于web服务器开发场景
2.3作用
- 缓存
- 流量削峰
- 解耦
- 异步通信
3.Kafka模式
3.1模式图鉴
-
点对点模式
-
发布/订阅模式
- 可以有多个Topic主题
- 消费者消费数据后,不删除数据
- 每个消费者相互独立、都可以消费到数据
3.2推拉模式各自优缺点
3.2.1推模式
- 优点
- 1.消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。
- 2.对于消费者使用来说更简单,只需要等着,反正有消息来了就会推过来。
- 缺点
- 1.
推送速率难以适应消费速率 ,推模式的目标就是以最快的速度推送消息, 当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时, 随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。 当推送速率过快就像 DDos 攻击一样消费者就傻了。 - 2.并且
不同的消费者的消费速率还不一样 ,身为 Broker 很难平衡每个消费者 的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行 推送速率的变更。这其实就增加了 Broker 自身的复杂度。 - 适用场景
3.2.2拉模式
-
优点
- 1.
消费者可以根据自身的情况来发起拉取消息的请求 。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。 - 2.拉模式下
Broker 就相对轻松 了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。 -
缺点
- 1.
消息延迟 ,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。 - 2.
消息忙请求 ,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。 - 3.
服务端压力变大 ,长时间消费速度跟不上生产速度,就会使得Broker存储的消息越来越多,极端情况下可能导致服务器内存爆仓 -
适用场景
- 拉模式可以
更合适的进行消息的批量发送 ,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。
4.架构
4.1基础架构
4.2完整架构
- 为方便扩展,提高吞吐量,一个topic分为多个partition
- 配合分区设计,提出消费者组的概念,组内每个消费者并行消费
一个分区的数据,只能由消费者组内的一个消费者消费,避免重复消费问题 如果没有对消费者分组,那么一个分区的数据可以被多个消费者消费 - 为提高可用性,为每个partition增加若干副本,类似NameNode HA
消费者只能从主partition进行消费,当主partition挂掉后,从partition可成为主partition - Zookeeper(Kafka2.8.0之前必须使用Zookeeper,2.8.0之后可选karft)
- 记录集群中哪些broker服务器上线了
- 记录哪个partition是主
5.下载安装
5.1下载
-
官网地址:https://kafka.apache.org/downloads wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz -P /opt/software
-
解压 tar -zxvf /opt/software/kafka_2.12-3.0.0.tgz -C /opt/moudle
-
重命名 mv /opt/moudle/kafka_2.12-3.0.0 /opt/moudle/kafka
5.2修改配置
- 修改
server.properties
broker.id=0
log.dirs=/opt/moudle/kafka/datas
zookeeper.connect=first-node:2181,second-node:2181,third-node:2181/kafka
5.3集群分发
-
分发kafka软件包 xsync /opt/moudle/kafka
-
修改集群其他节配置文件的broker.id ssh second-node "sed -i.bak s/broker.id=0/broker.id=1/ /opt/moudle/kafka/config/server.properties"
ssh third-node "sed -i.bak s/broker.id=0/broker.id=2/ /opt/moudle/kafka/config/server.properties"
-
配置kafka环境变量:sudo vim /etc/profile.d/my_env.sh
export KAFKA_HOME=/opt/moudle/kafka
export PATH=$PATH:$KAFKA_HOME/bin
-
重新加载环境变量 source /etc/profile
-
环境变量配置文件分发
- 如果
root 用户未配置ssh免密登录,此处需要输入密码 sudo /home/fatpuffer/bin/xsync /etc/profile.d/my_env.sh
-
登录集群环境其他节点重新加载环境变量 source /etc/profile
5.4启动kafka
5.4.1首先需要启动ZK集群
zk.sh start
5.4.2启动Kafka
kafka-server-start.sh -daemon /opt/moudle/kafka/config/server.properties
5.4.3停止Kafka
kafka-server-stop.sh
6.Kafka集群
6.1启动停止脚本
-
1.用户bin目录下创建kfk.sh文件:vim /home/fatpuffer/bin/kfk.sh #! /bin/bash
case $1 in
"start"){
for node in first-node second-node third-node
do
echo ----------------------kafka $node start--------------------
ssh $node "kafka-server-start.sh -daemon /opt/moudle/kafka/config/server.properties"
done
};;
"stop"){
for node in first-node second-node third-node
do
echo ----------------------kafka $node stop--------------------
ssh $node "kafka-server-stop.sh"
done
};;
esac
-
2.修改脚本权限 comod +x kfk.sh
-
3.脚本使用 kfk.sh start
kfk.sh stop
6.2注意事项
- 启动Kafka前一定要确保ZK集群已经启动成功
- 必须等待Kafka完全停止,才可以停止Zoopeeper,否则会导致Kafka无法停止
6.3Kafka在ZK集群中的节点
-
查看节点/kafka 信息 [zk: localhost:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
-
查看节点/kafka/brokers [zk: localhost:2181(CONNECTED) 16] ls /kafka/brokers
[ids, seqid, topics]
-
查看节点/kafka/brokers/ids [zk: localhost:2181(CONNECTED) 11] ls /kafka/brokers/ids
[0, 1, 2]
-
查看节点/kafka/brokers/topics [zk: localhost:2181(CONNECTED) 17] ls /kafka/brokers/topics
[]
-
获取/kafka/brokers/ids/0 的数据 [zk: localhost:2181(CONNECTED) 13] get /kafka/brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://first-node:9092"],"jmx_port":-1,"features":{},"host":"first-node","timestamp":"1670480025960","port":9092,"version":5}
[zk: localhost:2181(CONNECTED) 14] get /kafka/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://second-node:9092"],"jmx_port":-1,"features":{},"host":"second-node","timestamp":"1670480032519","port":9092,"version":5}
[zk: localhost:2181(CONNECTED) 15] get /kafka/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://third-node:9092"],"jmx_port":-1,"features":{},"host":"third-node","timestamp":"1670480038549","port":9092,"version":5}
7.Kafka基本命令行操做
7.1脚本图鉴
7.2主题命令操做
-
命令参数
参数 | 描述 |
---|
–bootstrap-server <String: server connect to> | 连接的 Kafka Broker主机名称和端口 | –topic <String: topic> | 操做的Topic名称 | –create | 创建主题 | –delete | 删除主题 | –alter | 修改主题 | –list | 查看所有主题 | –describe | 查看主题详细描述 | –partitions <Integer: # of partitions> | 设置分区数 | –replication-factor <Integer: replication factor> | 设置分区副本 | –config <String: name=value> | 更新系统默认的配置 | –producer-property | 将自定义属性传递给生成器的机制,形如:key=value | –producer.config | 生产者配置属性文件 [–producer-property] 优先于此配置 | –property | 自定义消息读取器:parse.key=true|false key.separator=<key.separator> ignore.error=true | –request-required-acks | 生产者请求的确认方式:0、1(默认值)、all | –sync | 同步发送消息 |
-
查看当前服务器中的所有topic kafka-topics.sh --bootstrap-server first-node:9092 --list
-
创建一个名为demo1 的主题,设置分区数为1,分区副本为3 kafka-topics.sh --bootstrap-server first-node:9092 --topic demo1 --create --partitions 1 --replication-factor 3
[zk: localhost:2181(CONNECTED) 22] ls /kafka/brokers/topics
[demo1]
[zk: localhost:2181(CONNECTED) 23] ls /kafka/brokers/topics/demo1
[partitions]
[zk: localhost:2181(CONNECTED) 24] ls /kafka/brokers/topics/demo1/partitions
[0]
[zk: localhost:2181(CONNECTED) 25] ls /kafka/brokers/topics/demo1/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 26] ls /kafka/brokers/topics/demo1/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 27] get /kafka/brokers/topics/demo1
{"removing_replicas":{},"partitions":{"0":[2,1,0]},"topic_id":"D6Lse1d5TduQCu0mh-idwQ","adding_replicas":{},"version":3}
[zk: localhost:2181(CONNECTED) 28] get /kafka/brokers/topics/demo1/partitions/0/state
{"controller_epoch":5,"leader":2,"version":1,"leader_epoch":0,"isr":[2,1,0]}
-
查看demo1 主题详情 kafka-topics.sh --bootstrap-server first-node:9092 --describe demo1
Topic: demo1 TopicId: D6Lse1d5TduQCu0mh-idwQ PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: demo1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
-
修改主题demo1 分区数为3 kafka-topics.sh --bootstrap-server first-node:9092 --topic demo1 --alter --partitions 3
[zk: localhost:2181(CONNECTED) 34] ls /kafka/brokers/topics
[demo1]
[zk: localhost:2181(CONNECTED) 35] ls /kafka/brokers/topics/demo1
[partitions]
[zk: localhost:2181(CONNECTED) 36] ls /kafka/brokers/topics/demo1/partitions
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 37] ls /kafka/brokers/topics/demo1/partitions/1
[state]
[zk: localhost:2181(CONNECTED) 38] ls /kafka/brokers/topics/demo1/partitions/1/state
[]
[zk: localhost:2181(CONNECTED) 39] get /kafka/brokers/topics/demo1
{"removing_replicas":{},"partitions":{"2":[1,2,0],"1":[0,1,2],"0":[2,1,0]},"topic_id":"D6Lse1d5TduQCu0mh-idwQ","adding_replicas":{},"version":3}
[zk: localhost:2181(CONNECTED) 40] get /kafka/brokers/topics/demo1/partitions/0/state
{"controller_epoch":5,"leader":2,"version":1,"leader_epoch":0,"isr":[2,1,0]}
[zk: localhost:2181(CONNECTED) 41] get /kafka/brokers/topics/demo1/partitions/1/state
{"controller_epoch":5,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
[zk: localhost:2181(CONNECTED) 55] get /kafka/brokers/topics/demo1/partitions/2/state
{"controller_epoch":5,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,0]}
[fatpuffer@first-node ~]$ kafka-topics.sh --bootstrap-server first-node:9092 --describe demo1
Topic: demo1 TopicId: D6Lse1d5TduQCu0mh-idwQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: demo1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: demo1 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: demo1 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
-
生产者向broker中的demo1 主题发送一条消息 kafka-console-producer.sh --bootstrap-server first-node:9092 --topic demo1
-
消费者连接broker,并且监听deno1 主题,broker监控到demo1 主题中有消息产生,则将该消息推送给所有demo1 主题的监听者 kafka-console-consumer.sh --bootstrap-server first-node:9092 --topic demo1
kafka-console-consumer.sh --bootstrap-server first-node:9092 --topic demo1 --from-beginning
-
发送带key 的消息 kafka-console-producer.sh --bootstrap-server first-node:9092 --topic demo1 --producer-property parse.key=true
8.生产者(producer)
8.1发送原理
在消息发送过程中,涉及到了两个线程,main线程 和sender 线程。在main线程中创建了一个双端队列:RecordAccumulator 。main线程将消息发送给:RecordAccumulator,sender 线程不断从:RecordAccumulator中拉取消息发送到:Kafka Broker
8.2API发送流程
-
创建配置对象 -
向配置对象写入kafka集群连接信息 -
向配置对象写入序列化信息 -
根据配置对象,创建生产者对象 -
使用生产者对象发送数据 -
关闭生产者对象 kafka-python:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
8.3生产者分区
8.3.1分区好处
便于合理使用存储资源 ,每个Partion在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡 的效果。提高并行度 ,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据
8.3.2生产者发送消息的分区策略
- 在发送消息时
指定了分区 ,则所有消息发送到该分区 - 在发送消息时
未指定分区 ,但指定了key
- 将key的hash值与topic的partition数进行取余得到partition值
- 例如:key1的hash值为5,key2的hash值为6,topic的partition数为2,那么key1对应的value1就写入1号分区,key2对应的value2写入0号分区
- 在发送消息时
未指定分区 ,也未指定key
Kafka 采用Sticky Partition(黏性分区器) ,会随机选择一个分区,并尽可能一直使用该分区,等待该分区的batch.size 已满或已完成,Kafka 再随机一个分区进行使用(和上一次的分区不同)
- 例如:第一次随机选择0号分区,等0号分区当前批次(batch.size)已经满了(默认16k),或者
linger.ms 设置的时间到了,Kafka 再随机选择一个分区进行使用(如果是0,还会继续随机) - Java:
hashcode ------------------------->Python:ord("a")
8.3.3自定义分区器
- 根据发送数据内容进行过滤,将不同类型数据发送到不同分区
8.4提高吞吐量
- 1.主要涉及两个参数的调优,当
linger.ms 时常,越接近batch.size:16kb ,则效率越高
- 1.
batch.size :传输量,16或32 - 2.
linger.ms :延迟,5-10ms - 2.数据压缩
compression.type :压缩snappy - 3.RecordAccumulator:缓冲区大小,修改为64M
8.5数据可靠性
8.5.1生产者发送过来数据就不管了:ack=0 ,可靠性差,效率高
8.5.2可靠性总结
acks=0 ,生产者发送过来数据就不管了,可靠性差,效率高acks=1 ,生产者发送过来数据,Leader(落盘)应答,可靠性中等,效率中等acks=-1 ,生产者发送过来数据,Leader和ISR队列里面的所有Follower应答,可靠性高,效率低- 生产环境中,
acks=0 很少使用;acks=1 一般使用传输普通日志,允许丢个别;acks=-1 一般用于和钱相关的数据,对可靠性要求比较高的场景。
8.5.2数据重复
1.产生原因
acks=-1(all) :生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐后应答
- 重复原因分析:当Leader收到消息,且已经被其他ISR队列中的Follower同步完成,但是再应答生产者时,Leader挂了,Kafka重新选举出Leader。生产者未收到应答,会再次发送数据,新的Leader拿到数据后,其他ISR队列中的Follower又来拉取一次数据,此时数据就出现了重复。
2.数据传递语义
- 至少一次(At Least Once):ACK级别设置为
-1 + 分区副本大于等于2 + ISR里应答得最小副本数大于等于2 - 最多一次(At Most Once):ACK级别设置为
0
3.总结
At Least Once :可以保证数据不丢失,但是不能保证数据不重复 At Most Once :可以保证数据不重复,但是不能保证数据不丢失
4.精确一次(Exact Once)
- 对于一些非常重要的信息,比如和钱相关的数据,要求数据
既不能重复,也不能丢失 - Kafka 0.11版本以后,引入了一项重大特性:
幂等性和事务
5.幂等性原理
- 幂等性就是指
Producer 不论向Broker 发送多少次重复数据,Broker 端都只会持久化一条 ,保证了不重复。 - 精确一次(Exact Once)= 幂等性 + 至少一次(At Least Once)(
ack = -1 + 分区副本数 >= 2 + ISR最小副本数 >= 2 )
6.重复判断的标准
- 具有
<PID,Partition,SeqNumber> 相同主键的消息提交时,Broker只会持久化一条。
PID 是kafka每次重启都会分配一个新的;Partition 标识分区号;Sequence Number 是单调自增的。 - 所以幂等性
只能保证的是在单分区单会话内不重复
7.开启幂等性
enable.idempotence ,默认为true ,false 为关闭
8.6事务
1.为什么要使用事务
- 幂等性只能保证单分区、单会话内数据不重复,一旦kafka挂掉,重启后
PID 就会变化,此时就无法保证数据不重复 - 如果分区号变化,也依然无法保证数据不重复
2.事务原理
开启事务,必须开启幂等性
3.API操做
8.7.数据有序
8.8.数据乱序
- kafka在1.x版本之前,保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性) - kafka在1.x版本之后,保证数据单分区有序,条件如下:
- 1.未开启幂等性
max.in.flight.requests.per.connection=1 - 2.开启幂等性
max.in.flight.requests.per.connection 需要设置小于等于5- 原因说明:因为在kafka1.x之后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近的5个request的数据是有序的(根据序列号单调递增,如果有序,直接落盘,遇到无序的,先缓存内存中)
9.Broker
9.1.1ZK存储Kafka信息
9.2Broker工作流程
9.3节点服役退役
9.3.1服役新节点
- 准备新节点
- 1.基于历史节点克隆(克隆前关机克隆目标节点)
- 2.修改IP地址
- 3.修改主机名hostname
- 4.重新启动克隆目标节点和克隆节点
- 5.修改克隆后节点kafka配置文件中的
broker.id - 6.删除克隆后节点kafka数据文件(
datas 和logs ) - 7.启动原有集群
- 8.单独启动新克隆节点的kafka
- 执行负载均衡(历史主题等数据,不会出现在新节点上,所以需要使用负载均衡来迁移)
- 1.创建一个要均衡的主题:
vim topics-to-move.json {
"topics": [
"topic": "demo1"
],
"version": 1
}
- 2.生成一个负载均衡计划
/opt/module/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server first-node:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
- 3.创建副本存储计划(所有副本存储在broker0、broker1、broker2、broker3中):
vim increase-replication-factor.json
- 来源于上述命令生成结果:
Proposed partition reassignment configuration {"version":1, "partitions":[{"topics":"demo1", "partition":0, "replicase":[3, 1], "log_dirs":["any", "any"]}, {"topics":"demo1", "partition":1, "replicase":[0, 2], "log_dirs":["any", "any"]}, {"topics":"demo1", "partition":2, "replicase":[1, 3], "log_dirs":["any", "any"]}, {"topics":"demo1", "partition":3, "replicase":[2, 0], "log_dirs":["any", "any"]}, {"topics":"demo1", "partition":4, "replicase":[3, 2], "log_dirs":["any", "any"]}, {"topics":"demo1", "partition":5, "replicase":[0, 3], "log_dirs":["any", "any"]}, {"topics":"demo1", "partition":6, "replicase":[1, 0], "log_dirs":["any", "any"]}]}
- 4.执行副本存储计划
/opt/module/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server first-node:9092 --reassignment-json-file increase-replication-factor.json --execute
- 出现
Successfully started... 即表示执行成功 - 5.验证副本存储计划
/opt/module/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server first-node:9092 --reassignment-json-file increase-replication-factor.json --verify
9.3.1退役旧节点
- 执行负载均衡操做(将要退役节点数据迁移到其他节点)
-
1.创建一个要均衡的主题:vim topics-move.json {
"topics": [
"topic": "demo1"
],
"version": 1
}
-
2.创建执行计划 /opt/module/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server first-node:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
-
3.创建副本存储计划(所有副本存储在broker0、broker1、broker2中):vim increase-replication-factor.json {"version":1, "partitions":[{"topics":"demo1", "partition":0, "replicase":[2, 0, 1], "log_dirs":["any", "any", "any"]}, {"topics":"demo1", "partition":1, "replicase":[0, 1, 2], "log_dirs":["any", "any", "any"]}, {"topics":"demo1", "partition":2, "replicase":[1, 2, 0], "log_dirs":["any", "any", "any"]}]}
-
4.执行副本存储计划 /opt/module/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server first-node:9092 --reassignment-json-file increase-replication-factor.json --execute
- 出现
Successfully started... 即表示执行成功 -
5.验证副本存储计划 /opt/module/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server first-node:9092 --reassignment-json-file increase-replication-factor.json --verify
- 停止退役后的kafka节点
kafka-server-stop.sh
9.4kafka副本
9.4.1副本信息
- Kafka副本作用:提高数据可靠性
- Kafka默认1个副本,生产环境下一般配置2个,保证数据可靠性,太多副本会增加磁盘存储空间,增加网络传输时长,降低效率
- Kafka副本分为
Leader 和Follower 。Kafka生产者只会把数据发往Leader ,然后Follower 找Leader 进行数据同步 - Kafka分区中的所有副本系统称为
AR(Assigned Repllicas)
ISR :标识和Leader保持同步的Follwer 集合,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR队列。该时间的阈值由replica.lag.time.max.ms 参数设定,默认30s。Leader发生故障后,就会从ISR中选出新的LeaderOSR :标识Follower和Leader副本同步时,延时过多的副本
9.4.2选举流程
- Kafka中有一个Broker的
Controller 会被选举为Controller Leader ,负责管理集群broker的上下线 ,所有topic 的分区副本分配和Leader 选举等工作 Controller 的信息同步工作是依赖于Zookeeper 的- 1.启动
Broker 时,在zookeeper 中/kafka/brokers/ids 下进行注册 - 2.每个
Broker 中的Controller 抢先注册zookeeper 中的/kafka/controller 节点,谁抢到了谁就是controller leader - 3.
controller leader 监听/kafka/broker/ids 的变化 - 4.
Broker 节点启动过半后,Controller 开始选举Leader
- 选举规则:在ISR中存活为前提,按照AR中排在前面的优先。例如:ar[1, 0 ,2],isr[1, 0, 2],那么leader就会按照1,0,2的顺序轮询
- 5.
Controller 选举出Leader 后,开始向zookeeper 的/kafka/brokers/topics/demo1/partitions/0/state 节点写入信息 - 6.其他
Broker 上的Controller 从zookeeper 上同步相关信息,防止成为leader的Controller 意外挂掉 - 7.假首次选举出的leader位于
Broker1 ,此时如果Broker1 上的Leader 意外挂了 - 8.此时
Controller 监测到/kafka/brokers/ids 上的变化 - 9.
Controller 从/kafka/brokers/topics/demo1/partitions/0/state 节点获取ISR 信息 - 10.再次按照选举规则,开始新一轮选举
- 11.选举完成后更新
/kafka/brokers/topics/demo1/partitions/0/state 节点信息(主要是leader ,isr )
9.4.3手动调整分区副本存储
9.4.3.1原因
- 在生产环境中,每台服务器的配置和性能可能不一致,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别性能低的服务器存储压力较大,所以需要手动调整分区副本的存储
- 需求:创建一个新的topic,4个分区,2个副本,名称为
demo2 ,将该topic的所有副本都存储到broker0 和broker1 两台服务器上
9.4.3.2流程
- 创建
demo2 主题kafka-topics.sh -bootstrap-server first-node:9092 --create --topic demo2 --partitions 4 --replication-factor 2
- 查看分区副本存储情况
kafka-topics.sh -bootstrap-server first-node:9092 --describe --topic demo2
Topic: demo2 Topicld: -tfY5h@3Rg236b6sooxs7g PartitionCount: 4 ReplicationFactor: 2 configs; segment. bytes=1073741824
Topic: demo2 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: demo2 Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: demo2 Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: demo2 Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
- 创建副本存储计划(将所有副本都指定存储在broker0和broker1中):
vim increase-replication-factor.json {
"version": 1,
"partitions": [
{"topic": "demo2", "partition": 0, "replicas": [0, 1]},
{"topic": "demo2", "partition": 1, "replicas": [0, 1]},
{"topic": "demo2", "partition": 2, "replicas": [0, 1]},
{"topic": "demo2", "partition": 3, "replicas": [0, 1]},
]
}
- 执行副本存储计划
kafka-reassign-partitions.sh -bootstrap-server first-node:9092 --reassignment-json-file increase-replication-factor.json --execute
- 验证副本存储计划
kafka-reassign-partitions.sh -bootstrap-server first-node:9092 --reassignment-json-file increase-replication-factor.json --verify
- 查看分区副本存储情况
kafka-topics.sh -bootstrap-server first-node:9092 --describe --topic demo2
9.4.4自动平衡(Leader Partition)
- 正常情况下,kafka本身会
自动把Leader Partition平均分散在各个机器上 ,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上 ,这会导致少数几台broker的读写请求压力过高,其他宕机broker重启后都是follower partition ,读写请求很低,造成负载不均衡。 auto.leader.rebalance.enable :默认是true 。自动平衡(如果机器性能不一致,且做了手动调整分区存储,那么不建议将其设置为true )
leader.imbalance.per.broker.percentage :每个broker允许的不平衡leader的比率,默认10% ,如果超过这个值,控制器会触发leader 的平衡leader.imbalance.check.interval.seconds :检查leader负载是否平衡的间隔时间。- 示例如下:
- 红色圈起来的部分,Leader和AR顺序排列不符,说明集群中有broker曾宕机过
- 针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是leader节点,所以不平衡数+1,AR副本总数是4
- 所以broker0节点的不平衡率为:1/4=25%,大于10%,会触发再平衡
- broker2和broker3节点和broker0节点不平衡率一样,也会触发再平衡,broker1的不平衡数为0,不需要再平衡
9.4.5增加副本因子
9.4.5.1原因
- 在生产环境中,由于某个主题的重要等级需要提升,我们考虑增加副本,副本数的增加需要先制定计划,然后根据计划执行
9.4.5.2流程
-
创建topic kafka-topics.sh -bootstrap-server first-node:9092 --create --topic demo3 --partitions 3 --replication-factor 1
-
手动增加副本存储,创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中):vim increase-replication-factor.json {
"version": 1,
"partitions": [
{"topic": "demo3", "partition": 0, "replicas": [0, 1, 2]},
{"topic": "demo3", "partition": 1, "replicas": [0, 1, 2]},
{"topic": "demo3", "partition": 2, "replicas": [0, 1, 2]}
]
}
-
执行副本存储计划 kafka-reassign-partitions.sh -bootstrap-server first-node:9092 --reassignment-json-file increase-replication-factor.json --execute
9.5文件存储
9.5.1文件存储机制
-
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应一个log文件 ,该log文件中存储的就是producer生产的数据。producer生产的数据会被不断追加到该log文件末端 ,为防止log文件过大导致数据定位效率低下,kafka采取了分片和索引机制,将每个partition分为多个segment ,每个segment 包括:.index 偏移量索引文件,.log 日志文件,和.timeindex 时间戳索引等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区号,例如:demo1-0 。 -
消费者消费完成后不会删除数据,数据默认保存7天,由timeindex 来计算 -
查看.log 文件 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index Dumping ./00000000000000000000.index
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 isTransactional: false isControl: false position: 75 CreateTime:1670918886067 size: 75 magic: 2 compresscodec: none crc: 73416945 isvalid: true
-
查看.index 文件 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log Dumping ./00000000000000000000.log
offset: 0 position: 0
9.5.1文件清除策略
- Kafka中
默认的日志保存时间为7天 ,可以通过调整如下参数修改保存时间
log.retention.hours ,最低优先级小时,默认7天log.retention.minutes ,分钟log.retention.ms ,最高优先级毫秒log.retention.check.interval.ms ,负责设置检查周期,默认5分钟 - 那么日志一旦超过了设置时间,怎么处理
- Kafka中提供的日志清理策略有
delete 和compact 两种 delete :日志删除,将过期数据删除
log.cleanup.policy=delete :所有数据启用删除策略- 思考:如果一个
segment 中有一部分数据过期,一部分数据没有过期,怎么处理?
基于时间 :默认打开,以segment中所有记录中的最大时间戳作为该文件时间戳,如果最大时间戳距今超过7天,就直接删除基于大小 :默认关闭,超过设置的所有日志总大小 ,删除最早的segment
log.retention.bytes 默认等于-1 ,标识无穷大 compact :日志压缩(对于相同的key的不同value值,只保留最新的,类似于python字典中的update方法)
- 压缩后
offset 可能是不连续的,当从这些offset中消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。 - 这种策略只适合特殊场景,比如消息的key是用户id,value是用户资料,通过这种压缩策略,整个消息集里就保存了用户最新的资料
9.6高效读写数据
- Kafka本身是分布式集群,可以采用分区技术,并行度高
- 读数据采用
稀疏索引 ,可以快速定位药消费的数据 - 顺序写磁盘
- kafka的producer生产数据,要写入log文件中,写的过程是
一直追加 到文件末端,为顺序写。官方测试结果:顺序写能达到600M/s,而随机写只有100K/s。这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间 - 页缓存 + 零拷贝
零拷贝 :kafka的数据加工处理操做交由kafka生产者和kafka消费者处理,kafka broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。页缓存(PageCache) :kafka重度依赖底层操做系统提供的PageCache 功能,当上层有写操作时,操做系统只是将数据写入PageCache 。当度操做发生时,先从PageCache 中查找,如果找不到,再去磁盘中读取。实际上PageCache 是把尽可能多的空闲内存都当作了磁盘使用
10.消费者(consumer)
10.1Kafka消费方式
pull(拉)模式 :kafka采用
- kafka采用从broker中主动拉取数据
- pull模式的不足之处:如果broker没有数据,消费者可能会陷入循环中,一直返回空
push(推)模式 :
- kafka没有采取这种方式,因为broker决定消息发送速率,很难适应所有消费者的消费速率,例如推送的速度是:
50m/s ,而Consumer的消费能力如果只有20m/s ,则来不及处理
10.2Kafka消费者工作流程
10.2.1消费者总体流程
- 生产者生产消息,发往集群中每个broker上topic的leader分区,follower分区同步数据
- 消费者从topic上的leader分区拉取数据,进行消费
每个消费者可以从多个分区拉取数据,也可以多个消费者从同一个分区消费数据 如果消费者建立分组,那么分组内的多个消费者不可以消费一个分区的数据 - 如果一个消费者消费demo1分区中的数据,消费前该消费者挂了,此时其他消费者怎么知道demo1分区中的数据消费到哪里了?然后从未消费的地方开始消费,避免再次从头开始,出现重复消费
- 新版的kafka有一个默认主题:
__consumer_offsets ,来记录主题内消费偏移信息,即使某一个消费者挂了,其他消费者依然可以从未消费的位置开始消费
10.2.2消费者组原理
Consumer Group(CG) :消费者组由多个consumer组成,形成一个消费者组的条件,是所有消费者的gropuid 相同
消费者组内的每个消费者负责不同分区的数据,一个分区只能由组内的一个消费者消费 消费者组之间互不影响。 所有的消费者都属于某个消费者组。即消费者组是逻辑上的一个订阅者
10.2.3消费者组初始化流程
coordinator :辅助实现消费者组的初始化和分区的分配。
coordinator 节点的选择 = groupid的hashcode值%50
- 为什么是50,
__consumer_offsets的分区数量 = 50 - 例如:groupid的hashcode值=1,1%50=1,那么
__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这点消费者组的老大,消费者组下的所有消费者提交offset的时候就往这个分区去提交offset
10.2.4消费者组详细消费流程
10.3Kafka消费者API
- 独立消费者(订阅主题)
- 独立消费者(订阅分区)
- 消费者组
10.4Kafka分区的分配以及再平衡
10.4.1分区平衡
- 一个
consumer group 中有多个consumer 组成,一个topic 有多个partition 组成,现在的问题是,到底是由哪个consumer来消费哪个partition的数据
10.4.2分区平衡策略
Range RoundRobin Sticky CooperativeSticky - 可以通过配置参数:
partition.assignment.stratge ,修改分区的配置策略。默认策略是:Range + CooperativeSticky ,kafka可以同时使用多个分区配置策略。
10.4.3Range分区策略
Range是对每个topic而言 - 首先对同一个topic里面的
分区按照序号进行排序 ,并对消费者按照字母顺序排序
- 假设现在有7个分区,3个消费者,排序后的分区将会是:
0,1,2,3,4,5,6 ,消费者排序后:C0,C1,C2 - 通过
partitions数 / consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者会多消费一个分区 。
- 例如,7 / 3 = 2 余 1,那么消费者C0便会多消费一个分区。8 / 3 = 2 余 2,那么C0,C1分别多消费一个分区
注意 :如果只是针对一个topic而言,C0消费者多消费一个分区影响不是很大,但是如果有N多个topic,那么针对每个topic消费者C0都将多消费一个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区,容易产生数据倾斜 。- 如果C0消费者挂了,生产者在45s内,发送过来的消息,将全部直接交由存活的消费者中的某一个,而不是均分
- C1:0、1、2、3、4 --------------> C2:4、5
- 或者
- C1:3、4 --------------> C2:0、1、2、4、5
- 超过45s该消费者还未修复的话,就会触发再平衡,此时按照剩余的消费者进行range策略
- C1:0、1、2、3
- C2:4、5、6
10.4.4RoundRobin分区策略
- RoundRobin:针对集群中所有topic而言
- 把所有的partition和所有的consumer都列出来,然后按照hashcode进行排序,最后通过
轮询 算法来分配partition给到各个消费者 - 假使C0这个消费者挂了,生产者在45s内,向C0所负责消费的分区0、3、6,发送过来的消息,将全部直接交由其他存活的消费者(C1、C2),0交由C1、3交由C2、6交由C1
- 超过45s该消费者还未修复的话,就会
触发再平衡 ,此时按照剩余的消费者进行range策略
- C1:0、2、4、6
- C2:1、3、5
- 设置分区分配策略
10.4.5Sticky以及再平衡
- 黏性分区定义:分配的结果带有
黏性的 ,即再执行一次新的分配前,考虑上一次分配的结果,尽量少的调整分配的变动,可以减少大量的开销 - 黏性分区是从kafka0.11版本开始引入的,首先会尽量均衡的放置分区到消费者上,在出现同一消费者组内消费者出现问题的时候,会保持原有分配的分区不会变化
- 消费者
随机均分 分区 - 假使C1这个消费者挂了,生产者在45s内,发送过来的消息,将全部直接交由其他存活的消费者
- 假使C1挂之前分区如下:
- C0:
0、1 分区 - C1:
4、5、6 分区 - C2:
2、3 分区 - C1分区数据超时后交给C0和C2随机均分
- 超过45s该消费者C1还未修复的话,就会
触发再平衡 ,此时按照剩余的消费者进行range策略
- 设置分区分配策略
10.5offset位移
10.5.1概念
- 从0.9版本开始,consumer默认将offset保存在kafka一个
内置的topic 中,该topic为__consumer_offsets - 0.9版本之前,consumer默认将offset保存在zookeeper中(所有消费者都要和zookeeper交互,效率低下)
__consumer_offsets 主题里面采用了key 和value 的方式存储数据,key:group.id + topic + 分区号 ,value 就是当前offset 的值。每隔一段时间,kafka内部会对这个topic 进行compact (压缩:覆盖更新),也就是每个roup.id + topic + 分区号就保留最新数据
10.5.2查看__consumer_offsets 这个系统主题
- 在配置文件
/opt/module/kafka/config/consumer.properties 中添加配置exclude.internal.topics=false ,默认为true,表示不能消费系统主题,为了查看系统主题数据,所以需要修改为false xsync分发脚本 - 查看
__consumer_offsets 主题数据kafka-console-consumer.sh --bootstrap-server first-node:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
10.5.3自动提交
- 为使我们能够能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能
- 自动提交offset相关参数
enable.auto.commit :是否开启自动提交offset 功能,默认是true auto.commit.interval.ms :自动提交offset 的间隔时间,默认是5s
10.5.3手动提交
- 虽然自动提交
offset 十分便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机,因此kafka还提供了手动提交offset 的api - 手动提交
offset 的方法有两种,分别是:commitSync(同步提交) 和commitAsync(异步提交) 。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交 ;不同点是,同步提交阻塞当前线程 ,一直到提交成功,并且会自动失败尝试(由不可控因素导致,也会出现提交失败);而异步提交没有失败重试机制,故有可能提交失败 。
commitSync :必须等待offset 提交完毕,再去消费下一批数据。commitAsync :发送完提交offset 请求后,就开始消费下一批数据
10.5.4指定offset消费
- 当kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
auto.offset.reset=earliest|latest|none 默认是latest
earliest :自动将偏移量重置为最早的偏移量,--from-beginning least :自动将偏移量重置为最新偏移量none :如果未找到消费者组的先前偏移量,则向消费者抛出异常
10.5.4指定时间消费
- 需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据
10.5.4漏消费和重复消费
10.6消费者事务
10.6.1条件
- 如果想完成
Consumer 端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定 。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如Mysql)。
10.7消费者数据积压
10.7.1消费者如何提高吞吐量
- 如果kafka消费能力不足,则可以考虑
增Topic的分区数 ,并且同时提升消费组的消费者数量,消费者数=分区数 。(两者缺一不可) - 如果是下游数据处理不及时:
提高每批次拉取数据的数量 。批次拉取数量过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压
|