IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka(二)— 原理解析 -> 正文阅读

[大数据]Kafka(二)— 原理解析

1.生产者原理

在这里插入图片描述

1.首先创建main线程,创建Producer对象,调用send方法发送数据,遇到拦截器就处理数据,然后将数据序列化(kafka本身的序列化器,开发要手动指定),然后进入分区器,根据分区数进行分区

2.然后将分好区的数据发往一个位于内存的缓冲区(默认32M)每个分区对应一个双端队列,生产者发送数据的批次大小为16K

3.sender线程的Sender读取数据,读取的临界条件是每个队列内的数据大小总和达到batch.size(默认16K)或者sender等待时间达到了linger.ms(默认0ms)

4.每个队列的数据被“转换”成请求,放入请求队列,去请求集群上对应节点的对应分区,要通过底层创建的Selector传送到集群,分区应答机制分为三种①应答机制为0:生产者只需要发送数据,其余啥都不用管 ②应答机制为1 需要等待leader收到数据后才返回应答 ③ -1(all) leader和follower都收到数据后才应答若节点不应答,最多可累计5条请求

5.发送成功到Kafka集群,那么清除内存中对应的数据,不成功重试(重试次数可达int的最大值次)

2.主题分区

主题为什么分区

在这里插入图片描述
很明显,将数据分区,可以合理的使用存储资源

其次,可以实现并行度,生产者可以以分区为单位进行发送数据,消费者可以分区为单位进行分区

主题分区策略

假如我只有3台服务器,但我对某主题分了9个分区,每个分区2个副本,那我这9个分区的leader在哪?follower在哪? 注意,副本数不能超过机器数(超过会报错)

测试

./kafka-topics.sh --bootstrap-server hadoop102:9092 --topic topicx --create --partitions 9 --replication-factor 2

./kafka-topics.sh --bootstrap-server hadoop102:9092 --topic topicx --describe 

在这里插入图片描述

可以看到,leader采取了轮询的方式,实现了leader自动平衡。关于follower,以每个节点为单位看,我们可以看到,位于节点0的leader的副本策略是这样的,[0,1] [0,2] [0,1],如果再来一个leader位于0,那它的副本策略是[0,2],也是为了负载均衡

生产经验 修改分区副本的位置

假如我想把分区0的副本由[1,2]变为[0,1]

在kafka目录下vim increase-replication-factor.json

{
"version":1,
"partitions":[{"topic":"topicx","partition":0,"replicas":[0,1]}
}
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

在这里插入图片描述
此外,还可以通过种方式增加副本,假如我把上述json文件再修改一下,改成[0,1,2],那该分区的副本数就会增加!

数据分区策略(数据被分到哪个分区规则)

可以看到,生产者的构造函数有以下6种
在这里插入图片描述
对于前4种,我们可以通过第二个参数直接指定分区

对于第5种,虽然没有指定分区,但是指定了key,那么数据在哪个分区由 key的hash值与topic的partition数进行取余操作得到分区号,比如我们可以将一张mysql的表名字作为key,那么这张表的数据就会进入一个分区了

对于第6种,既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)

自定义数据分区器

实现Partitioner接口,重写方法

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 发送的数据 value
        String str = value.toString();

        int partition;

        if(str.length() <= 3){
            partition = 0;
        }else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

在属性中指定分区类,发送消息就会根据分区器进行发送了

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gzhu.MyPartitioner");

3.生产者吞吐量

主要有4种方式改变吞吐量

1.改变RecordAccumulator的大小,缓冲区大了,数据处理能力也强了
2.改变batch.size的大小,这个要根据实际需求改变,如果太小导致发送数据太频繁,太大导致数据延迟高
3.改变linger.ms的时间,如果太小导致发送数据太频繁,太大导致数据延迟高
4.修改压缩方式

// 更改缓冲区大小 默认33554432 = 32M
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小 默认16K 16384
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms  1ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
// 压缩类型 snappy用多一些
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

4.数据可靠性 ISR原理

每个队列的数据被转换成请求队列,去请求集群上对应节点的对应分区,要通过底层创建的Selector传送到集群,集群应答机制分为三种

