IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kakfa详解 -> 正文阅读

[大数据]Kakfa详解

Kafka基本概念:

名称

解释

Broker

消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群

Topic

Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic

Producer

消息生产者,向Broker发送消息的客户端

Consumer

消息消费者,从Broker读取消息的客户端

ConsumerGroup

每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息

Partition

物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的。一个分区可以有多个副本,实际意义上来说,副本才是物理概念。

Topic和Partition

Topic可以看成是一类消息的逻辑称呼,同类消息发送到同一个Topic下面。对于每一个Topic,下面可以有多个分区(Partition):Producer生产者不停的将新的消息发送到topic下的不同分区,按顺序追加写入磁盘。

Partition是一个有序的message序列,这些message按顺序添加到一个叫做commit log的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。换句话说,只有在同一个分区里,消息是有序的。

kafka不会删除消息,不管这些消息有没有被消费。只会根据配置的日志保留时间(log.retention.hours)确认消息多久被删除,默认保留最近一周的日志消息。kafka的性能与保留的消息数据量大小没有关系,因此保存大量的数据消息日志信息不会有什么影响。

Topic,Partition和Broker、副本之间的关系可以这么理解:

一个topic,代表逻辑上的一种类型的数据,比如订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,订单消息很可能是非常巨量,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker

那既然机器磁盘容量的问题解决了,那如何解决高可用的问题呢,那就是副本的作用了。一个分区可以用一主多从的副本 多个副本来使用。每个分区只有主副本可以收发消息,其他分区只同步主分区的消息。这样在主分区宕机的时候,从分区可以有同样的消息存在。

除了解决commit log存储受到机器磁盘容量的限制,多个分区设计可以解决并行消费的能力。

Producers

生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。发送到

Consumers

消息传递模式有2种:队列(?queue) 和(publish-subscribe

  • queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。
  • publish-subscribe模式:消息会被广播给所有的consumer。

Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group

  • queue模式:所有的consumer都位于同一个consumer group 下。但同一条消息只能被一个consumer能接受到。
  • publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。

原则:queue模式下同一group同一分区的消息,只有个consumer能收到

上图说明:由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费, A有2个consumer 实列,B有4个。

通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者。每个consumer group由多个consumer 实例组成,从而达到可扩展和容灾的功能。

消费顺序:分区内有序除了分区顺序写入生产者产生的消息以外一个partition同一个时刻在一个consumer group中只能有一个consumer 实列在消费。因此,同组的实列数不要大于分区数,要不然始终有一个在打酱油。如果想保证topic的有序,那设置分区数量为1.但kafka很少用于这种场景

kafka客户端关键配置:

发送端:

1、props.put(ProducerConfig.ACKS_CONFIG, "1"):

acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,稳定性最差。

acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。如果此时follow副本还没来得及同步leader的消息leader就挂了,则消息丢失

acks=-1: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2,可以在broker或者topic层面进行设置) 这个参数配置的副本个数都成功写入日志,这种策略会保证 只要有一个备份存活就不会丢失数据。这是最强的数据保证。也就是至少每条消息有N个你配置的副本数确认才会提交成功。如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常

2、retries:发送失败重试次数,

3、retry.backoff.ms:重试间隔毫秒

4、buffer.memory:设置本地缓冲器大小kb

5、batch.size:设置batch大小。

6、linger.ms:到达batch后最迟多久发送一次,默认0,来了就发送。但是因为发送的数据大小不够,不能充分利用网络资源,所以一般设置为100毫秒.看情况而定吧

7、key和value的的序列化方式:key.serializer和value.serializer

8、最大请求的数据大小,单次:。。。配置参数名忘了,max.request.size?单次发送给kafka的请求的最大数据包。

解释下batch和buffer.memory:kafka发送消息并不是发送以后直接发个服务端,而是先放到本地一个内存缓冲区,我们叫它消息buffer区。我们发送的消息会被打包成一个batch放在缓冲区,如果这个batch达到了指定大小,我们才会把它发送出去。为何这样整?因为来一条发送一次消耗网络请求资源,为了一次性发送多一点,所以才此设计(这里还设计一个数据丢失的优化,下面讲到)

