1.pom依赖:
????????springboot与kafka版本对应关系:https://spring.io/projects/spring-kafka#overview
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置:
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=192.168.33.128:9092,192.168.33.128:9093,192.168.33.128:9094
###########【初始化生产者配置】###########
# 重试次数。
spring.kafka.producer.retries=3
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
spring.kafka.producer.acks=-1
# 批量发送的消息数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# 提交延时
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小 32MBproduce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
#spring.kafka.producer.properties.partitioner.class=com.springboot.kafka.config.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID(此处name和消费端注解名称必须一致)
spring.kafka.consumer.properties.group.id=zlx-group
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=false
# enable_auto_commit=false时才生效,有以下几种:
# record 每处理一条commit一次
# batch (默认)每次poll的时候批量提交一次,频率取决于每次poll的调用频率
# time 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
# count 累积达到ackCount次的ack去commit
# count_time ackTime或ackCount哪个条件先满足,就commit
# manual listener负责ack,但是背后也是批量上去
# manual_immediate listner负责ack,每调用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
# 提交offset延时(接收到消息后多久提交offset,enable-auto-commit=true才会生效)
spring.kafka.consumer.auto-commit-interval=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=earliest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
###########【批量配置】###########
# 设置批量消费
#spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
#spring.kafka.consumer.max-poll-records=50
3.Kafka配置类:
- 定义主题topic
- 自定义消费端异常处理
/**
* @Description 配置类
* @Date 2021/7/5 13:55
* @Created by zlx
*/
@Configuration
public class KafkaConfig {
/**
* @description: 创建Topic并设置分区数为3,分区副本数为2,如果要修改分区数,只需修改配置值重启项目即可,修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
* @author: zlx
* @time: 2021/7/5 14:23
*/
@Bean
public NewTopic createTopic1() {
return new NewTopic(KafkaConstant.TOPIC_ONE_NAME, KafkaConstant.NUM_PARTITIONS, KafkaConstant.REPLICATION);
}
/**
* @description: 自定义消费端异常处理器,只有消费端出现异常且未catch才会触发
* @author: zlx
* @time: 2021/7/5 14:32
*/
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("进入异常。。。。。消费异常:" + message.getPayload());
//do something
return null;
};
}
}
4.消息生产者:
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* @param topic 主题
* @param partition 分区
* @param message 消息体
* @description: 发送消息。addCallback回调方法,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理
* @author: zlx
* @time: 2021/7/5 14:18
*/
public void sendMessage(String topic, Integer partition, String message) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, partition, null, message);
kafkaTemplate.send(producerRecord)
.addCallback(result -> {
if (result != null && null != result.getRecordMetadata()) {
System.out.println("消费发送成功,消息发送到的topic:" + result.getRecordMetadata().topic() +
",消息发送到的分区:" + result.getRecordMetadata().partition() +
",消息在分区内的offset:" + result.getRecordMetadata().offset());
}
}, throwable -> {
//可做补偿机制
System.out.println("消费发送失败:" + throwable.getMessage());
});
}
}
5.消息消费者:
@Component
public class KafkaConsumer {
//单条接收消息,
@KafkaListener(groupId = KafkaConstant.GROUP_NAME, topics = KafkaConstant.TOPIC_ONE_NAME, errorHandler = "consumerAwareErrorHandler")
public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println("topic:" + record.topic() + "|partition:" + record.partition() + "|offset:" + record.offset() + "|value:" + record.value());
ack.acknowledge();
}
//批量接收消息时用List来接收,对应配置文件最后两个
/*@KafkaListener(groupId = KafkaConstant.GROUP_NAME, topics = KafkaConstant.TOPIC_ONE_NAME)
public void onMessage2(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
System.out.println(">>>批量消费一次,records.size()=" + records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
ack.acknowledge();
System.out.println("-------------------------------------");
}*/
}
|