①应答机制为0 生产者只需要发送数据,其余啥都不用管 ,很明显,如果leader没收到,就丢失了,但是生产者认为发送成功了,因此这种机制很不可靠,基本不用

②应答机制为1 需要等待leader收到数据后才返回应答 ,也有缺陷,leader收到某数据并且应答了,但是follower还没来得及同步,此时leader挂了,数据没有备份且唯一收到数据的leader还挂了,此时生产者已经收到过应答了,认为已经发送成功了,数据就丢失了

③ -1(all) leader和follower都收到数据后才应答 看似十分可靠,但是有个问题,比如某follower同步的时候,由于故障挂掉了,此时这个follower就无法产生应答了,生产者却一致在等,那整个集群不就瘫痪了吗?

为此,Leader维护了一个动态的In-Sync Replicas(ISR),意为和Leader保持正常同步的Follower+Leader集合 (leader:0,isr:0,1,2),因此我们只要关注ISR里节点的应答就可以

在这里插入图片描述

如果Follower在30s内未向Leader发送通信请求或同步数据,则该Follower将被移出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如节点2的副本超时,则(leader:0, isr:0,1)

这样就不用等长期联系不上或者已经故障的节点

如果分区副本设置为1个,或者ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。因此,想要保证数据完全可靠,ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2,回到当初的问题,[某follower同步的时候,由于故障挂掉了,此时这个follower就无法产生应答了,生产者却一致在等那整个集群不就瘫痪了吗?],由于ISR机制,这个follower被踢出,生产者等到了另外两个的应答,也视为成功

机制选择

在生产环境中,acks=0很少使用

acks=1,一般用于传输普通日志,允许丢个别数据

acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景

在开发中,主要配置两个参数

// 应答机制,默认为all
properties.put(ProducerConfig.ACKS_CONFIG,"all");
// 重试次数,默认int的最大值次
properties.put(ProducerConfig.RETRIES_CONFIG,1000);

5.数据重复问题 幂等性原理

先明白一点,我们目前针对的某个分区来分析的

假如对于某个分区,一个leader和两个follower,都成功收到了数据,但是就在返回应答的时候,leader挂了,那么生产者收不到全部的应答(应答机制为all),由于ISR机制,生产者会重新发送数据,此时原来的两个follower(有个follower在leader挂了后会成为新的leader)已经有了数据,再接收一次就产生了分区出现了数据重复问题
在这里插入图片描述

关于发送次数的一些场景

1.若至少发送一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

2.若发送最多一次(At Most Once)= ACK级别设置为0

At Least Once可以保证数据不丢失,但是不能保证数据不重复

At Most Once可以保证数据不重复,但是不能保证数据不丢失(实际我们不用)

3.精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失,我们怎么实现呢???幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端某分区都只会持久化一条,保证了不重复

幂等性主要通过三个属性来实现:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker某分区只会持久化一条

其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的,是每条数据的序列化号,不同数据是不同的

很明显,幂等性只能保证的是在单会话单分区内不重复,为什么只能保证单会话?示例,对于一个消息a,kafka集群启动了两次,由于PID不同,分区认为这是不同的数据,不会认为是相等的数据(实际都是数据a)

在这里插入图片描述

再回到当初的情景,节点0挂了,生产者重新发送数据,新的leader再次接收到a时,由于PID没变,分区号没变,SeqNumber没变,重复了,不接受,成功解决分区内的数据重复问题!

开发代码中如何实现幂等性?

开启参数enable.idempotence默认为true,false 关闭,实际上,底层幂等性是默认开启的,我们不写也可以

6.Kafka事务原理

幂等性只能保证的是在单分区单会话内不重复,幂等性不能跨多个分区。多个分区之间不会重复数据怎么实现,怎么办? 事务,kafka事务可以保证对多个分区写入操作的原子性

开启事务,必须开启幂等性,事务底层依赖幂等性

首先确定使用哪个事务协调器(因为每个节点都有事务协调器,啥?啥是事务协调器?kafka集群和客户端进行事务通信总得有个东西吧?就是通过它!),如何确定?集群中有一个特殊的主题,用来存储事务的信息,默认有50个分区,每个分区都会负责一部分事务。如何确定事务属于50个中的哪个分区?需要程序员手动输入一个transactional.id(全局唯一),将transactional.id的hashcode%50计算出属于哪个分区,该分区leader所在的节点上那个事务控制器就是我们要使用的

