Kafka 是一个分布式、高吞吐量、可持久性和自动负载均衡的消息队列。它在实现了传统意义上的 MQ 功能的同时,也可以作为大数据的流处理平台。
简单来说,Kafka 就是一个高吞吐量的分布式发布订阅消息系统。
Kafka 的用法跟 RabbitMQ 用法相同,都是作为一个消息中间件收发消息,下面介绍的是 Springboot 微服务集成 Kafka,已经简单的用法说明。
依赖
Spring 有专门支持 Kafka 的依赖,引入 Spring 对应版本支持的 Kafka 依赖即可,如下
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置
spring:
kafka:
bootstrap-servers: ${KAFKA_HOST:192.168.0.105:9092}
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: consumer-group-default
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
默认 value-serializer 使用 org.apache.kafka.common.serialization.StringSerializer ,只支持文本消息。自定义 org.springframework.kafka.support.serializer.JsonSerializer 可以让消息支持其他类型。
使用示例
新建消息实体类
public class Message {
private Long id;
private String content;
private Date sendTime;
}
消息生产者控制器
@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerController.class);
private static final String TOPIC = "topic-test";
private KafkaTemplate kafkaTemplate;
public KafkaProducerController(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@PostMapping("/push")
public ResponseEntity<String> pushMessage(@RequestBody Message message) {
Date time = new Date();
message.setSendTime(time);
kafkaTemplate.send(TOPIC, JSON.toJSONString(message)).addCallback(success
-> LOGGER.info("{}-生产者发送消息成功:{},时间:{}", TOPIC, success, time), failure
-> LOGGER.error("{}-生产者发送消息失败:{}", failure.getMessage()));
return new ResponseEntity<>("success", HttpStatus.OK);
}
}
消息消费者监听
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private static final String TOPIC = "topic-test";
@KafkaListener(topics = {TOPIC})
public void testConsumer(String body) {
LOGGER.info("消费时间: {}", new Date());
Message message = JSON.parseObject(body, Message.class);
LOGGER.info("topic: {}, 消费消息内容: {}", TOPIC, message);
}
}
上边的示例是生产者发送消息到 topic-test,消费者以默认组 consumer-group-default 身份监听 topic-test 消费消息,监听器用 @KafkaListener 注解,topics 属性表示监听的topic,支持同时监听多个,用英文逗号分隔,如果需要使用指定组身份消费消息,可通过注解中的 groupId 属性指定。
|