可能丢消息的场景
从Producer的角度考虑
- 消息在写入累加器之前失败了,这个只能靠上层应用自行处理
- 消息写入累加器后吗,未发送到Broker前Producer宕机,因为消息存在内存里,所以有丢失的风险。 kafkaProducer.close()方法会在Producer退出前等待累加器中消息被处理
- Sender线程将消息发送到Broker前因为一些可重试的异常(比如网络抖动、leader选举)导致发送失败
对于可重试异常,可以考Producer端配置retries让Sender线程自动重试,重试失败则交给主线程捕获异常或在回调函数中处理(写到flume里,定时重试)。 - Sender线程将消息发送到Broker时发生了一些不可重试的异常,比如消息太大
从Broker角度考虑
几个概念 AR:一个分区的所有副本集合 ISR:与leader副本保持同步状态的副本集合,leader副本本身也是ISR集合中的一员 同步状态:follower副本滞后leader副本的时间不超过某个值(replica.lag.time.max.ms) acks: 0: 发送消息后不等Broker回应 1: 只要leader副本成功写入消息就会收到Broker的响应 all: 需要ISR中所有副本都成功写入消息才会收到Broker的响应
min.insync.replicas: 最少同步副本,=2表示ISR集合中至少要存在两个副本,producer才能向分区发送消息。 当acks=all && min.insync.replicas >= 2 时,可以认为Broker端保证了消息不丢失
从Consumer的角度考虑
- 消费者还没处理完消息就提交偏移量
- enable.auto.commit: 是否开启自动提交,开启后消费者在轮询拉取到消息后或者间隔一定时间之后会自动提交偏移量,如果此时消费者还在处理offset为100消息,同时拉取了新的一批消息触发了自动提交offset100,然后宕机了,那么就丢失了offset为100的消息
extractly once保证机制
幂等producer
实现原理:PID + 序列号 开启方法:props.put(“enable.idempotence”, ture)
- 每个 producer 在启动的时候会从 Broker 端获取到一个 PID,重启后会获得新的 PID
- 每个 <PID, 分区> 都会对应一个序列号,维护在 Broker 端
- 生产者每发送一条消息,都会将 <PID, 分区> 对应的序列号加 1
- Broker 端收到消息后,根据序列号来判断是否是重复消息
注意: 因为序列号是跟 PID、分区绑定的,且每次 producer 都会产生新的 PID,所以幂等 Producer 提供的幂等特性只针对单 producer 会话中的单分区幂等
事务producer
定义: 事务型 Producer 能够保证将一批消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。 开启方法:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
- 和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
- 在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:
a. read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。 b. read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
|