消费端:明再说吧。。。睡觉。。。

1、group.id:消费组的唯一表示。相同的消费组kafka服务端就认为是一个组内的消费实例。不同的group.Id就是不同的消费组。

2、fetch.min.bytes:拉取请求返回的最小数据量,如果数据不足,请求将等待数据积累。默认设置为1字节。如果等待超时,则回应消费者拉取请求。将此值设置的越大将导致服务器等待数据累积的越长,以时间节省服务器性能。如果遇到消息积压,这个参数要检查是不是配置的过大导致

3、heartbeat.interval.ms:与服务端维持心跳的时间。与session.timeout.ms配合使用。

4、session.timeout.ms:最小会话时间。如果最小会话时间内,任然没有收到过一次心跳。这重新分配消费组消费实例,默认10s。由此可见,heartbeat.interval.ms必须要小于此参数。

5、auto.offset.reset:没有消费者拉取的offset的情况下,消费组从哪里开始消费的方式(没有初始offset或如果当前的offset不存在时:如,数据被删)。一般是最早或是最新两种。或者抛出异常

6、enable.auto.commit:自动周期性提交offset

7、auto.commit.interval.ms:自动提交的时间间隔。

7、max.poll.interval.ms:消费组两次poll()调用之间的最大延迟。如果超过此时间没有再次poll(),则消费者被视为消费能力有问题,将重新分配组内成员。

8、max.poll.records:单次poll拉取消息数量的上限。

9、request.timeout.ms:客户端最长等待请求回应时间。

其他的不是常见的就不一一列举了。作为一个高吞吐的消息中间件,其参数太多了。。

网传kafka架构图解:(zk没有画集群)。图中的同步消息是错的,应该是Topic A的0分区的follower向leader去拉取消息,内部是时间轮实现的延迟拉取

Kafka集群控制器:

Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。

  • 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
  • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
  • 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。

?那集群控制器broker是怎么定出来的呢。很简单,集群启动时。第一个在zk中创建/controller 临时节点成功的那个就是集群的控制器broker。zk嘛,当这个控制器broker挂了的时候,zk的临时节点也没了,这时候剩下的broker再去竞争注册临时节点,成功即当选。其控制器功能依赖zk如下实现:

  1. 在Zookeeper中的/brokers/ids/节点添加BrokerChangeListener来监听broker变化。
  2. 在Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作
  3. 对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化
  4. 更新集群的信息,同步到其他broker。

分区的Leader副本:

每个分区有多个副本保证其数据的安全性,多个副本只有leader副本是与外界交互的,那leader是怎么选出来的呢。很简单,当 kafka集群控制器broker检测到某个分区的leader副本挂了之后,会从 ISR列表中选出最先加入到ISR列表的broker作为新的leader,因为其数据可能是最全的。如果ISR列表为空,则分区不可用。设置unclean.leader.election.enable为true 时表示可以在ISR列表以外的broker上选出新的leader,提高可用性但是可能数据不全。

什么是ISR列表

  1. 能与zookeeper保持会话以及跟leader副本网络连通
  2. 副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由 replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)

消费者的各分区消费进度,offset如何记录:

kafka有个内部的topic:consumer_offsets(默认有50个分区)。消费者会定期的提交自己的消费的offset。提交是以groupId+topic+分区号作为key,offset作为值。此topic会被定期清理,只保留每个消费组在每个topic每个分区的最新的消费记录。每个consumer在提交时,会以hash(consumerGroupId) %分区数来确定提交的哪个分区去。

消费者重新分配Rebalance:

如果消费组里的消费者数量有变化或消费的分区数有变化,kafka会重新分配消费者消费分区的关系。如果消费组里的一个消费实例挂了,那kafka会重新把它消费的分区分配给其他消费者。如果一会它又好了,又会重新分给它。如果其他消费者是指定分区消费的,则不会进行reblance。因此消费者里的消费者实例变化,topice分区变化,或者订阅了新的topice都会reblance。由此还会导致其他很多常见问题,下面讲到。

