生产者:
- 流程:send方法先经过拦截器,序列化器,分区器,然后到缓冲区,缓冲区默认32M,当缓冲区内批次数据大小到达16k或等待linger.ms设置时间后,消息会发送到达Kafka集群,集群会根据ack应答策略应答,如果失败会重试
- 提高吞吐量配置参数:
(1).buffer.memory :RecordAccumulator 缓冲区总大小,默认 32m,生产课调到64,增大吞吐量 (2).batch.size:缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量 (4).linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 (5).compression.type:生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 - 数据可靠性配置参数
(1).acks:0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all是等价的。 - 数据去重配置参数:
(1).enable.idempotence:是否开启幂等性,默认 true,表示开启幂等性。 (2).代码中创建事务(其实也是一个存储事务信息的topic,有50个分区,transactional.id的hashcode值%50找分区,分区leader副本所在的boker节点处理事务) - 数据有序配置参数:
(1).单分区内保证有序: (2).enable.idempotence:是否开启幂等性,默认 true,表示开启幂等性。 (3).max.in.flight.requests.per.connection需要设置小于等于5
Broker 流程:启动后在zk注册broker,controller(哪个broker先注册,哪个broker的controller就是老大),controller老大监听所有boker,并负责topic的leader选举上传zk 核心配置参数: replica.lag.time.max.ms:ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s auto.leader.rebalance.enable:默认是 true。 自动 Leader Partition 平衡。建议关闭。
消费者: 核心配置参数: group.id:标记消费者所属的消费者组 enable.auto.commit:默认值为 true,消费者会自动周期性地向服务器提交偏移量
消费者再平衡配置参数: heartbeat.interval.ms:Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。 session.timeout.ms: Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 max.poll.interval.ms: 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 partition.assignment.strategy: 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky
提高吞吐量配置参数: 增大topic分区数和消费者数:消费者数 = 分区数 fetch.max.bytes:默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数 max.poll.records:一次 poll 拉取数据返回消息的最大条数,默认是 500 条 增加下游处理能力:ES,Mysql
精准一次: 事务 + 手动提交 offset (enable.auto.commit = false)。 消费者输出的目的地必须支持事务(MySQL、Kafka)。
|