IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka相关问题 -> 正文阅读

[大数据]Kafka相关问题

Kafka相关问题<一>

Kafka为什么吞吐量大、速度快?

一、顺序读写

在这里插入图片描述

   众所周知Kafka是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对Kafka性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。
   磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升 。

二、Page Cache

    为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:
    1避免Object消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
    2避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题
     相比于使用JVM或in-memory cache等数据结构,利用操作系统的Page Cache更加简单可靠。首先,操作系统层面的缓存利用率会更高,
因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于Page Cache做了大量优化,
提供了 write-behind、read-ahead以及flush等多种机制。
再者,即使服务进程重启,系统缓存依然不会消失,避免了in-process cache重建缓存的过程。
    通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。 

三、零拷贝

在这里插入图片描述

通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,
我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。

当Kafka客户端从服务器读取数据时,如果不使用零拷贝技术,那么大致需要经历这样的一个过程:

1.操作系统将数据从磁盘上读入到内核空间的读缓冲区中。

2.应用程序(也就是Kafka)从内核空间的读缓冲区将数据拷贝到用户空间的缓冲区中。

3.应用程序将数据从用户空间的缓冲区再写回到内核空间的socket缓冲区中。

4.操作系统将socket缓冲区中的数据拷贝到NIC缓冲区中,然后通过网络发送给客户端。

从图中可以看到,数据在内核空间和用户空间之间穿梭了两次,那么能否避免这个多余的过程呢?当然可以,Kafka使用了零拷贝技术,
也就是直接将数据从内核空间的读缓冲区直接拷贝到内核空间的socket缓冲区,然后再写入到NIC缓冲区,避免了在内核空间和用户空间之间穿梭。

可见,这里的零拷贝并非指一次拷贝都没有,而是避免了在内核空间和用户空间之间的拷贝。
如果真是一次拷贝都没有,那么数据发给客户端就没了不是?不过,光是省下了这一步就可以带来性能上的极大提升。

四、分区分段+索引

 Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。
          通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度

五、批量读写

Kafka数据读写也是批量的而不是单条的。
        除了利用底层的技术外,Kafka还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。

六、批量压缩

在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行
数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。

如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议
     Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,
写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

kafka的produce端的分区策略

第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去

第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区

第三种分区策略:既没有给定分区号,也没有给定key值,直接轮循进行分区

第四种分区策略:自定义分区, producer.send(new ProducerRecord<String, String>("test",i+""));

kafka消费者的方式 三种分区分配策略

第一种Range分配策略

Range分配策略是面向每个主题的,首先会对同一个主题里面的分区按照序号进行排序,并把消费者线程按照字母顺序进行排序。
然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
我们假设有3个主题(T1,T2,T3),都有7个分区,那么按照咱们上面这种Range分配策略分配后的消费结果如下:
消费者线程	对应消费的分区序号
C0-0	T1(0,1,2),T2(0,1,2),T3(0,1,2)
C1-0	T1(3,4),T2(3,4),T3(3,4)
C1-1	T1(5,6),T2(5,6),T3(5,6)
我们可以发现,在这种情况下,C0-0消费线程要多消费3个分区,这显然是不合理的,其实这就是Range分区分配策略的缺点。

第二种RoundRobin分配策略

RoundRobin策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典排序,然后通过轮询算法逐个将分区以此分配给每个消费者。
使用RoundRobin分配策略时会出现两种情况:
如果同一消费组内,所有的消费者订阅的消息都是相同的,那么 RoundRobin 策略的分区分配会是均匀的。
如果同一消费者组内,所订阅的消息是不相同的,那么在执行分区分配的时候,就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个
消费者没有订阅消费组内的某个 topic,那么在分配分区的时候,此消费者将不会分配到这个 topic 的任何分区。

第三种Sticky分配策略

最后介绍一下Sticky分配策略,这种分配策略是在kafka的0.11.X版本才开始引入的,是目前最复杂也是最优秀的分配策略。Sticky分配策略的原理比较复杂,它的设计主要实现了两个目的:
1)分区的分配要尽可能的均匀;
2)分区的分配尽可能的与上次分配的保持相同。
3)如果这两个目的发生了冲突,优先实现第一个目的。

