定义
kafka是一个基于发布-订阅模式的高吞吐分布式消息队列,它轻量级,适合大数据处理
kafka的基础架构
-broker
kafka服务器,负责消息存储和转发
-topic
生产者将数据发送到topic,消费者可以订阅这些topic
-partition
为了提高topic的吞吐量,并发量,kafka将topic分成多个partition
且同一topic下的partition可以分布到不同的broker上
offset
消息在partition上的偏移量
zookeeper
1)保存着集群broker、topic、partition等元数据
2)另外还负责broker上下线、故障发现、leader选举、负载均衡
负载均衡主要体现在
当生产者生产消息时,通过hash或者轮询的方式尽量将消息投递到不同broker的partition里
当消费者消费消息时,zookeeper根据当前的partition以及consumer数量来实现动态负载均衡
如何判断一个节点是否还存活
两个条件,缺一不可
1)节点必须正常和zookeeper正常连接,zookeeper会通过心跳机制检查每个节点是否正常
2)如果节点是follower,那么它必须能及时和leader同步写操作,不能延迟太久
kafka和传统mq的区别
1)kafka吞吐量高,内部吃用零拷贝技术以及顺序存储/读取磁盘操作
2)依赖于zookeeper,通过zookeeper来存储一些元数据以及实现负载均衡
3)kafka是一个分布式系统,它以集群方式运行,在内部通过复制数据提升容错能力和高可用性
consumer group
同一topic下的同一partition只能被消费者内的一个消费者消费,且此时消费是有序的
消费者通过pull方式拉取消息,kafka不会删除已消费的消息,后面有新的消息产生还要顺序写磁盘
pull方式拉取消息时会根据自己的消费能力去拉取消息,可以保证消息不丢失
但是它有一个缺点,如果kafka没有数据,那么消费者将会频繁地返回空数据,这样很消耗资源
针对这一点,消费者在消费数据时会传入一个时长timeout,如果拉不到数据,那么消费者会等待一段时间再返回数据(空数据)
ISR、OSR、AR、LEO、HW分别代表什么意思
ISR:能跟leader保持同步的follower集合
OSR:不能跟leader保持同步的follower集合,新加入的follower也会被放入OSR
AR:分区的所有副本,AR=ISR+OSR
LEO:表示当前副本日志文件下一条待写入的offset
HW:表示一个分区里所有副本最小的offset,即HW=最小的LEO
消费者最多只能消费到HW所在位置的上一条消息
特别提醒:HW/LEO这两个都是指最后一条的吓一跳位置而不是指最后一条的位置
broker controller
第一个创建临时节点的broker将会是broker controller
broker controller也算是一种leader,如果发生故障,将会通知所有follower
然后再次竞争在该路径下创建节点,第一个创建的是leader,这也是broker leader的选举
作用:
broker controller 负责管理集群里broker的上下线(zookeeper的watch机制会通知controller)
既然有broker的上下线,那么分区就需要重新分配
新的broker要放分区副本放到里面去,下线的broker里的分区需要放到旧的分区里
新的broker里的follower副本往往会被选举成leader分区,为了负载均衡,降低一下其他broker的请求压力
因此broker controller还可以主持leader的选举,当然如果分区leader发生故障,那么也可以由它来主持选举(leader选举可以看下面)
kafka的文件存储机制  一个segment文件对应三个文件".index"文件,".log",".timeIndex"文件
log文件就是存储数据的地方,生产者生产的消息会不断追加到log文件末尾,为了防止segment文件过大,kafka采取了分段和索引机制
分段:将一个分区分成多个segment(逻辑分组,并不是真实存在)
索引:利用.index和timeIndex文件来进行索引搜索
PS:
同一分区对应的多个segment对应的多个index文件和log文件都放在同一个文件夹下
文件夹命名规则:topic名称+分区号
例如first这个topic有三个分区,那么对应partition存储的segment文件夹
first-0,first-1,first-2
index和log文件命名规则:index和log文件都是以当前segment的第一条数据的offset命名
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
如何找到offset=3的信息
-首先根据二分法找到对应index文件,用查找的offset值减去文件名的offset值,那么将得到offset=3的信息对应的索引值,例如等于3
如果没找到对应的索引值,由于index采用稀疏索引,它不会为每一条数据都建立索引
优点:可以减少存储空间
缺点:可能没办法一次性定位到数据的位置,需要再进一次扫描index文件,这次index文件会缩小范围
-通过索引可以得到对应数据的物理偏移地址值
-通过物理偏移地址值去log文件找到对应的数据  如何保证生产者-kafka这个过程数据传输的可靠性
当分区确定收到生产者传来的数据后,leader副本都会回传一个ack给生产者告知数据已收到
只有生产者收到ack才会进行下一轮发送,否则会重复发送
什么时候发送ack
假装这是非常重要的数据,那么kafka规定:当leader副本接收到数据后,只有全部follower副本和leader副本同步完成后才发送ack给生产者
但这样很容易出现问题,就是一个follower同步失败,那么ack发送不过去给生产者,那么生产者将会重新发数据
-ack=0,消息发送出去就当做kafka能正常接收了,这种模式下生产者都只会发送一次数据,无论kafka有没有接收到,at most once语义
这种模式下,如果leader副本还没拿到数据就发生故障了,那么将会数据丢失
-ack=1(默认模式),值等于1表示只要一个副本写入成功即可,这个副本必须是leader副本
如果follower还没同步成功,leader副本所在的服务器宕机了,那么将会数据丢失
-ack=-1,值等于-1表示是全部,只有分区里的eader副本、ISR里的follower全部同步完成才会返回ack,即at least once语义
如果同步完成,broker发送ack之前,leader发生故障,那么生产者将收不到ack,那么会重复发送数据,最后将会造成数据重复
kafka主从同步
-生产者写入消息,leader更新消息
-leader通知follower来取消息
-follower同步消息的速度是不一样的,以所有的follower最小的LEO为准,leader会更新HW值
-当所有的follower同步完毕后,leader会更新HW值
follower故障
当follower发生故障时它会被临时踢出ISR,进入OSR,待follower正常运行时,它会读取该分区上的HW,并将log文件里高于HW部分的数据截取掉
截取掉完成后再跟leader同步,当该follower的leo>=hw时,就可以重新加入ISR了
当leader没有写入新的数据时,那么该follower的leo就会等于hw
当leader写入新的数据且该follower同步较快时,那么leo将会大于hw
leader故障
当leader发生故障后,broker controller会从ISR里选出一个新的leader出来,然后让follower把HW的部分截掉(leader并没有截掉,它任然保留着高于HW的部分),最后再从新的leader里同步数据
这只能保证数据的一致性,不能保证数据不丢失或者数据不重复
同一topic下的一个分区只能被消费组里的一个消费者消费
不同的消费者组拿取数据是互不影响的,但是同一topic下的一个分区只能被消费组里的一个消费者消费
消费组中的消费者是如何确定自己该消费哪些分区数据的?
分区分配策略
Range(按范围)、RoundRobin(轮询)
-Range(默认),分区连续分
分区数/消费者数来决定每个消费者消费几个分区,除不尽的前面的几个消费者会多消费一个
例如:消费者组CG1里有C0和C1两个消费者,kafka里的topic为t1,他有11个分区
CG1里有三个线程C0-0、C1-0、C1-1
由于11除3除不尽,因此C0-0、C1-0都会多一个,结果如下:
C0-0将消费0,1,2,3分区
C1-0将消费4,5,6,7分区
C1-1将消费8,9,10分区
如果现在存在有2个Kafka topic(t1和t2),它们都有有10个partition,那么最后分区结果为:
C0-0 将消费t1主题的0、1、2、3分区以及t2主题的0、1、2、3分区
C1-0 将消费t1主题的4、5、6分区以及t2主题的4、5、6分区
C1-1 将消费t1主题的7、8、9分区以及t2主题的7、8、9分区
不同topic对于同一消费者组是互不影响的,同一topic对不同消费者组是互不影响的
-RoundRobin(轮询),分区一个一个分
把同一topic下的分区和消费者组内的消费者列出来,一个消费者分一个分区,尽可能把分区均匀分配给消费者
例如一个topic里有6个分区,同一消费者组里有三个消费者
那么第一个消费者是0,3,6,第二个1,4,第三个2,5
消费者组内消费者发生增删或者被消费者组订阅的分区发生变化时,都会重新分配分区
kafka为什么读写这么高效
1)分区,提高高并发,提高吞吐量
2)顺序写磁盘到数据
生产者将数据传到kafka,kafka会把数据追加到log文件末端
相同的磁盘,顺序写的速度为600m/s,随机写只有100k/s
随机写慢是因为要写进一个位置就需要判断这个位置是不是为空,使用顺序写则不需要这样,节省大量时间
3)零拷贝技术
零拷贝技术将磁盘里的数据拿到页面缓存,然后将页面缓存里的数据发送给消费者,避免一直去磁盘里拷贝数据
kafka事务
事务可以把一系列的操作当作原子来操作,要么成功,要么失败;当生产实例宕机重启后,可以继续对事务进行恢复,进行事务提交或者事务回滚
应用场景
-如果需要生产者发送的多条消息(这些消息可以跨topic、跨分区,这就是分布式事务)对消费者同时可见或者不可见,则可以使用事务
-如果需要先消费一个topic,消费完再把消息发到另一个topic,这就是一个consumer-transform-produce(生产消息和消费消息并存),需要把它放到一个事务里面,如果处理过程中有一点出错那么就得全部失败
相关概念
-事务协调者:TC,是分配pid和管理事务的核心
1)将pid和transaction id的映射关系存储到事务topic里
2)可以恢复(commit或者abort)之前producer未完成的事务,并对
-事务topic:transaction log,由TC写入,记录事务的状态
-pid:一个pid就是一个生产者,同一生产者不同实例具有相同的pid,由TC生成
-sequence number:生产者发送给同一topic里的同一分区就是一个seq num,它递增
-transaction id:一个事务必须要有一个事务id,方便生产者宕机重启后对事务进一步操作,提交或者回滚
-epoch:一个生产者的不同实例拥有一个epoch,用来标识同一transaction id里的新旧,同一请求的旧请求会被抛弃
发起旧事务请求被称作僵尸实例,当一个生产者实例宕机,然后又创建一个生产实例,此时旧的实例恢复了,这个旧的实例就是僵尸实例,它会kafka识别抛弃,kafka就是根据epoch的大小来识别的,新的epoch值比较大
-幂等性:幂等性可以保证同一分区里只会保存同一生产者提交重复提交的消息的一条
事务依赖于幂等性,幂等性是事务的基础
而幂等性不依赖于事务,开启事务就默认开启幂等性enable.idempotence=true,除非显式把幂等性设置成false
-因为事务存在commit和abort两种操作,而消费者又存在read committed和read uncommitted两种隔离级别
所以kafka必须识别事务状态,称为controller message
事务执行的流程
生产者首先通过请求一个broker得到事务协调者的ip地址,如果是第一次请求,那么TC将会返回一个新的pid和epoch(值等于-1,用于校验新老事务),并把pid和事务id的映射关系存储到事务id里;如果不是第一次请求,那么根据事务id返回pid和epoch;
然后生产者将涉及到的分区传给tc,tc将这些分区存储到事务topic里,方便后续提交以及回滚
生产者通过一个或者多个productrequest将数据,pid,seqnum,epoch发送给kafka,为了实现生产者-消费者转换,消费者需要将自己的offset自己数据提交给kafka,然后将消费者的offset以及生产者发送的消息放到一个事务里,最后再进行一个提交或者放弃
kafka如何保证数据的有序性/顺序消费
背景
kafka只能保证在一个topic里的同一个分区顺序执行,在不同分区不保证顺序执行
假设可以实现多个分区有序,那么如果在消费分区1时堵住,为了保证有序,那么后续的一系列分区将不能被消费,这种情况下,kafka将退化成单一队列,这个跟kafka的追求高吞吐的理念违背,并发性几乎为0,降低系统性能
因此kafka只保证单个partition有序(通过加锁的方式)
解决方式
既然明白只能单个分区保持有序性,那么我们将数据发送到一个分区即可
可以指定某个分区,或者书写某个key即可实现将数据都送往某一个分区
而在消费者端,如果觉得单线程处理数据太慢的话,可以通过多线程的方式
消费者端启动多个线程,每个线程维护专属的kafkaconsumer实例,负责完整的消息获取、消息处理
kafka如何保证数据不丢失/不漏消费
1、消息发送丢失数据的场景
1)acks=0,即在异步情况下,缓冲池满了且数据还没有发送出去,如果设置的是立即清理模式,那么数据将会丢失
2)在同步模式下,当消息确认acks=1时,leader返回消息确认给生产者后挂掉,副本没有进行同步,消息会丢失
解决方法
1)在异步模式下,设置好缓冲池参数,让缓冲池满时,设置成阻塞模式而不是立即清理模式
2)在同步模式下,将acks设置成-1,即消息写入leader和所有follower后才给生产者确认(可能会造成重复消费)
2、消息消费丢失数据的场景
假设设置自动提交offset,当消费者拉取到分区的某个消息后,刚准备进行消费,消费者挂掉了,然后又自动提交了offset,但这条数据实际上没被消费
解决方法
关闭自动提交offset,每次在真正消费完消息后再手动提交offset
但这同样可能会造成数据的重复消费,当消费完消息,还没提交offset,结果自己挂掉了,那么这个消息理论上会被消费两次
手动提交offset该怎么做
将auto.commit.offset设为false,处理完一批消息后同步提交或者异步提交
kafka如何保证数据不重复消费
有以下两种场景可能导致kafka重复消费
1、kafka保存一份数据多次
生产者多次传相同数据给kafka,kafka保存同份数据多次
解决方法
实现exactly once语义,即at least once+幂等性
at least once,即acks=-1
幂等性通过两个参数来实现
-pid,生产者id
-seqnum,序列号,每个pid发送数据给分区时都会带一个序列号值
当传来的序列号=broker里保存的序列号+1,broker才会接收这条数据
但是生产者重启时,pid会发生变化
exactly once=幂等性+at least once语义
2、消费者消费同一份数据多次
当消费者消费了某条offset数据时,刚准备提交offset到zookeeper,此时消费者进程被重启了
因此该offset的值并没有提交,kafka并不知道你消费过这条数据
下次消费者重启后,会找kafka继续发送上次那个offset对应的数据,因此数据会再次被消费
解决方法
结合业务,使用外部介质,例如mysql、redis
例如
消费程序是往数据库里添加某条数据,每次添加之前判断该数据是否已经存在于数据库,如果存在则不让它插入
或者消费者拿到一个订单数据,拿到这个数据后去redis里查一下,redis里的数据使用天然幂等性——set类型进行存储,如果这个订单id不在set集合里,那么对这条数据进行处理,最后把这个订单id写到redis里
如果这个订单id存在于set集合里,那么不处理这条数据
生产者发送消息的方式
1、生产者同步发送消息
生产者发送一条消息时,它就立马发送到某个分区去。如果acks=-1,那么leader和follower写入成功后才给生产者发送确认,这套流程很长,因此会很慢
如果acks=1,则可能存在着数据丢失的情况
2、生产者异步发送消息
主线程、sender线程,缓冲区RecordAccumulator
主要流程:
1)主线程调用send方法发送消息时,首先会先通过过滤器来过滤消息,再者将消息转成二进制,接着判断数据应该发送到哪个分区,根据指定分区/key/轮询的方式得到对应的分区
2)接着消息会追加到RecordAccumulator缓冲区,该缓冲区里有很多队列,找到对应分区的队列,把消息加上去
3)然后sender线程不断从缓冲区中拉取消息并把消息发送到broker去  分区能不能增删
可以增,但不能删,删了后数据很难处理
kafka的内部topic
__consumer_offsets,以双下划线开头,保存消费组的偏移量(0.9之前,offset是保存在zookeeper里)
kafka选举的地方
broker controller、分区leader
kafka分区数的设置
kafka分区数一般不要超过broker数量,因为超过没什么意义,分区数一般设置成3-10
|