既然reblance,那重新分配消费者分区的关系是什么策略呢?通过partition.assignment.strategy可以设置kafka的分区与消费者的消费绑定策略,range、round-robin、sticky。默认第一种range。比如默认有20个分区。三个消费者

1、range策略就是按照分区序号排序,第一个消费的是0-6分区,第二个消费的是7-13,第三个是14-19个分区。咋算就是 分区数/消费者 取余看。

2、round-robin 就是遍历消费者轮询。比如0给第一个消费者,分区1给第二个,分区2给第三个,然后分区3又给第一个,如此循环。

3、sticky:尽量均匀不过它会尽可能保证不动原消费者消费的分区。如果第一种情况,第一个消费者挂了,那回把14-19遍历分给第二和第三个。但是不会动原来的消费关系。

rebalance的过程

1、进行rebalance必定要选中一个broker作为协调器,负责监控这个消费组里的所有消费者以及消费分区的情况,以便及时的rebalance。每个consumer启动后都会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的组协调器。而这个节点就是 组协调器.确定方式为:消费者的offset提交的(topice:__consumer_offsets)分区、这个分区的leader副本所在的broker就是这个消费组的 组协调器

2、第一个加入到组协调器的消费者将作为 leader消费组协调器。组协调器会把这个组的消费组员等情况发给这个 leader。由它指定分区方案。

3、leader将分区策略方案(那个消费实例消费哪些分区)发给组协调器。组协调器在发个各个消费者

生产者消息发布:

1、写入方式:采用push方式,将消息顺序append到分区中,是顺序写入磁盘的典型应用。至于顺序读写磁盘快于随机内存读取还是要看情况的,大部分情况没有出现官网说的快于内存随机读取。

2、分区路径:消息发送的哪个分区如何决定?

指定分区:写到指定的分区;
没指定分区但是有key:将key进行hash后再与该topic的分区数量进行取余得到分区号,可以保证同样的key的消息会进入同一个分区内;
没指定分区又没有key值:先随机生成一个整数(之后每次在这个整数上递增1),将这个整数和topic的分区数量进行取余得到分区号(Round-Robin算法),是kafka默认的分区分配策略,能保证负载均衡不会出现倾斜的情况。

3、写入流程:

1.(找分区leader) producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader

2. producer 将消息发送给该 leader

3. (持久化)leader 将消息写入本地commit log

4. (备份安全性)followers 从 leader pull 同步消息,写入本地 log 后 向leader 发送 ACK

5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK (这里指ack方式为-1)

HW与LEO:

一个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置。什么意思。就是ISR列表中A/B/C节点。如果A的偏移量是100,B/C是98,那能消费的消息只是到98.这是 ack=-1的情况。保证所有副本都同步了以后,消息才算写入成功:

由此可见,kafka并不是同步复制的,而是异步复制到副本的。这种等待非leader副本确认完才可消费的方式很明显。。。。限制了吞吐量。但是如果不等follower副本同步完就更新HW到100,如果此时leader宕机,那其实所有的follower副本都没有100的消息,导致消息丢失。ack=1的情况:

?

commitLog分段存储:

消息以topic名称+分区号命名存在一个文件夹下,消息在分区内是分段存储的,每个段的消息都存储在不一样的log文件里,这种特性方便老的消息段快速被删除,kafka规定了一个段位的 log 文件最大为 1G,防止文件过大造成读写性能问题。文件分为:00000000000000000000.index、00000000000000000000.log、00000000000000000000.timeindex三种文件。

log:存储消息体和offset 的文件。

index:部分offset的索引文件,默认每写入4k文件就会记录当前offse到index文件。作用:很明显如果按照offset查找文件,现在index文件中采用二分法找到对应offset段,然后根据offset段查找log文件,可以精确定位到4k大小的文件,而不是整个log文件从头开始找。