我们假设我们的主题是3个分区,且使用broker0的事务控制器

在这里插入图片描述

1.因为事务依赖于幂等性,保证单个分区内的数据不重复,所以生产者首先要向事务协调器获取Producer ID,也即PID
2.事务协调器返回当前的PID
3.开启事务后生产者向集群的不同分区发送数据,是原子性操作
4.告诉事务协调器事务提交commit请求
5.事务协调器通知事务主题该请求,让其进行持久化操作,这样保证了客户端挂了后,重启后事务可以继续处理未完成的数据
6.事务协调器告知生产者事务提交成功
7.事务协调器会发送请求到每个分区,确认每个分区是否收到了数据,确保事务成功
8.每个分区应答给事务协调器,告知一切正常
9.事务协调器收到通知后,将该条事务的执行成功持久化,确保不会再执行该事务了

IDEA 事务

public class TransactionsTest {
    public static void main(String[] args) {
        // 1.配置属性
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
        // 指定对应的key和value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 指定事务id  任意取,但唯一
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"100");


        // 2.创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);


        // 3.初始化并开启事务
        producer.initTransactions();
        producer.beginTransaction();

        try {
            // 4.发送数据
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>("first","Hello " + i));
            }
            // 5.提交事务
            producer.commitTransaction();
        }catch (Exception e){
            // 有异常终止事务
            producer.abortTransaction();
        }finally {
            // 6.关闭生产者
            producer.close();
        }
    }
}

7.数据有序原理

如果保证分区内有序,是有条件的,这里的有序是指生产者发送 1 2 3,集群收到的数据为1 2 3,而不是2 1 3 等等

什么条件呢?

没有开启幂等性,数据没有序列化号(序列化号大的是后来的,因为序列化号是递增的)!不知道谁先来的,谁后来的,因此为了保证有序,直接设置下图中请求只能存一个,max.in.flight.requests.per.connection需要设置为1,当前请求成功后才发送下一个请求

不开启幂等性并且max.in.flight.requests.per.connection不为1,假设当有两个请求时,请求1先来的,但是请求失败了,请求2成功了,这样请求1再次请求成功后,1在2的后面了,因此只能max.in.flight.requests.per.connection只能为1

在这里插入图片描述
开启了幂等性,最大的好处是有序列化号了,可以根据序列化号判断先后了!

max.in.flight.requests.per.connection需要设置小于等于5,这样我就能保证最近5个请求时有序的了

开启幂等性,假设当有两个请求时,请求1先来的,但是请求失败了,请求2成功了,但是集群知道第一个应该是请求1,先在内存放着请求2,等待请求1,这样请求1再次请求成功后,先让1排到前面去,在落盘,保证了有序

注意,只保证了分区内有序,如果想实现分区间有序,只能所有分区数据到了集群排序了

8.ZooKeeper一些信息

1.kafka/brokers/ids 可以看到哪些节点正常运行
在这里插入图片描述
2.kafka/brokers/topics可以看到每个主题的每个分区的leader和ISR
在这里插入图片描述

3.kafka/controller是个辅助选举leader的
在这里插入图片描述

两者结合使用原理
在这里插入图片描述

每个kafka的节点启动后会向/brokers/ids/ 注册,然后每个节点都会有controller控制器,采取抢占式,哪个节点的controller抢到了,谁就说了算,然后由该controller监控ids并进行每个分区的leader选举,选举规则:ISR存活为前提,谁在AR(Assigned Replicas) 中排在前面谁就是leader(AR:分区所有副本统称),对于分区1,启动的时候AR假如是是这样的[1,0,2],那么就会选举broker1的作为leader,然后controller会将分区信息上传到zookeeper,其余的controller会同步信息(保证容错性)

假如leader挂了,controller监听到了,那么controller会立马拉取zookeeper上的ISR信息,根据选举规则再次选举,找活着的,并且在AR拍前面的就是新的leader