Kafka如何保证消息的顺序性

? 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
? 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对应 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

kafka消息顺序
	消息重试对顺序消息的影响 对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B,但是在异常情况下,在A发送失败的情况下,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了。这时对于本身顺序为AB的消息顺序变成了BA
	消息producer发送逻辑的控制 消息producer在发送消息的时候,对于同一个broker连接是存在多个未确认的消息在同时发送的,也就是存在上面场景说到的情况,虽然A和B消息是顺序的,但是由于存在未知的确认关系,有可能存在A发送失败,B发送成功,A需要重试的时候顺序关系就变成了BA。简之一句就是在发送B时A的发送状态是未知的。
	针对以上的问题,严格的顺序消费还需要以下参数支持:max.in.flight.requests.per.connection
大体意思是: 在发送阻塞前对于每个连接,正在发送但是发送状态未知的最大消息数量。如果设置大于1,那么就有可能存在有发送失败的情况下,因为重试发送导致的消息乱序问题。所以我们应该将其设置为1,保证在后一条消息发送前,前一条的消息状态已经是可知的。

Consumer 消费(数据)一致性

在这里插入图片描述

	HW(High Watermark)俗称高水位,consumer能够消费kafka的最大offset
	LEO (Log End Offset)已经写入kafka log中日志的最大offset值加1
	AR (Assigned Replicas) kafka 分区的所有副本
	ISR (In Sync Replicas) 在kafka中,所有与leader保持数据同步的副本(Replica)
	
    世上没有100%绝对可靠的系统。万一kafka分区的某个leader节点挂了,还能保证消费者不重复消费,也不多消费吗?
先回头看上面的HW(High Watermark)定义,也就是consumer能够消费的最,大offset。
为验证高可用性,我们来举个反例。上图中,虚线淡黄色的块表示还未从leader同步的数据,按照我们上面说的,consumer只能消费HW线之前的数据。 consumer正要消费record1-record3的时候,假设这时leader所在的broker突然挂了,“优胜劣汰”的原则,跑在前面的follower1会被选举成新的leader。
这时你可能会问?为什么不可以把follower1的最大值作为HW,这样可以消费到record4呢?
     再极端一点,假如follower1也同时挂了呢?如果以follower1的record4为HW的话,当leader切换成follower2时,follower2并未收到最初始leader同步过来的record4,所以会造成丢数据!
综上所述,HW必须是以跑的最慢的follower来定,HW机制也很好的保证了消费端的一致性,不会造成consumer少消费数据。但是,这样换区的强一致性必将带来性能上的损耗,很明显的,性能一定的损耗
在leader与follower直接的数据同步,这点损耗在大多数场景中还是可以接受的,换来的是kafka的高可用、一致性。
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

最低水位线(Low-Water Mark)
最低水位线是指在 WAL(Write Ahead Log)预写日志这种设计模式中,标记在这个位置之前的日志可以被丢弃。
问题背景WAL(Write Ahead Log)预写日志维护了对于存储的每次更新,随着时间不断增长,这个日志文件会变得无限大。Segmented Log 分割日志这种设计模式可以让我们每次只处理一个更小的文件,但是日志如果不清理,会无休止增长以至于硬盘被占满。

解决方案
最低水位线这种设计模式会告诉系统哪一部分的日志可以被删除了,即在最低水位线之前的所有日志可以被清理掉。一般的方式是,程序内有一个线程运行一个定时任务,不断地检查哪一部分的日志可以被清理并且删除这些日志文件。

kafka follower如何与leader同步数据

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。
完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。
而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。
Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。

kafka producer如何优化打入速度

1) 增加线程   
2) 提高 batch.size     
3) 增加更多 producer 实例  
4) 增加 partition 数  
5) 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
    跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。

kafka producer 打数据,ack 为 0, 1, -1 的时候代表啥

1(默认)  数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1 producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,	leader才commit,这时候producer才能认为一个请求中的消息都commit了。

kafka 选举机制

控制器(Broker)选举

所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。那么如果控制器由于网络原因与zookeeper断开连接或者异常退出,那么其他broker通过watch收到控制器变更的通知,就会去尝试创建临时节点/controller,如果有一个broker创建成功,那么其他broker就会收到创建异常通知,也就意味着集群中已经有了控制器,其他broker只需创建watch对象即可。