timeindex:同样的,只不过这里记录的是时间。默认每写入4k文件就会记录当前时间戳与对应的offset到timeindex文件,同上,方便按照时间查找。

提下:kafka一般都用G1垃圾收集器比较合适。kafka的高吞吐大内存,但是又都是秒秒钟就需要回收的短生命期对象,G1可以很好的控制Gc时间,防止内存过大导致一次Gc时间过长。而且由于写数据到磁盘要用到系统 page cache,linux并不会 立刻把数据刷到磁盘,而是为了方便读写会将其先刷到pagecache,后续自主刷到磁盘。所以需要留出几个G给系统,而不是全部分给Jvm。

Kafka用磁盘依旧高效的原因

1、磁盘顺序读写,分段存储

2、两个零拷贝技术:sendfile(消费者取数据)和mmap(生产者写数据)

3、读写数据的批量batch处理并且支持压缩传输,充分利用网络资源

sendFile拷贝原理:一般的消费者取数据要通过4个步骤,4次拷贝。sendFile技术则直接在读取数据到内核缓冲区后丢给网卡,省略了2/3两步的拷贝操作。

?mmap:将磁盘文件映射到内存中,用户对内存中的数据修改其实最终通过映射会刷到磁盘,但不是立刻刷盘,操作系统会在适当的时候刷盘,因此也减少了不必要的拷贝。但是同样的,所以的高性能必有一定安全问题,再刷盘前宕机了,数据就丢了,当然kafka提供了立刻刷盘的设置

kafka的事务:其实kafka事务的功能是一次发送的多条消息,要么成功要么失败。主要我们是在发送到下游不同的消费者时,希望他们都能保持kafka事务的一致性,而不是有的收到,有的没收到但其并不是我们所说的分布式事务,如RocketMq的分段提交实现的分布式事务(采用提交后回查提交者是否提交成功来实现)。

常见问题:

1、消息丢失:根据上面讲的,很容易知道几种情况。生产者提交时如果是异步提交,如果缓冲池满了,就设置无限制阻塞时间生产者。设置ack=-1。mmap设置为立刻刷盘。消费者设置自动提交也可能导致消息丢失(数据没处理完,offset已经提交,这时候宕机了。。。。).保证消息不丢也必然导致性能下降

2、重复消费:rebalance,发送方因网络抖动重试。rebalance:可能是一次拉取的消息数目太多导致规定时间内没有再次poll,也有可能是业务逻辑处理太慢导致超时。session与心跳时间配置错。消费端做幂等性处理。生产者kafka也有幂等性设置“enable.idempotence”:?true。其实现就是每次发送器生成唯一id和机器码绑定,如果下次同机器码还是一样的id,则不再接收

3、如何保证有序:将有序的消息发到同一个分区。必须设置为同步提交或者异步提交不重试。如果异步提交重试,发1/2/3的消息,可能1失败了,2/3成功后重试了1。所以最好同步提交。

4、消息积压:这。。可以增加组内消费者数量,前提是分区数小于组内消费者数。如果还是不行的话证明消费逻辑比较长已经来不及慢慢消费了。修改消费者程序逻辑,格一段时间就直接转发到其他topic里面,用其他消费者实列去分担其消费。

5、延时队列:比如A的消息要延迟消费。则把A抛出到一个其他专门放延时消息的topic中去,用一个中间服务消费它以后放入一个延时阻塞队列,然后自旋取,取到后就抛送到实际消费的topic中去(也可以实现一个时间轮)。或者用一个中间服务定时轮询延时topic,看时间到了没有。

6、回溯消费:根据时间或者偏移量消费,上面讲了,index/timeindex.log 就是支持干这个事情的

7、并不是分区越多,消费者越多吞吐越大。还要看网络带宽,服务器处理能力等。实际压测结果为准。一般分区数目不超过集群broker节点数。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-11 16:42:24  更:2021-07-11 16:45:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/1 7:46:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码