数据在分区是怎么存储的
在这里插入图片描述
实际是以主题名字加分区号命名的

每个分区都会有个log文件,log是有多个Segment组成,生产者到topic的数据就被存在Segment

以Segment(1G)为单位存储,为了加快检索速度,会有个.index文件,使用索引加快查询速度,这里是稀疏索引当往log文件写入4kb数据时,index才会增加一条索引,也就是这4kb共用一个索引,找数据的时候先找到数据时哪个索引管,然后根据索引信息去log中找
在这里插入图片描述
在这里插入图片描述

数据清除

一.删除

每个segment最后来的数据的时间戳,超过7天删除(这个时间可以修改),也就是最后来的数据过期,才会清除,最后来的都过期了,其余的肯定过期了

二.压缩

适合K-V类型数据,对于相同的key,value只保留最新的value

9.leader和follower同步问题

在这里插入图片描述

LEO(Log End Offset) : 每个副本的最后一个offset,看图

HW(High Watermark):所有副本最小的LEO,上图中,Follower1的LEO最小,那么所有副本的HW为4号位

Follower出现故障

假如上图中,Follower2出现故障,ISR首先会将节点删除,但是Follower1和Leader会继续接收数据

Follower2恢复后,会读取本地磁盘的HW,并将文件中高于HW的部分删掉,因为它觉得这些数据没校验,然后开始同步Leader的数据,直到该Follower的LEO大于等于该分区的HW,才算真正的恢复,加入到ISR

在这里插入图片描述

Leader出现故障

Leader出现故障后,根据选举规则会选择一个新的leader,那么此时新leader的LEO和HW成为新的标准,所有的follower的数据都要向新的leader看齐,也即follower多余的数据的数据会被砍掉,很明显,这样可能会导致数据丢失

在这里插入图片描述

10.kafka高效读写原理

1.kafka本身是分布式,分区技术提高了并行度

2.数据采用稀疏索引,可以快速定位要消费的数据

3.kafka写数据时追加到log后面的,是顺序写,写的速度非常快,可达600M/s

4.页缓存+零拷贝

生产者将数据发送到kafka后,kafka会将数据放入操作系统的页缓存(Page Cache是Linux内核管理的内存区域)中,内核有内存管理的功能,它可以将数据持久化到磁盘或者留在内存。至此写结束。读的时候,kafka先看页缓存有没有数据,没有的话从磁盘读取,然后直接通过网关给消费者(如果跨节点会涉及网卡)。所以说,读取数据不走kafka应用层,提高了效率
在这里插入图片描述

11.消费者原理

消费方式

消费者主动从kafka拉取数据,原因很简单,不同消费者的消费速度不同,可以根据自己的能力消费,如果kafka推数据的话,因为是一个固定的速度,很难满足所有消费者,但是,kafka没数据的时候,消费者一直尝试拉数据,那就一直返回空,陷入了死循环

消费规则

单个消费者可以同时消费多个分区的数据

消费者组里消费者(消费者的groupid相同),每一个只能消费尚未被组里其他人消费的分区

如果一个消费者组的的消费者个数小于分区数,那么一个消费者可以消费多个分区(前提这几个分区没有被其他消费者正在消费)

如果消费者组的的消费者个数等于分区数,一对一刚好

如果消费者组的的消费者个数大于分区数,那就有空闲的了
在这里插入图片描述

offset

1.谁提交,提交到哪里?

消费记录由kafka一个特殊的主题,_consumer_offsets存储,记录每个分区消费到哪里了(offset标识),实际上是每隔5秒(可更改)将当前offset自动(也可以我们自己手动提交)提交到相应的主题。采用K-V存储,K是group.id+topic+分区号,value就是当前offset的值,这样就能保证每个消费者挂了重启后可以继续消费了

自动提交offset代码

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
// 提交 offset的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);

手动提交offset代码

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false");

// 消费消息.........

// 异步提交 offset 异步提交就是offset发出去,不管_consumer_offset接收到没有,就开始消费下一批数据了  同步提交就是offset发出去,_consumer_offset接收到,才开始消费下一批数据
consumer.commitAsync(); 

2.offset使用

