Kafka消费者
1 消费者概念
1.1 消费者与消费者组
应用程序--->kafka--->应用程序
生产者 主题 消费者
1. 上游应用程序将数据发送到主题中再由下游应用程序读取、验证数据。
2. 出现的可能性情况:生产者生产数据的速度超过消费者验证数据的速度
这个时候就可以使用消费者组,由消费者组订阅主题,消费者组中的每个消费者分别消费部分分区的数据,实现消费端的横向扩展
3. 需要注意的地方:消费者组中的消费者数量应不超过分区数,避免造成资源浪费
1.2 消费者组和分区再均衡
1. 什么是再均衡?
分区的所有权限由一个消费者转向另一个消费者,这种行为就是再均衡。
2. 优势?
实现了消费者组的高可用性和伸缩性
3. 不足?
再均衡期间,消费者无法读取消息,造成整个群组一小段时间处于不可用的状态
当分区被重新分给另一个消费者时,消费者当前的读取状态会丢失,有可能还需要刷新缓存,从而拖慢应用程序
4. 消费者与消费者组的通信
消费者通过向被指派为`群组协调器`的broker发送心跳来维持他们和群组的从属关系以及他们对分区的所有权关系。只要消费者以正常时间间隔发送心跳,就认为活跃,否则就会进行再均衡。
5. 如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认他死亡了才会触发再均衡。在此期间,死掉的消费者不会读取分区中的消息。在清理消费者是,消费者会通知协调器,此时会立即触发一次再均衡。
6. 如何分配分区?
当有消费者加入群组时,他会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者成为“群主”。群主从协调器那里获得群组的成员列表,并负责给每个消费者分配分区。
2 创建消费者
2.1 创建所需的必选属性
属性 | 描述 |
---|
bootstrap.servers | broker的地址清单,地址格式:host:port | key.serializer | 键的序列化器 | value.serializer | 值的序列化器 |
2.2 创建代码示例
producer.properties
bootstrap.servers=linux121:9092,linux122:9092,linux123:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka_customer
Properties props = new Properties();
props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
2.3 订阅主题
consumer.subscribe(Collections.singletonList("linux-test-01"));
2.4 轮询
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n", record.offset(), record.key(), record.value(), record.partition());
}
2.5 配置信息
配置 | 描述 |
---|
fetch.min.bytes | 指定消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于该配置指定的数据量,会等到有足够的可用数据时才会把他返回给消费者。 好处:可以降低消费者和broker的负载。 | fetch.max.wait.ms | 指定broker的等待时间。默认500ms如果没有足够的数据流入kafka,消费者获取最小数据量又无法得到满足,最导致500ms的延迟。 | max.partition.fetch.bytes | 指定服务器从每个分区里返回给消费者的最大字节数。默认为1M,即poll方法从每个分区里返回的记录最多不超过该属性的指定值。 在位消费者分配内存时,可以给他多分配一些,因为如果群组中的消费者发生崩溃,剩下的消费者需要处理更多的分区,而该值又必须大于brocker可以接受的最大消息的字节数,否则会使消费者一直处于挂起状态。 另外,还需要考虑消费者处理数据的时间。消费者需要频繁调用poll方法来避免会话过期和发生分区在均衡。如果单次调用poll返回的数据太多,消费者需要用更多的时间来处理数据,可能无法及时进行下一次轮询来避免会话过期。 | session.timeout.ms | 指定了消费者在被认为死亡之前可以与服务器断开连接的时间。 需要同时修改该属性和heartbeat.interval.ms,heartbeat.interval.ms必须小于该属性,一般是该属性的1/3。 该属性的值越小,可以越快的检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置的大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。 | auto.offset.reset | 指定消费者在读取一个没有偏移量的分区或者偏移量无效(因消费者长时间失效,报刊偏移量的记录已经过时并被删除)的情况下该作何处理,值{latest(默认,从最近的消息开始读取),earliest(从开始读取)} | enable.auto.commit | 指定了消费者是否自动提交偏移量,默认为true,可以通过auto.commit.interval.ms来控制提交频率 | partition.assignment.strategy | 设置选择分区策略。 **Range:**把主题的若干个连续的分区分配给消费者。 **RoundRobin:**把主题中所有分区逐个分配给消费者。 | max.poll.records | 用于控制单词调用call()方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。 | receive.buffer.bytes和send.buffer.bytes | 设置socket在读写数据时用到的TCP缓冲区的大小。 如果为-1,则使用操作系统的默认值。 如果生产者或者消费者与broker处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和较低的宽带。 |
2.6 提交和偏移量
2.6.1 自动提交
poll方法中返回的是生产者写进去但是还没有被消费者所消费的这部分数据
消费者发消息(包含分区的偏移量)给_consumer_offset这个主题
自动提交设置:enable.auto.commit-->true
自动提交产生的问题:当当前处理的数据偏移量大于提交的数据偏移量的话会造成数据重复
解决办法:缩短提交偏移量的时间差,auto.commit.interval.ms
2.6.2 提交当前偏移量
public static void main(String[] args) throws IOException {
Properties props = new Properties();
props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("linux-test-01"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n", record.offset(), record.key(), record.value(), record.partition());
try {
consumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
不足:在broker对提交请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。
2.6.3 异步提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n", record.offset(), record.key(), record.value(), record.partition());
}
consumer.commitAsync();
}
在成功提交后碰到无法恢复的错误之前,commitSync()会一直重试,但是commitAsync不会,这个也是他的不足之处。
不重试的原因:他收到服务器相应的时候可能有一个更大的偏移量已经提交成功了。
commitAsync支持回调,在broker做出响应时执行回调,经常被用于记录提交错误或生成度量指标。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n", record.offset(), record.key(), record.value(), record.partition());
}
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.out.println("Commit failed for offsets {}" + offsets + exception);
}
});
}
|