Basic Usage - Simple Production and Consumption
Basic project setup
Create a new Maven project and add the Kafka dependency; the pom file is as follows:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.3</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.16</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
Write the application.yml configuration file with the basic Kafka settings (keys under properties are raw Kafka client properties passed straight through):
spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      missing-topics-fatal: false
      # type: batch   # uncomment to enable batch consumption (see the consumer section below)
    consumer:
      group-id: defaultConsumerGroup
      enable-auto-commit: true
      auto-commit-interval: 1000
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 10
      properties:
        session.timeout.ms: 120000
        request.timeout.ms: 180000
    producer:
      retries: 1
      acks: -1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 0
        # partitioner.class: com.zqf.config.MyPartitioner   # uncomment to use the custom partitioner (see below)
Create the startup class:
@SpringBootApplication
public class KafkaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
We can write a configuration class that creates a topic named testTopic with 8 partitions. (If we simply send a message to a nonexistent topic, Kafka auto-creates it, but with only a single partition.)
@Configuration
public class KafkaInitialConfiguration {

    // Create a topic named testTopic with 8 partitions and a replication
    // factor of 2 (a replication factor of 2 requires at least 2 brokers)
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("testTopic", 8, (short) 2);
    }

    // Redefining the same topic with a larger partition count updates it in
    // place on restart; partitions can only be increased, never decreased
    @Bean
    public NewTopic updateTopic() {
        return new NewTopic("testTopic", 10, (short) 2);
    }
}
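To double-check what the broker actually created, the topic metadata can be inspected at startup. Below is a minimal sketch of ours, not part of the original project: the ApplicationRunner bean named topicChecker is hypothetical, and it uses the standard AdminClient API to print the partition count of testTopic. It can live inside KafkaInitialConfiguration (imports: java.util.Collections, org.apache.kafka.clients.admin.AdminClient, org.apache.kafka.clients.admin.TopicDescription, org.springframework.boot.ApplicationRunner, org.springframework.kafka.core.KafkaAdmin).

// Hypothetical helper bean: logs the partition count of testTopic once the app is up
@Bean
public ApplicationRunner topicChecker(KafkaAdmin kafkaAdmin) {
    return args -> {
        try (AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            TopicDescription desc = admin.describeTopics(Collections.singleton("testTopic"))
                    .all().get().get("testTopic");
            System.out.println("testTopic partitions: " + desc.partitions().size());
        }
    };
}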
Write a producer Controller class to simulate message production, injecting a KafkaTemplate:
@RestController
@RequestMapping("/kafka/producer")
public class ProducerController {

    @Resource
    private KafkaTemplate<Object, Object> kafkaTemplate;
}
Simple production and consumption
Simulate message production in the ProducerController:
@GetMapping("/simpleSend/{topic}/{msg}")
public String simpleSend(@PathVariable("topic") String topic,
@PathVariable("msg") String msg) {
kafkaTemplate.send(topic, msg);
return "Kafka 生产消息成功~";
}
Create a ConsumerService to consume the messages. The topics attribute of @KafkaListener specifies the topics to listen to; several can be listened to at once, separated by commas:
@KafkaListener(topics = {"testTopic"})
public void simpleGetMsg(ConsumerRecord<Object, Object> record) {
    System.out.println("topic:" + record.topic());
    System.out.println("partition:" + record.partition());
    System.out.println("msg:" + (String) record.value());
}
Testing with Postman, the listener logs the topic, partition, and message of each consumed record.
Advanced Usage - Producer
Producer with a callback
@GetMapping("/callbackSend/{topic}/{msg}")
public String callbackSend(@PathVariable("topic") String topic,
@PathVariable("msg") String msg) {
try {
kafkaTemplate.send(topic, msg).addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable e) {
System.out.println("消息发送失败..." + e.getMessage());
}
@Override
public void onSuccess(SendResult<Object, Object> result) {
System.out.println("消息发送成功..." + result.getRecordMetadata().topic() + "-" +
result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
} catch (Throwable throwable) {
return "kafka 发送消息失败~";
}
return "kafka 发送消息成功~";
}
Test result: on a successful send, the callback prints the topic, partition, and offset of the record.
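Attaching a callback to every send gets repetitive; spring-kafka also allows a single global ProducerListener on the template. A minimal sketch of ours (the class name ProducerListenerConfig is hypothetical; setProducerListener and the two callback methods are standard spring-kafka API, and registering it replaces the default LoggingProducerListener):

@Configuration
public class ProducerListenerConfig {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    // One listener for every send made through this template
    @PostConstruct
    public void registerListener() {
        kafkaTemplate.setProducerListener(new ProducerListener<Object, Object>() {
            @Override
            public void onSuccess(ProducerRecord<Object, Object> record, RecordMetadata metadata) {
                System.out.println("sent OK..." + metadata.topic() + "-"
                        + metadata.partition() + "-" + metadata.offset());
            }

            @Override
            public void onError(ProducerRecord<Object, Object> record, RecordMetadata metadata, Exception e) {
                System.out.println("send failed..." + e.getMessage());
            }
        });
    }
}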
Sending messages in a transaction
@GetMapping("/transactionSend/{topic}/{msg}")
public String transactionSend(@PathVariable("topic") String topic,
@PathVariable("msg") String msg) {
final Boolean[] result = {true};
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Object, Object, Object>() {
@Override
public Object doInOperations(KafkaOperations<Object, Object> kafkaOperations) {
try {
if ("HelloKafka_transaction".equals(msg)) {
throw new RuntimeException("消息异常,启动 Kafka 事务,不生产对应消息~");
}
try {
kafkaTemplate.send(topic, msg).get();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "生产消息无异常,生产成功~";
} catch (Exception e) {
e.printStackTrace();
result[0] = false;
return "生产消息有异常,生产失败~";
}
}
});
if (!result[0]) {
return "kafka 发送消息失败~";
}
return "kafka 发送消息成功~";
}
Sending messages in a transaction requires the transaction-id-prefix property (the transaction prefix) to be set on the producer in the configuration file, as sketched below. With it in place, when the exception fires the message is never produced, so the consumer receives nothing.
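For reference, the producer section then gains one extra line; the prefix value tx- below is our placeholder (any prefix works). Note that a transactional producer also requires acks: -1 (i.e. all) and retries greater than 0, which the opening configuration already satisfies.

spring:
  kafka:
    producer:
      transaction-id-prefix: tx-   # placeholder prefix of ours; enables Kafka transactions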
Custom partitioner
Every topic in Kafka can be divided into multiple partitions, and which partition a message is appended to is decided by the partitioning strategy:
- If a partition is specified when the message is sent, it is appended to that partition;
- If no partition is specified but the message carries a key, Kafka hashes the key to choose the partition, which guarantees that messages with the same key end up in the same partition (see the sketch after this list);
- If neither a partition nor a key is given, the default partitioner picks one: round-robin in older clients, sticky partitioning since Kafka 2.4.
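To try the key-based strategy, a message can be sent with an explicit key. A minimal sketch of ours for the ProducerController (the /keySend endpoint is hypothetical; the three-argument send(topic, key, data) overload is standard KafkaTemplate API):

@GetMapping("/keySend/{topic}/{key}/{msg}")
public String keySend(@PathVariable("topic") String topic,
                      @PathVariable("key") String key,
                      @PathVariable("msg") String msg) {
    // The key is hashed to choose the partition, so records with the
    // same key always land in the same partition
    kafkaTemplate.send(topic, key, msg);
    return "Kafka produced the keyed message successfully~";
}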
Write the custom partitioner:
public class MyPartitioner implements Partitioner {

    // Decide the target partition for each record; here every message
    // is routed to partition 0 as a demonstration
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
After implementing the partitioning logic, register it in the configuration file with spring.kafka.producer.properties.partitioner.class=com.zqf.config.MyPartitioner; see the end of the opening config file and uncomment the corresponding line.
Testing confirms that every message is now routed to partition 0.
Advanced Usage - Consumer
Specifying the topics, partitions, and offsets a listener consumes
The attributes of @KafkaListener let us pin a consumer to specific topics, partitions, and offsets:
@KafkaListener(id = "consumer01", groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = "0"),
@TopicPartition(topic = "testTopic", partitions = {"1", "3", "5", "7", "9"})
})
public void targetGetMsg1(ConsumerRecord<Object, Object> record) {
System.out.println("=======consumer01收到消息=======");
System.out.println("topic:" + record.topic());
System.out.println("partition:" + record.partition());
System.out.println("msg:" + (String) record.value());
}
@KafkaListener(id = "consumer02", groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic2", partitions = "0"),
@TopicPartition(topic = "testTopic", partitions = {"0", "2", "4", "6", "8"})
})
public void targetGetMsg2(ConsumerRecord<Object, Object> record) {
System.out.println("=======consumer02收到消息=======");
System.out.println("topic:" + record.topic());
System.out.println("partition:" + record.partition());
System.out.println("msg:" + (String) record.value());
}
Test result: each message is consumed by consumer01 or consumer02 according to the partition it was routed to.
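The offset part of the heading is covered by @PartitionOffset (from org.springframework.kafka.annotation). A minimal sketch of ours; consumer03 and testGroup2 are hypothetical names, and the listener starts reading testTopic partition 0 from offset 5:

@KafkaListener(id = "consumer03", groupId = "testGroup2", topicPartitions = {
        @TopicPartition(topic = "testTopic",
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
})
public void targetGetMsg3(ConsumerRecord<Object, Object> record) {
    System.out.println("=======consumer03 received a message=======");
    System.out.println("offset:" + record.offset() + ", msg:" + record.value());
}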
Batch consumption
Modify the configuration file to enable batch mode and set how many records may be consumed at a time:
# Enable batch consumption
spring.kafka.listener.type=batch
# Maximum number of records consumed per poll
spring.kafka.consumer.max-poll-records=50
The first property corresponds to the commented-out listener type line in the opening config file; max-poll-records is already set there (to 10) and can be raised as needed.
Write a producer method that generates and sends a batch of messages:
@GetMapping("/multiAccept/{topic}/{msg}")
public String multiAccept(@PathVariable("topic") String topic,
@PathVariable("msg") String msg) {
for (int i = 1; i <= 30; i++) {
kafkaTemplate.send(topic, msg + i);
}
return "Kafka 发送消息成功~";
}
The consumer then receives the messages in batches:
@KafkaListener(topics = {"multiAcceptTopic"})
public void multiAccept(List<ConsumerRecord<Object, Object>> records) {
System.out.println("======= Kafka 开始批量消费 =======");
System.out.println("消息数量 >>> " + records.size());
records.forEach(consumer -> {
System.out.println("msg:" + (String) consumer.value());
});
}
Test result: the 30 records arrive at the listener in batches rather than one at a time.
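With auto-commit disabled, a batch can also be acknowledged manually once it has been fully processed. A minimal sketch of ours, assuming enable-auto-commit: false and spring.kafka.listener.ack-mode: manual_immediate in the config (both standard Spring Boot properties; Acknowledgment comes from org.springframework.kafka.support):

@KafkaListener(topics = {"multiAcceptTopic"})
public void multiAcceptManual(List<ConsumerRecord<Object, Object>> records, Acknowledgment ack) {
    records.forEach(r -> System.out.println("msg:" + r.value()));
    // Commit the offsets only after the whole batch has been processed
    ack.acknowledge();
}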
Consumer exception handling
Handling consumer exceptions takes an error handler bean, which is then wired into the listener method:
@Configuration
public class MyConsumerAwareErrorHandler {

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        // Invoked whenever a listener wired to this handler throws
        return (message, exception, consumer) -> {
            System.out.println("An exception occurred during consumption...");
            System.out.println("Exception message: " + exception.getMessage());
            return null;
        };
    }
}
Simulate an exception during consumption:
@KafkaListener(topics = {"errorHandler"}, errorHandler = "consumerAwareListenerErrorHandler")
public void errorHandlerListener(ConsumerRecord<Object, Object> record) {
throw new RuntimeException("模拟接收消息异常...");
}
Test result: the error handler catches the exception and prints its message.
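The handler above is scoped to individual listeners. From spring-kafka 2.8 on, a container-level DefaultErrorHandler can also retry failed records before giving up; a minimal sketch of ours, assuming Spring Boot's auto-configured container factory picks up a single CommonErrorHandler bean (DefaultErrorHandler is from org.springframework.kafka.listener, FixedBackOff from org.springframework.util.backoff):

// Retry each failed record twice, one second apart, before logging and skipping it
@Bean
public DefaultErrorHandler defaultErrorHandler() {
    return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
}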
Consumer message filtering
To filter messages on the consumer side, configure a record filter strategy on the listener container factory:
@Configuration
public class MyKafkaFilter {

    @Autowired
    ConsumerFactory<Object, Object> consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Commit the offsets of discarded records so they are not redelivered
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(consumerRecord -> {
            // Returning false keeps the record; returning true discards it
            if (consumerRecord.value().toString().length() < 5) {
                return false;
            }
            System.out.println("Message rejected by the filter and discarded~");
            return true;
        });
        return factory;
    }
}
Write the consumer method, pointing it at the custom container factory:
@KafkaListener(topics = {"msgFilter"}, containerFactory = "listenerContainerFactory")
public void msgFilterListener(ConsumerRecord<Object, Object> record) {
System.out.println("topic:" + record.topic());
System.out.println("partition:" + record.partition());
System.out.println("msg:" + (String) record.value());
}
Test result: according to the filter logic, messages shorter than 5 characters pass through and are consumed normally, while longer ones trigger the filter and are discarded without ever reaching the listener.
Consumer message forwarding
After receiving and processing a message, a consumer can forward the result to another topic simply by adding the @SendTo annotation (Spring Boot's default container factory already carries the reply template that @SendTo relies on):
@KafkaListener(topics = {"sendTo"})
@SendTo("testTopic")
public String sendToListener(ConsumerRecord<Object, Object> record) {
return record.value()+"--我被处理了~";
}
Test: send the message "Hi~Kafka~". The listener on sendTo receives it, processes it, and forwards the result to testTopic, where it is consumed by that topic's listener.