如果集群中有一个broker发生异常退出了,那么控制器就会检查这个broker是否有分区的副本leader,如果有那么这个分区就需要一个新的leader,此时控制器就会去遍历其他副本,决定哪一个成为新的leader,同时更新分区的ISR集合。
如果有一个broker加入集群中,那么控制器就会通过Broker ID去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。
集群中每选举一次控制器,就会通过zookeeper创建一个controller epoch,每一个选举都会创建一个更大,包含最新信息的epoch,如果有broker收到比这个epoch旧的数据,就会忽略它们,kafka也通过这个epoch来防止集群产生“脑裂”。

分区副本选举机制

在kafka的集群中,会存在着多个主题topic,在每一个topic中,又被划分为多个partition,为了防止数据不丢失,每一个partition又有多个副本,在
整个集群中,总共有三种副本角色:
首领副本(leader):也就是leader主副本,每个分区都有一个首领副本,为了保证数据一致性,所有的生产者与消费者的请求都会经过该副本来
处理。
跟随者副本(follower):除了首领副本外的其他所有副本都是跟随者副本,跟随者副本不处理来自客户端的任何请求,只负责从首领副本同步数
据,保证与首领保持一致。如果首领副本发生崩溃,就会从这其中选举出一个leader。
首选首领副本:创建分区时指定的首选首领。如果不指定,则为分区的第一个副本。
follower需要从leader中同步数据,但是由于网络或者其他原因,导致数据阻塞,出现不一致的情况,为了避免这种情况,follower会向leader发送
请求信息,这些请求信息中包含了follower需要数据的偏移量offset,而且这些offset是有序的。

如果有follower向leader发送了请求1,接着发送请求2,请求3,那么再发送请求4,这时就意味着follower已经同步了前三条数据,否则不会发送请
求4。leader通过跟踪 每一个follower的offset来判断它们的复制进度。
默认的,如果follower与leader之间超过10s内没有发送请求,或者说没有收到请求数据,此时该follower就会被认为“不同步副本”。而持续请求的副本就是“同步副本”,当leader发生故障时,只有“同步副本”才可以被选举为leader。其中的请求超时时间可以通过参数replica.lag.time.max.ms参数
来配置。
我们希望每个分区的leader可以分布到不同的broker中,尽可能的达到负载均衡,所以会有一个首选首领,如果我们设置参数auto.leader.rebalance.enable为true,那么它会检查首选首领是否是真正的首领,如果不是,则会触发选举,让首选首领成为首领。

消费组选主

在kafka的消费端,会有一个消费者协调器以及消费组,组协调器GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,那么如何选举的呢?
如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果某一个时刻leader消费者由于某些原因退出了消费组,那
么就会重新选举leader,如何选举?
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption
上面代码是kafka源码中的部分代码,member是一个hashmap的数据结构,key为消费者的member_id,value是元数据信息,那么它会将leaderId选
举为Hashmap中的第一个键值对,它和随机基本没啥区别。

kafka查看topic数据消费情况?

bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group invoiceUpload

Kafka 的 ISR 机制是什么?

 Kafka 的核心机制,就是 ISR 机制。这个机制简单来说,就是会自动给每个 Partition 维护一个 ISR 列表,这个列表里一定会有 Leader,然后还会包含跟 Leader 保持同步的 Follower。也就是说,只要 Leader 的某个 Follower 一直跟他保持数据同步,那么就会存在于 ISR 列表里。但是如果 Follower 因为自身发生一些问题,导致不能及时的从 Leader 同步数据过去,那么这个 Follower 就会被认为是“out-of-sync”,被从 ISR 列表里踢出去。所以大家先得明白这个 ISR 是什么,说白了,就是 Kafka 自动维护和监控哪些 Follower 及时的跟上了 Leader 的数据同步。

Kafka 写入的数据如何保证不丢失?

