完整性
如何保证数据的端到端的数据完整性? 这里说的完整性,就是说“不丢数据”,这里是不丢数据的意思。这里没有保准数据的一致性。也就是说,sink 端收到的数据可能是重复的。
如果要保证完整性,需要再 producer 端、broker 端、consumer 端,这三个地方下功夫。
producer 端
producer 端可以有两种发送方式,一个是只负责发送,至于 broker 收到没有收到那就不管了。另外一种类似 TCP 协议,每一个 TCP 包都会有一个 ACK 回复,收到 ACK 才确定这条数据发送完毕了。 显然是第二种方式是正道。对应的配置是
acks:"all"
这里的 all 代表了,将数据写入 ISR 中的所有副本,才算 producer 提交完成。否则视为不完成。 如果要是超时或者发送失败,怎么办呢?很简单重复呗,那重发的次数怎么配置呢?
reties:5
这里设置比 1 大的数即可。
如果要是超过重试的次数,还是收不到 ACK。那怎么办呢?我最好是能知道那条数据没有收到ACK,然后将这条数据保存起来,待 kafka 好了的时候,再发送。那如何实现呢?就是使用回调函数来做了,send(record,callback),其中的 callback 就回调函数了,下面是 callback 的定义。callback 是一个 interface。
callback
这个 interface 里面只有一个方法:
public void onCompletion?(RecordMetadata metadata, Exception exception)
这里对具体的用法不过多的做解释。如果消息每成功的写进入,那 RecordMethod 的值都是 -1 。Exception会给出异常的详细信息。在回调函数里面,就可以把没有发送成功的数据写入到磁盘里面,看什么时候合适再重新发送。
brokder 端
在 producer 端,提到了 acks 这个配置 ,这个配置和 broder 的 ISR 的概念相关。在 broder 端有了下面这个参数和 ack 是紧密相关的。
min.insync.replicas 规定了 ISR 的最小值,也就是说,如果我们设置这个值
大于 1, 当 leader 副本成了光杆司令,即使将消息写入磁盘,broker 也不会
认为它已经提交成功,也就不会发送 ACK 了。
既然说到了 ISR,ISR 最大的作用就是告诉 brokder leader 副本到底要同步给多少个 follow 副本才算ok。我们更关注的其实是 follow 副本什么时候被踢出局,什么时候有能回来。这就要清楚下一个参数
replica.lag.time.max.ms 默认值为 10 s,异常 ISR 的阈值
设置一个比较小的值,可以leader 和 follow 尽量保持一致。等到了遇到什么砍的,不得不换 leader 的时候,follow 不会和 leader 太多。那要是被提出 ISR 也参与到 leader 的选举,不幸的是,这个落伍的 follow 选上了 leader ,那 consumer 消费不到原来 leader 上的数据了,所以一定不能让落伍的 follower 参与选举。这也是一个相关的配置。
最后,如果我们就给一个分区一个副本,以上这些参数怎么玩,也玩不转了,所以在磁盘资源足够多的情况下,给 3 个副本吧,太多也没用。
replication.factor 副本的个数,而且 replication.factor > min.insyhc.replicas
consumer 端
cosumer 最好是关闭自动提交 offset 的设置。
auto.commit.enable = false
要让程序显示的提交。这个提交 offset 的时机把握就是关键了,一定要早处理数据后,再提交 offset ,如果这个处理数据出现了问题,一定不能提交 offset 。
总结
kafka 只对已经提交的数据,做有限度的持久化保证
|