一、Spring Boot整合Kafka
创建 SpringBoot项目,引入 kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
1、yaml配置文件
在配置文件中,配置Kafka服务信息。
spring:
application:
name: kafka-springboot
kafka:
bootstrap-servers: 192.168.xxx.xxx:9092
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: MANUAL_IMMEDIATE
配置参数不了解的,请查看之前文章,或者查看官方文档。
这里主要说明一下 listener配置参数:
- RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
- BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
- TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
- COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
- COUNT_TIME:TIME | COUNT 有一个条件满足时提交
- MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
- MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交
作用就是影响消费者监听器(ListenerConsumer)处理之后提交动作。一般我们选用手动提交。
2、生产者
使用 KafkaTemplate类的 send方法 发送消费。
2.1 分区发送
@Service
public class Producer {
private final static String TOPIC_NAME = "my-springboot-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMsg(int orderId) {
Order order = new Order(orderId, "订单-" + orderId, 1000.00);
kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
}
}
2.2 同步发送
public void syncSendMsg(int orderId) {
Order order = new Order(orderId, "订单-" + orderId, 1000.00);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
SendResult<String, String> sendResult = null;
try {
sendResult = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
RecordMetadata metadata = sendResult.getRecordMetadata();
ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
2.3 异步发送
1)首先需要自定义一个生产者监听器类
自定义生产者发送结果监听类,需要实现 ProducerListener类 。
- 发送消息成功则会回调用 onSuccess方法,
- 发送消息失败则会回调用 onError方法。
@Component
public class MyKafkaProducerSendResultListener implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
System.out.println("消息发送成功 : " + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
System.out.println("消息发送成功,exception=" + exception.getMessage());
}
}
2)添加生产者监听器类之后在发送消息
@Autowired
private MyKafkaProducerSendResultListener myKafkaProducerSendResultListener;
public void asyncSendMsg(int orderId) {
Order order = new Order(orderId, "订单-" + orderId, 1000.00);
kafkaTemplate.setProducerListener(myKafkaProducerSendResultListener);
kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
}
}
3、消费者
使用 @KafkaListener注解 来注入消费者。常见参数如下:
@KafkaListener(
groupId = "mySpringBootGroup",
topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(
topic = "topic2",
partitions = "0",
partitionOffsets = @PartitionOffset(
partition = "1",
initialOffset = "100"))
},
concurrency = "6"
)
- group-id:表示消费组,如果没有指定,则会使用配置文件中设置的默认的groupId。
- topicPartitions:一个消费组可以消费多个主题分区
- TopicPartition:主题分区相关
- concurrency:同组下的消费者个数,必须小于等于分区总数,大于意义不大,没必要大于。
3.1 多消费组消费主题
如果一个主题要被多个消费组消费,那么我们使用 @KafkaListener注解来注入多个消费者即可。
@Component
public class MyConsumer {
@KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup")
public void listenConsumerGroup1(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.printf("ConsumerGroup1收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
ack.acknowledge();
}
@KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup2")
public void listenConsumerGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.printf("ConsumerGroup2收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
ack.acknowledge();
}
}
4、启动项目
启动项目,从打印信息看出,每个消费者都会输出其配置信息。
单元测试ok:
有了之前 Kafka客户端API的使用,Spring Boot整合Kafka使用就更加简单了。
传送门(Kafka客户端API):https://blog.csdn.net/qq_42402854/article/details/124994563
– 求知若饥,虚心若愚。
|