1.生产者原理
1.首先创建main线程,创建Producer对象,调用send方法发送数据,遇到拦截器就处理数据,然后将数据序列化(kafka本身的序列化器,开发要手动指定),然后进入分区器,根据分区数进行分区
2.然后将分好区的数据发往一个位于内存的缓冲区(默认32M),每个分区对应一个双端队列,生产者发送数据的批次大小为16K
3.sender线程的Sender读取数据,读取的临界条件是每个队列内的数据大小总和达到batch.size(默认16K)或者sender等待时间达到了linger.ms(默认0ms)
4.每个队列的数据被“转换”成请求,放入请求队列,去请求集群上对应节点的对应分区,要通过底层创建的Selector传送到集群,分区应答机制分为三种—①应答机制为0:生产者只需要发送数据,其余啥都不用管 ②应答机制为1 需要等待leader收到数据后才返回应答 ③ -1(all) leader和follower都收到数据后才应答 。若节点不应答,最多可累计5条请求
5.发送成功到Kafka集群,那么清除内存中对应的数据,不成功重试(重试次数可达int的最大值次)
2.主题分区
主题为什么分区
很明显,将数据分区,可以合理的使用存储资源
其次,可以实现并行度,生产者可以以分区为单位进行发送数据,消费者可以分区为单位进行分区
主题分区策略
假如我只有3台服务器,但我对某主题分了9个分区,每个分区2个副本,那我这9个分区的leader在哪?follower在哪? 注意,副本数不能超过机器数(超过会报错)
测试
./kafka-topics.sh --bootstrap-server hadoop102:9092 --topic topicx --create --partitions 9 --replication-factor 2
./kafka-topics.sh --bootstrap-server hadoop102:9092 --topic topicx --describe
可以看到,leader 采取了轮询的方式,实现了leader自动平衡 。关于follower,以每个节点为单位看,我们可以看到,位于节点0的leader的副本策略是这样的,[0,1] [0,2] [0,1],如果再来一个leader位于0,那它的副本策略是[0,2],也是为了负载均衡
生产经验 修改分区副本的位置
假如我想把分区0的副本由[1,2]变为[0,1]
在kafka目录下vim increase-replication-factor.json
{
"version":1,
"partitions":[{"topic":"topicx","partition":0,"replicas":[0,1]}
}
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
此外,还可以通过种方式增加副本,假如我把上述json文件再修改一下,改成[0,1,2],那该分区的副本数就会增加!
数据分区策略(数据被分到哪个分区规则)
可以看到,生产者的构造函数有以下6种 对于前4种,我们可以通过第二个参数直接指定分区
对于第5种,虽然没有指定分区,但是指定了key,那么数据在哪个分区由 key的hash值与topic的partition数进行取余操作得到分区号,比如我们可以将一张mysql的表名字作为key,那么这张表的数据就会进入一个分区了
对于第6种,既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同) 例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)
自定义数据分区器
实现Partitioner接口,重写方法
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String str = value.toString();
int partition;
if(str.length() <= 3){
partition = 0;
}else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
在属性中指定分区类,发送消息就会根据分区器进行发送了
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gzhu.MyPartitioner");
3.生产者吞吐量
主要有4种方式改变吞吐量
1.改变RecordAccumulator的大小,缓冲区大了,数据处理能力也强了 2.改变batch.size的大小,这个要根据实际需求改变,如果太小导致发送数据太频繁,太大导致数据延迟高 3.改变linger.ms的时间,如果太小导致发送数据太频繁,太大导致数据延迟高 4.修改压缩方式
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
4.数据可靠性 ISR原理
每个队列的数据被转换成请求队列,去请求集群上对应节点的对应分区,要通过底层创建的Selector传送到集群,集群应答机制分为三种
①应答机制为0 生产者只需要发送数据,其余啥都不用管 ,很明显,如果leader没收到,就丢失了,但是生产者认为发送成功了,因此这种机制很不可靠,基本不用
②应答机制为1 需要等待leader收到数据后才返回应答 ,也有缺陷,leader收到某数据并且应答了,但是follower还没来得及同步,此时leader挂了,数据没有备份且唯一收到数据的leader还挂了,此时生产者已经收到过应答了,认为已经发送成功了,数据就丢失了
③ -1(all) leader和follower都收到数据后才应答 看似十分可靠,但是有个问题,比如某follower同步的时候,由于故障挂掉了,此时这个follower就无法产生应答了,生产者却一致在等,那整个集群不就瘫痪了吗?
为此,Leader维护了一个动态的In-Sync Replicas(ISR),意为和Leader保持正常同步的Follower+Leader集合 (leader:0,isr:0,1,2),因此我们只要关注ISR里节点的应答就可以
如果Follower在30s内未向Leader发送通信请求或同步数据,则该Follower将被移出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如节点2的副本超时,则(leader:0, isr:0,1)
这样就不用等长期联系不上或者已经故障的节点
如果分区副本设置为1个,或者ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。因此,想要保证数据完全可靠,ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2,回到当初的问题,[某follower同步的时候,由于故障挂掉了,此时这个follower就无法产生应答了,生产者却一致在等那整个集群不就瘫痪了吗?],由于ISR机制,这个follower被踢出,生产者等到了另外两个的应答,也视为成功
机制选择
在生产环境中,acks=0很少使用
acks=1,一般用于传输普通日志,允许丢个别数据
acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景
在开发中,主要配置两个参数
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,1000);
5.数据重复问题 幂等性原理
先明白一点,我们目前针对的某个分区来分析的
假如对于某个分区,一个leader和两个follower,都成功收到了数据,但是就在返回应答的时候,leader挂了,那么生产者收不到全部的应答(应答机制为all),由于ISR机制,生产者会重新发送数据,此时原来的两个follower(有个follower在leader挂了后会成为新的leader)已经有了数据,再接收一次就产生了分区出现了数据重复问题
关于发送次数的一些场景
1.若至少发送一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
2.若发送最多一次(At Most Once)= ACK级别设置为0
At Least Once可以保证数据不丢失,但是不能保证数据不重复
At Most Once可以保证数据不重复,但是不能保证数据不丢失(实际我们不用)
3.精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失,我们怎么实现呢???幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端某分区都只会持久化一条,保证了不重复
幂等性主要通过三个属性来实现:具有<PID, Partition, SeqNumber>相同主键 的消息提交时,Broker某分区只会持久化一条
其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的,是每条数据的序列化号,不同数据是不同的
很明显,幂等性只能保证的是在单会话单分区内不重复,为什么只能保证单会话?示例,对于一个消息a,kafka集群启动了两次,由于PID不同,分区认为这是不同的数据,不会认为是相等的数据(实际都是数据a)
再回到当初的情景,节点0挂了,生产者重新发送数据,新的leader再次接收到a时,由于PID没变,分区号没变,SeqNumber没变,重复了,不接受,成功解决分区内的数据重复问题!
开发代码中如何实现幂等性?
开启参数enable.idempotence默认为true,false 关闭,实际上,底层幂等性是默认开启的,我们不写也可以
6.Kafka事务原理
幂等性只能保证的是在单分区单会话内不重复,幂等性不能跨多个分区。多个分区之间不会重复数据怎么实现,怎么办? 事务,kafka事务可以保证对多个分区写入操作的原子性
开启事务,必须开启幂等性,事务底层依赖幂等性
首先确定使用哪个事务协调器(因为每个节点都有事务协调器,啥?啥是事务协调器?kafka集群和客户端进行事务通信总得有个东西吧?就是通过它!),如何确定?集群中有一个特殊的主题,用来存储事务的信息,默认有50个分区,每个分区都会负责一部分事务。如何确定事务属于50个中的哪个分区?需要程序员手动输入一个transactional.id(全局唯一),将transactional.id的hashcode%50计算出属于哪个分区,该分区leader所在的节点上那个事务控制器就是我们要使用的
我们假设我们的主题是3个分区,且使用broker0的事务控制器
1.因为事务依赖于幂等性,保证单个分区内的数据不重复,所以生产者首先要向事务协调器获取Producer ID,也即PID 2.事务协调器返回当前的PID 3.开启事务后生产者向集群的不同分区发送数据,是原子性操作 4.告诉事务协调器事务提交commit请求 5.事务协调器通知事务主题该请求,让其进行持久化操作,这样保证了客户端挂了后,重启后事务可以继续处理未完成的数据 6.事务协调器告知生产者事务提交成功 7.事务协调器会发送请求到每个分区,确认每个分区是否收到了数据,确保事务成功 8.每个分区应答给事务协调器,告知一切正常 9.事务协调器收到通知后,将该条事务的执行成功持久化,确保不会再执行该事务了
IDEA 事务
public class TransactionsTest {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"100");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();
producer.beginTransaction();
try {
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("first","Hello " + i));
}
producer.commitTransaction();
}catch (Exception e){
producer.abortTransaction();
}finally {
producer.close();
}
}
}
7.数据有序原理
如果保证分区内有序,是有条件的,这里的有序是指生产者发送 1 2 3,集群收到的数据为1 2 3,而不是2 1 3 等等
什么条件呢?
没有开启幂等性,数据没有序列化号(序列化号大的是后来的,因为序列化号是递增的)!不知道谁先来的,谁后来的,因此为了保证有序,直接设置下图中请求只能存一个,max.in.flight.requests.per.connection需要设置为1,当前请求成功后才发送下一个请求
不开启幂等性并且max.in.flight.requests.per.connection不为1,假设当有两个请求时,请求1先来的,但是请求失败了,请求2成功了,这样请求1再次请求成功后,1在2的后面了,因此只能max.in.flight.requests.per.connection只能为1
开启了幂等性,最大的好处是有序列化号了,可以根据序列化号判断先后了!
max.in.flight.requests.per.connection需要设置小于等于5,这样我就能保证最近5个请求时有序的了
开启幂等性,假设当有两个请求时,请求1先来的,但是请求失败了,请求2成功了,但是集群知道第一个应该是请求1,先在内存放着请求2,等待请求1,这样请求1再次请求成功后,先让1排到前面去,在落盘,保证了有序
注意,只保证了分区内有序,如果想实现分区间有序,只能所有分区数据到了集群排序了
8.ZooKeeper一些信息
1.kafka/brokers/ids 可以看到哪些节点正常运行 2.kafka/brokers/topics可以看到每个主题的每个分区的leader和ISR
3.kafka/controller是个辅助选举leader的
两者结合使用原理
每个kafka的节点启动后会向/brokers/ids/ 注册,然后每个节点都会有controller控制器,采取抢占式,哪个节点的controller抢到了,谁就说了算,然后由该controller监控ids并进行每个分区的leader选举,选举规则: 在ISR存活为前提,谁在AR(Assigned Replicas) 中排在前面谁就是leader(AR:分区所有副本统称),对于分区1,启动的时候AR假如是是这样的[1,0,2],那么就会选举broker1的作为leader,然后controller会将分区信息上传到zookeeper,其余的controller会同步信息(保证容错性)
假如leader挂了,controller监听到了,那么controller会立马拉取zookeeper上的ISR信息,根据选举规则再次选举,找活着的,并且在AR拍前面的就是新的leader
数据在分区是怎么存储的 实际是以主题名字加分区号命名的
每个分区都会有个log文件,log是有多个Segment组成,生产者到topic的数据就被存在Segment
以Segment(1G)为单位存储,为了加快检索速度,会有个.index文件,使用索引加快查询速度,这里是稀疏索引 ,当往log文件写入4kb数据时,index才会增加一条索引,也就是这4kb共用一个索引,找数据的时候先找到数据时哪个索引管,然后根据索引信息去log中找
数据清除
一.删除
每个segment最后来的数据的时间戳,超过7天删除(这个时间可以修改),也就是最后来的数据过期,才会清除,最后来的都过期了,其余的肯定过期了
二.压缩
适合K-V类型数据,对于相同的key,value只保留最新的value
9.leader和follower同步问题
LEO(Log End Offset) : 每个副本的最后一个offset,看图
HW(High Watermark):所有副本最小的LEO,上图中,Follower1的LEO最小,那么所有副本的HW为4号位
Follower出现故障
假如上图中,Follower2出现故障,ISR首先会将节点删除,但是Follower1和Leader会继续接收数据
Follower2恢复后,会读取本地磁盘的HW,并将文件中高于HW的部分删掉,因为它觉得这些数据没校验,然后开始同步Leader的数据,直到该Follower的LEO大于等于该分区的HW,才算真正的恢复,加入到ISR
Leader出现故障
Leader出现故障后,根据选举规则会选择一个新的leader,那么此时新leader的LEO和HW成为新的标准,所有的follower的数据都要向新的leader看齐,也即follower多余的数据的数据会被砍掉,很明显,这样可能会导致数据丢失
10.kafka高效读写原理
1.kafka本身是分布式,分区技术提高了并行度
2.数据采用稀疏索引,可以快速定位要消费的数据
3.kafka写数据时追加到log后面的,是顺序写,写的速度非常快,可达600M/s
4.页缓存+零拷贝
生产者将数据发送到kafka后,kafka会将数据放入操作系统的页缓存(Page Cache是Linux内核管理的内存区域)中,内核有内存管理的功能,它可以将数据持久化到磁盘或者留在内存。至此写结束。读的时候,kafka先看页缓存有没有数据,没有的话从磁盘读取,然后直接通过网关给消费者(如果跨节点会涉及网卡)。所以说,读取数据不走kafka应用层,提高了效率
11.消费者原理
消费方式
消费者主动从kafka拉取数据,原因很简单,不同消费者的消费速度不同,可以根据自己的能力消费,如果kafka推数据的话,因为是一个固定的速度,很难满足所有消费者,但是,kafka没数据的时候,消费者一直尝试拉数据,那就一直返回空,陷入了死循环
消费规则
单个消费者可以同时消费多个分区的数据
消费者组里消费者(消费者的groupid相同),每一个只能消费尚未被组里其他人消费的分区
如果一个消费者组的的消费者个数小于分区数,那么一个消费者可以消费多个分区(前提这几个分区没有被其他消费者正在消费)
如果消费者组的的消费者个数等于分区数,一对一刚好
如果消费者组的的消费者个数大于分区数,那就有空闲的了
offset
1.谁提交,提交到哪里?
消费记录由kafka一个特殊的主题,_consumer_offsets存储,记录每个分区消费到哪里了(offset标识),实际上是每隔5秒(可更改)将当前offset自动(也可以我们自己手动提交)提交到相应的主题。采用K-V存储,K是group.id+topic+分区号,value就是当前offset的值,这样就能保证每个消费者挂了重启后可以继续消费了
自动提交offset代码
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);
手动提交offset代码
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false");
consumer.commitAsync();
2.offset使用
可以通过设置参数来规定如何使用offset auto.offset.reset = earliest | latest | none 默认是 latest
- earliest:自动将偏移量重置为最早的偏移量,–from-beginning
- latest(默认值):自动将偏移量重置为最新偏移量
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
- 任意指定offset位移开始消费
手动指定offset
public class CustomConsumerSeek {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("topicx");
consumer.subscribe(topics);
Set<TopicPartition> assignment = consumer.assignment();
while(assignment.size() == 0){
consumer.poll(Duration.ofSeconds(1));
assignment = consumer.assignment();
}
for(TopicPartition topicPartition : assignment){
consumer.seek(topicPartition,200);
}
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.partition()+" " + record.offset() + " "+ record.value());
}
}
}
}
指定消费一定时间内的数据(利用offset的时间戳)
public class CustomConsumerSeekTime {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"1111");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
consumer.subscribe(topics);
Set<TopicPartition> assignment = consumer.assignment();
while(assignment.size() == 0){
consumer.poll(Duration.ofSeconds(1));
assignment = consumer.assignment();
}
HashMap<TopicPartition, Long> hashMap = new HashMap<>();
for(TopicPartition topicPartition : assignment){
hashMap.put(topicPartition,System.currentTimeMillis() - 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(hashMap);
for(TopicPartition topicPartition : assignment){
OffsetAndTimestamp time = map.get(topicPartition);
consumer.seek(topicPartition,time.offset());
}
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
}
}
哪个消费者消费哪个分区呢?
1.Range策略(针对每个主题):
首先将主题的分区按照序号排序,然后将消费者也排序
假如有个主题有7个分区t1-t7,三个消费者C1-C3
然后通过分区数/消费者数 来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区
7/3 = 2… 所以每个消费者应该消费2个,多的一个由第一个分区消费
也就是C1消费t1-t3 C2消费t4-t5 C3消费t6-t7,假如其中一个C3挂了,那么这个消费者要消费的分区数据先不会被消费,而是等45秒,等过了45秒后还没被消费就确定真的挂了,那么t6-t7会被分到另外的消费者,实现再平衡
这仅仅是针对一个主题而言 ,如果很多主题这样,那么C1将会比其他分区多消费N个!导致数据倾斜
2.RoundRobin策略(针对所有主题)
把所有的分区和所有的消费者列出来,按照hashcode排序,然后通过轮询算法分配
假如有个主题有7个分区t1-t7,三个消费者C1-C3,均是排好序的
那t1给C1 t2给C2 t3给C3 t4给C1 t5给C2…
3. Sticky策略
和Range策略不同的是分区不会排序,而是随机分到每个分区(7/3 = 2… 所以每个消费者应该消费2个,多的一个由第一个分区消费),也就是C1可能消费t2 t3 t6 ,C2消费t1 t4 C3消费t5 t7
4.IDEA配置分区策略
XX是全类名
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"XX")
消费者过程 1.消费者发送拉取请求到ComsumerNetworkClient 2.请求传到集群 3.当等待时间超过fetch.max.wait.ms(默认500ms)或者数据批次超过Fetch.min.bytes(默认1字节,最大是50M)集群发送数据到一个消息队列 4.消费者从队列中拉取数据,默认最大是500条 5.经过序列化器和拦截器后得到最后的数据
消费者提高吞吐量(数据积压了怎么办)
1.可以尽可能让消费者数 = 分区数 2.可以提高批次拉取的大小 3.生产者也可以调整4个参数来缓解
|