发送消息主要由三种模式 1、发后即忘?? ?(只管往kafka中发送消息,而不去关系消息是否到达,这种发送方式性能最高,但同时可靠性也最差,容易丢失消息) 2、同步?? ??? ? (可以使用get()方法阻塞,以此来得到同步返回结果) 返回的是一个RecordMetadata对象,里面包含了一些元数据信息,比如当前消息的主题,分区,分区中的偏移量,时间戳等 3、异步?? ??? ? ?(异步一般是在send方法中加一个CallBack的回调函数)
在kafkaProducer发送消息的时候,一般会有以下异常,如:NetworkException,LeaderNotAvailableException,对于 这些由于网络瞬时故障而导致的异常,可以通过重试解决,如果配置了retries(默认为0),那么在规定的重试次数内 自行恢复,就不会抛出这些异常,如果重试失败,则会抛出
序列化器: 生产者需要用到序列化器把对象转换成字节数组,然后才能够通过网络发送给Kafka,而在对策,消费者需要用到反序列化器 把从kafka中收到的字节数组转换成相应的对象
分区器: 如果消息ProducerRecord中指定了partition字段,就不需要分区器了,因为partition代表的就是要发往的分区号 如果消息中没有指定partition字段,那么久需要依赖分区器,根据这个key来计算partition的值。分区器的作用就是消息分配分区 默认的分区规则是,如果有key值,则将key hash,然后%numPartition 取余,得到分区号(这里的分区号是所有分区中的任意一个) 如果没有key值,则会采用轮询的方式获取分区号(这里的分区号是可用分区中的其中一个) 实现了自定义个分区器号,需要在配置中指定这个分区器
拦截器: 拦截器有两种,分别是生产者拦截器和消费者拦截器,主要可以对消息做一些定制化的处理, 比如过滤掉不符合要求的消息、修改消息的内容等
注:拦截器->序列化器->分区器
整体架构: 整个生产者客户端主要是由两个线程协调运行,分别为主线程和Send线程 主线程中主要是有KafkaProducer创建消息,然后经过拦截器,序列化器,分区器 再然后会将消息缓存到消息累加器中(RecordAccumulator),当累加器中消息数量达到batch.size(或者到一定时间)则将消息发送到kafka 消息累加器主要用来缓存消息,以便Send线程可以批量发送,进而减少网络传输的资源消耗以提升性能,消息累加器缓存的大小可以通过配置 buffer.memory配置,默认为32M
生产者重要参数 1、acks: acks = 1;默认值即为1,表示生产者发送消息之后,只要分区的leader副本成功写入消息,那么就会收到来自服务端的成功响应 acks = 0;生产者发送消息之后不需要等待任何服务器端的回应 acks = -1或acks = all;生产者发送消息之后,需要等待ISR中的所有副本都成功写入消息之后,服务端才会成功响应
2、max.request.size ?? ?这个参数主要用来限制生产者发送消息的最大值,默认为1M ?? ? 3、reries和retry.backoff.ms ?? ?retries参数用来配置生产者重试的次数,默认值为0,即发生异常的时候不会去重试 ?? ?retry.backoff.ms表示两次重试之间的时间间隔
kafka有一个内部主题:_consumer_offsets 主要用来保存分区消费的offset(初始情况下这个主题并不存在,只有当第一次消费者消费时才会自动创建这个主题)
再均衡 是指分区的所属权从一个消费者转移到另一个消费者的行为。在再均衡发生期间,消费组内的消费者是无法读取消息的,消费组会变得不可用 再均衡可能会出现消息重复消息的问题。比如消费者刚消费完消息,但是还没有提交消费位移,此时发生了再均衡操作,这个分区被分配到另 一个消费者手上,原来已经消费过的消息,此时被这个新的消费者重新消费一遍
文件存储: topic->partition->replica->log 这里的log并不是真正意义上的物理概念,log在物理上只以文件夹的形式存储 log下又引入了日志分段(LogSegment)的概念,每个logSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(事务索引文件) 向log中追加消息是顺序写入的,因此只有最后一个logSegment才能够写入,可以称之为活跃分段,当满足一定的条件后,就会创建一个新的activeSegment 新的消息就会追加写入到这个新的activeSegment中。为了消息便于检索,每个logSegment都有一个偏移量索引文件以及时间戳索引文件 偏移量索引文件:用来建立消息偏移量( offset )到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置; 格式: 相对偏移量 ?消息物理地址 ? ? ? ?根据绝对偏移量找到相对偏移量,然后根据物理地址找到消息 ?? ? ? 时间戳索引文件:根据指定的时间戳( timestamp )来查找对应的偏移量信息。(时间) 格式: 时间戳?? ?相对偏移量 ?? ? ? 根据时间戳找到相对偏移量,然后再从偏移量索引文件中找到消息的具体物理地址
日志清理有两种策略 1、日志删除:按照一定的保留策略直接删除符合条件的日志分段 2、日志压缩
kafka实现高吞吐的原因 1、页缓存 2、顺序写入 3、零拷贝(直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手,减少了内核和用户模式之间的上下文切换) 4、批处理:降低了rpc的次数
控制器的选举以及异常恢复 kafka中的控制器选举工作依赖于zookeeper,成功竞选为控制器的broker会在zookeeper中创建/controller这个临时节点 (注:这里的控制器是集群中概念,而分区的leader副本提供读写功能,这个是分区的概念,且leader副本的选举是controller来负责的)
分区leader的选举 按照AR集合副本的顺序查找第一个存活的副本,并且这个副本要在ISR中;如果IRS中没有可用的副本,那么此时还要再检查一下配置unclean.leader.election.enable 参数(默认值为FALSE),如果配置为true,则表示可以从非ISR中选举leader,从AR中找到第一个存活的副本即为leader 注:这里是按照AR中副本顺序,而不是ISR中的副本顺序,因为AR中副本顺序是保持不变的,而ISR中的副本顺序是可变的
AR:分区中的所有副本统称为AR ISP:是指与leader副本保持同步状态的副本的集合 OSR:是指未与leader副本保持同步状态的副本的集合 LEO:是指每个分区中的最后一条消息的下一个位置,分区中每个副本都有自己的LEO ?? ? ?其中ISR中最小的LEO即为HW,俗称高水位,消费者只能够拉取到HW之前的消息 ?? ? ?(因为大于HW之后的消息,有些分区还未成功同步,所以暂时不能够同步) ?? ? ?生产者发出的一条消息首先会被写入到分区的leader副本,不过还需要等待ISR中的所有follower副本都 ?? ? ?同步完之后才能够被认为已经提交,之后才会更新分区的HW,进而消费者可以消费到这条消息 ?? ? ?
zookeeper的作用 kafka集群中有一个broker会被选举为controller,controller负责broker的上下线,所有topic的分区副本分配和leader副本的选举等工作
kafka事务 首先要实现事务,必须要开启幂等性(多次调用返回的结果是一致的),kafka中实现幂等性,主要是通过PID(生产者的ID,kafka内部分配的),和序列号这两个来实现的 每个生产者在初始化的时候都会被分配一个PID,也就是生产者和PID是一一对应的,生产者发送消息到某个分区,都会有对应的序列号,这些序列号都是 从零开始单调递增,broker端会在内存中为每个PID维护这个序列号,对于收到的消息,只有当它的序列号值比broker端维护的序列号的值大1,broker才 会接收它,如果是小于,则表明是重复写入,直接丢弃;如果是大于,则表明中间有数据未写入,出现了乱序,也就是数据丢失 要使用事务,应用程序必须提供唯一的transactionID,这个transactionID通过客户端参数transactionl.id来显示设置。事务要求生产者开启幂等性。 与此同时,如果使用同一个transactionID开启两个生产者,那么前一个生产者会报错,这个是对于生产者而言 对于消费者而言,log文件中不仅有普通的消息,还有控制消息,专门用来标志一个事务的结束,有两种类型,一种是commit,另一种是abort,kafkaConsumer可以 通过控制消息来判断对应的事务是被提交了还是被终止了,然后再结合设置的隔离级别来判断是否需要返回给消费者
|