本项目demo基于SpringBoot 2.6.3版本。
1、引入kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置kafka
3、创建生产者
这里将生产者放在controller层,准备用浏览器触发来生产消息。
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage(@RequestParam("msg") String msg) {
kafkaTemplate.send("topic-demo", msg);
return String.format("消息 %s 发送成功!", msg);
}
}
4、创建消费者
spring-kafka这个依赖提供了 @KafkaListener 这个注解来监听topic的消息,该注解有很多属性,其中topics属性和groupId属性是必须要配置的。
topics:顾名思义,表示需要配置主题。而且该属性支持配置多个topic,写法是字符串数组的形式。例:topics = {"topic-demo", "topic-demo2"}
groupId:可自定义。自定义的配置文件见这里:?kafka消费者的group id从哪里获取
@Component
public class MyConsumer {
@KafkaListener(topics = "topic-demo", groupId = "test-consumer-group")
public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println("=========开始消费消息=========");
System.out.println("record:" + record);
String value = record.value();
System.out.println("message:" + value);
ack.acknowledge();
System.out.println("=========消费完成=========");
}
}
5、生产到消费的效果演示
浏览器发送请求,投递了一条消息。
控制台立刻拉取了该条消息并消费完成。?
?6、SpringBoot与Kafka版本对应
参考Spring官网:Spring for Apache Kafka
|