什么是位移提交,定义。
??Consumer需要向kafka汇报自己的位移数据,这个汇报过程称为位移提交。因为Consumer可以同时消费多个分区。所以位移提交是按照分区的粒度进行的。即Consumer需要为分配给他的每个分区提交各自的位移数据。 ??作用是:位移提交表示了Consumer的消费进度。这样当Consumer发生故障重启后,就可以读取之前提交的位移,然后从指定位移开始继续消费。类似于书签。 ??我们要知道,位移的提交完全由我们掌控,我们可以提交任何位移值,但是我们也就需要为我们提交的位移负责。
位移提交的分类
用户角度分类
- 自动提交
- 手动提交
Consumer角度
- 同步提交
- 异步提交
自动提交和手动提交:
??我们将Consumer端的参数:enable.auto.commit设置为true,就开启了自动提交。如果我们开启了自动提价哪还有一个参数:auto.commit.interval.ms 它是自动提交的时间间隔,默认是5秒钟。 ??我们开启了自动提交之后就不需要对提交位移进行管理了,并且kafka会保证在开始调用Poll方法时,提交上一次poll返回的所有消息。从顺序上说,是先提价上一批消息的位移,再处理新的消息。因此他能保证消息不丢失,但是会出现重复的情况。 ??也就是,Consumer是每5秒提价一次位移,如果,Rebalance发生在第三秒的时候。这样当重启之后,前三秒已经消费过的消息就会被重新消费,因为位移数据还是三秒前的位移数据。虽然你可以通过减少提交位移的时间间隔,但是并不能完全避免重复消费,只能减少重复消费的数据。 ??手动提交:只要将上面的自动提交的参数设置成false就可以了。
同步提交和异步提交
??手动提交更加灵活,但是,有一个缺陷:调用commitSync()同步提交时,Consumer会处于阻塞状态。会影响应用程序的TPS。因为这个问题,又出现了另一个API:commitAsync()异步提交。异步提交不会阻塞Consumer但是它没有重试机制,不能保证提交一定会成功。所以通常情况下是异步提交和同步提交一起使用,先用异步提交进行阶段性提交避免阻塞,然后在Consumer要关闭时再用同步提交最后提交一次,确保Consumer位移的正确性。
kafka Consumer API还为手动提交提供了这样的方法: commitSync(Map<TopicPartition,OffsetAndMetadata>)和commitAsync(Map<TopicParttion,OffsetAndMetadata>)它们的参数是一个Map对象,键是TopicPartition(消费的分区),值是:OffsetAndMetadata对象(主要保存的是位移数据)。它可以将一个大的事务分割成若干个小的事务分别提交,能有效的减少错误恢复的时间。
|