如果要让写入 Kafka 的数据不丢失,你需要保证如下几点:
每个 Partition 都至少得有 1 个 Follower 在 ISR 列表里,跟上了 Leader 的数据同步。
每次写入数据的时候,都要求至少写入 Partition Leader 成功,同时还有至少一个 ISR 里的 Follower 也写入成功,才算这个写入是成功了。
如果不满足上述两个条件,那就一直写入失败,让生产系统不停的尝试重试,直到满足上述两个条件,然后才能认为写入成功。
按照上述思路去配置相应的参数,才能保证写入 Kafka 的数据不会丢失。
好!现在咱们来分析一下上面几点要求。
第一条,必须要求至少一个 Follower 在 ISR 列表里。
那必须的啊,要是 Leader 没有 Follower 了,或者是 Follower 都没法及时同步 Leader 数据,那么这个事儿肯定就没法弄下去了。
第二条,每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功。
大家看下面的图,这个要求就是保证说,每次写数据,必须是 Leader 和 Follower 都写成功了,才能算是写成功,保证一条数据必须有两个以上的副本。这个时候万一 Leader 宕机,就可以切换到那个 Follower 上去,那么 Follower 上是有刚写入的数据的,此时数据就不会丢失了。

Kafka Rebalance?

什么是 Rebalance
	Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。
	例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。
正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。
触发 Rebalance 的时机

Rebalance 的触发条件有3个。
	1)组成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
	2)订阅的 Topic 个数发生变化。
	3)订阅 Topic 的分区数发生变化。
Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer 
group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
Rebalance 过程分析
Rebalance 过程分为两步:Join 和 Sync。

Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,
coordinator会从中选择一个consumer担任leader的角色,并把组成员信
息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。
Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。
一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容
为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

如何避免不必要的rebalance
要避免 Rebalance,还是要从 Rebalance 发生的时机入手。我们在前面说过,Rebalance 发生的时机有三个:

组成员数量发生变化
订阅主题数量发生变化
订阅主题的分区数发生变化
后两个我们大可以人为的避免,发生rebalance最常见的原因是消费组成员的变化。

消费者成员正常的添加和停掉导致rebalance,这种情况无法避免,但是时在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group。从而导致rebalance。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 
已经 “死” 了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。这个时间可以通过Consumer 端的参数 session.timeout.ms进行配置。默认值是 10 秒。

除了这个参数,Consumer 还提供了一个控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但
好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认
值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮 Rebalance。

通过上面的分析,我们可以看一下那些rebalance是可以避免的:

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配
置是在网上找到的最佳实践,暂时还没测试过)

设置 session.timeout.ms = 6s。
设置 heartbeat.interval.ms = 2s。
要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍
长一点。
总之,要为业务处理逻辑留下充足的时间。这样,Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 。

如果leader crash时,ISR为空怎么办?

kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:
true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。
false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。

kafka的message格式是什么样的?

一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成
header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。
当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,
比如是否压缩、压缩格式等等);如果magic的值为0,那么不存在attributes属性、body是由N个字节构成的一个消息体,包含了具体的key/value消息

Kafka中的消息是否会丢失和重复消费?

要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。
	1、消息发送
	  Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
		0---表示不进行消息接收是否成功的确认;
		1---表示当Leader接收成功时确认;
		-1---表示Leader和Follower都接收成功时确认;
	综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:
	
	(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
	(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
	
	2、消息消费
	
	Kafka消息消费有两个consumer接口,Low-level API和High-level API:
	
	Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
	
	High-level API:封装了对parition和offset的管理,使用简单;
	
	如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来
	得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;
	
	解决办法:
	
	   针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
	
        针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。

ISR 副本集合保存副本条件?

leader 与 follower 数据相差超过 4000 条
replica.lag.max.message = 4000
超过 3 秒未同步数据,拉取速度慢于 leader 副本写入速度replica.lag.time.max.ms = 3000
如果 follower 速度提升上来之后,可能会重新加入 ISR 副本集合中

法:

   针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;

    针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。

## ISR 副本集合保存副本条件?

leader 与 follower 数据相差超过 4000 条
replica.lag.max.message = 4000
超过 3 秒未同步数据,拉取速度慢于 leader 副本写入速度replica.lag.time.max.ms = 3000
如果 follower 速度提升上来之后,可能会重新加入 ISR 副本集合中


  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-21 20:57:41  更:2022-03-21 20:58:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:57:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码