kafka消费者api
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MsgConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 是否自动提交offset,默认就是true 当消费者的代码执行完 不管有没有异常都会把这次从broker中拉取的消息偏移量给提交了 如果偏移量被提交了 消费者就不会再次的消费这条消息了
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间 如果关闭了自动提交这个配置就会失效 这个表示两次自动提交的间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 一般使用手动提交
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/*
当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
latest(默认) :只消费自己启动之后发送到主题的消息
earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
*/
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/*
consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将
rebalance方案下发给consumer,这个时间可以稍微短一点
*/
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/*
服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组, 被踢出消费组 这个消费者就无法再消费消息 除非重启
对应的Partition也会被重新分配给其他consumer,默认是10秒
*/
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
//一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
/*
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 也就是说当消费者从 broker中poll到消息然后执行业务代码 执行的时间超
过了30秒也就是props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);配置的时间broker会认为这个消费者的机器是垃圾就会把这个消费者
踢出消费者组 在broker只会保留性能高的消费者
*/
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
// key的序列号方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value的序列号方式
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 消费指定分区
//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//消息回溯消费
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/
//指定offset消费
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/
/*从指定时间点开始消费 kafka没有根据时间消费的api 它只是根据一个时间 去查找这个时间对应的那条消息的偏移量 然后最终根据偏移量消费消息 kafka保存消息的时候 会有.log、.index、.timeindex这三个文件所有的消息都会保存在log文件中为了提升效率会把log文件中的消息每存满4kb把第4kb的那条消息分别放到index、timeindex文件中用来做索引 当使用指定时间消费 就会去timeindex文件中查找这个时间然后找到它的偏移量 然后返回 然后再根据这个偏移量去消费消息指定偏移量就是去index文件中找index、timeindex会保存这个消息的偏移量、磁盘上所在的地址这两个都是根据log文件中得到的*/
/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//根据消费里的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}*/
while (true) {
/*
* poll() API 是拉取消息的长轮询 也就是说会循环1秒钟从broker中拉取消息 如果在1秒钟拉取到了消息就返回 如果没拉取到消息就返回空(null)
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
/*if (records.count() > 0) {
// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});
}*/
}
}
}
spring boot整合kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
producer: # 生产者
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
发送者代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
}
}
消费者代码
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
/**
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
* @param record
*/
@KafkaListener(topics = "my-replicated-topic",groupId = "zhugeGroup")
public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
ack.acknowledge();
}
/*//配置多个消费组
@KafkaListener(topics = "my-replicated-topic",groupId = "tulingGroup")
public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
ack.acknowledge();
}*/
}
kafka的总控制器
kafka集群中一个或者多个broker 在这之间会有一个被选举为控制器(跟leader一样为了好区分这里叫控制器) 它负责管理整个集群所有主题分区的状态(就是这个分区是不是leader或者follower)控制器的选举方式就是当kafka启动的时候会给zookeeper发送create 创建一个临时节点controller 谁先创建成功这个节点谁就是控制器 如果使用脚本同时启动多台kafka 这多台kafka机器就会同时发送create这个命令 但是zookeeper执行命令的时候是一个一个的执行 也就是说第一个创建了controller这个节点后面的都会创建失败 这也就选出了控制器 如果控制器挂了会利用zookeeper监听机制发送给其它机器的kafka然后重新创建controller节点
控制器监听的节点 控制器会监听broker的变化 /brokers/ids/这层的节点也都是临时节点根据zookeeper监听机制处理相关逻辑 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。 更新集群的元数据信息,同步到其他普通的broker节点中
根据上面说的控制器监听的节点 当分区中的leader挂了 上面的ids也就会感知到 ids下面都是临时节点 节点挂了controller就会感知到 就可以得到这个节点的一些信息 这些信息就包含 这个节点是哪些分区的leader然后 对应的leader分区就会从isr列表中获取第一个 让它升级为leader 为什么是第一个?第一个可以说它最先跟leader数据同步完的节点 也就是说这个节点跟leader节点通信、传输效率最高让它当leader很明显是最好的 如果isr列表中的节点都挂了它会阻塞客户端 等待节点重新启动 也可以不等待unclean.leader.election.enable=false通过这个参数设置 为false等待 为true不等待 不等待就会丢失数据 这里还有一个消费者Rebalance分区分配策略这个东西 后续会写 副本进入ISR列表有两个条件: 副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通 副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由 replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)
消费者消费消息的偏移量(offset)记录机制
查看kafka用来保存topic的目录下会有许多以__consumer_offsets开头的文件这个文件就是用来保存偏移量 默认50个 __consumer_offsets可能会接收高并发的请求kafka默认给它分配了50个分区 当消费者提交了偏移量就会往__consumer_offsets开头的文件中存储一个key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据
通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区 公式:hash(consumerGroupId) % __consumer_offsets主题的分区数
|