可以通过设置参数来规定如何使用offset
auto.offset.reset = earliest | latest | none 默认是 latest

  • earliest:自动将偏移量重置为最早的偏移量,–from-beginning
  • latest(默认值):自动将偏移量重置为最新偏移量
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
  • 任意指定offset位移开始消费

手动指定offset

public class CustomConsumerSeek {
    public static void main(String[] args) {
        // 1.配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        // 2.反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        // 3.配置消费者id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100");

        // 4.创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

        // 5.订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("topicx");
        consumer.subscribe(topics);

        // 6.指定offset
        // 6.1 获取当前主题的分区信息
        Set<TopicPartition> assignment = consumer.assignment();
        // 6.2 保证分区已存在
        while(assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }

        // 6.3 指定offset
        for(TopicPartition topicPartition : assignment){
            consumer.seek(topicPartition,200);
        }
        // 6.消费
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition()+" " + record.offset() + " "+ record.value());
            }
        }

    }
}

在这里插入图片描述
指定消费一定时间内的数据(利用offset的时间戳)

public class CustomConsumerSeekTime {
    public static void main(String[] args) {
        // 1.配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        // 2.反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        // 3.配置消费者id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"1111");

        // 4.创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

        // 5.订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);

        // 6.指定offset
        // 6.1 获取当前主题的分区信息
        Set<TopicPartition> assignment = consumer.assignment();
        // 6.2 保证分区已存在
        while(assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }
        // 6.3 指定offset的时间 每个分区对应时间
        HashMap<TopicPartition, Long> hashMap = new HashMap<>();
        for(TopicPartition topicPartition : assignment){
           hashMap.put(topicPartition,System.currentTimeMillis() - 24 * 3600 * 1000);
        }
        // 通过时间获取offset
        Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(hashMap);
        // 6.4 指定offset
        for(TopicPartition topicPartition : assignment){
            OffsetAndTimestamp time = map.get(topicPartition);

            consumer.seek(topicPartition,time.offset());
        }


        // 7.消费
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }

    }
}

哪个消费者消费哪个分区呢?

1.Range策略(针对每个主题):

首先将主题的分区按照序号排序,然后将消费者也排序

假如有个主题有7个分区t1-t7,三个消费者C1-C3

然后通过分区数/消费者数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区

7/3 = 2… 所以每个消费者应该消费2个,多的一个由第一个分区消费

也就是C1消费t1-t3 C2消费t4-t5 C3消费t6-t7,假如其中一个C3挂了,那么这个消费者要消费的分区数据先不会被消费,而是等45秒,等过了45秒后还没被消费就确定真的挂了,那么t6-t7会被分到另外的消费者,实现再平衡

这仅仅是针对一个主题而言,如果很多主题这样,那么C1将会比其他分区多消费N个!导致数据倾斜

2.RoundRobin策略(针对所有主题)

所有的分区和所有的消费者列出来,按照hashcode排序,然后通过轮询算法分配

假如有个主题有7个分区t1-t7,三个消费者C1-C3,均是排好序的

那t1给C1 t2给C2 t3给C3 t4给C1 t5给C2…

3. Sticky策略

和Range策略不同的是分区不会排序,而是随机分到每个分区(7/3 = 2… 所以每个消费者应该消费2个,多的一个由第一个分区消费),也就是C1可能消费t2 t3 t6 ,C2消费t1 t4 C3消费t5 t7

4.IDEA配置分区策略

XX是全类名

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"XX")

消费者过程
在这里插入图片描述
1.消费者发送拉取请求到ComsumerNetworkClient
2.请求传到集群
3.当等待时间超过fetch.max.wait.ms(默认500ms)或者数据批次超过Fetch.min.bytes(默认1字节,最大是50M)集群发送数据到一个消息队列
4.消费者从队列中拉取数据,默认最大是500条
5.经过序列化器和拦截器后得到最后的数据

消费者提高吞吐量(数据积压了怎么办)

1.可以尽可能让消费者数 = 分区数
2.可以提高批次拉取的大小
3.生产者也可以调整4个参数来缓解

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-28 11:56:15  更:2022-04-28 11:57:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 10:58:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码