2.3 SpringKafka消费者
2.3.1 Kafka消息监听器MessageListener
之前已经介绍了通过kafka-topic-consumer.sh 和kafka-tool 工具来消费数据。下面介绍SpringKafka消费数据的方式——kafka消息监听器。
Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListener 是SpringKafka 的消息监听器接口,也是一个函数式接口,利用接口的onMessage 方法可以实现消费数据。
public interface GenericMessageListener<T> {
void onMessage(T data);
default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
throw new UnsupportedOperationException("Container should never call this");
}
default void onMessage(T data, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}
default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}
}
基于此接口可以实现单条数据消息监听器接口MessageListenen 、多条数据消息监听器接口BatchMessageListener 、带ACK机制的消息监听器AcknowledgingMessageListener 和BatchAcknowledgingMessageListener
MessageListener
GenericMessageListener
单条数据监听器
BatchMessageListener
批量数据监听器
AckowledgingMessageListenenr
带ACK的单条数据监听器
BatchAckowledgingMessageListener
带ACK机制的批量数据监听器
2.3.2 消息监听容器与容器工厂
消息监听器MessageListener 是由消费监听器容器MessageListenerContainer 接口来承载,使用setupMessageListenner() 方法启动一个监听器。其中还有定义了操作消息的resume() 、pause() 等方法。
public interface MessageListenerContainer extends SmartLifecycle {
void setupMessageListener(Object messageListener);
Map<String, Map<MetricName, ? extends Metric>> metrics();
}
spring-kafka提供了两个容器KafkaMessageListenerContainer 和ConcurrentMessageListenerContainer 。
GenericMessageListenerContainer
MessageListenerContainer
AbstractMessageListenerContainer
KafkaMessageListenerConatiner
ConcurrentMessageListenerCOntainer
消息监听器容器由容器工厂KafkaListenerContainerFactory 统一创建并管理
public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {
C createListenerContainer(KafkaListenerEndpoint endpoint);
C createContainer(TopicPartitionOffset... topicPartitions);
C createContainer(String... topics);
C createContainer(Pattern topicPattern);
}
spring-kafka提供了监听器容器工厂ConcurrentKafkaListenerContainerFactory ,其有两个重要的配置
ContainerProperties 和ConsumerFactory
AbstractKafkaListenerContainerFactory
KafkaListenerContainerFactory
ConcurrentKafkaListenerContainerFactory
ContainerProperties 定义了要消费消息的topic ,消息处理的MessageListener 等信息。
因此要实现一个消息监听器的流程如下:
KafkaProperties
ConsumerFactory
ContainerProperties
ContainerFactory
ListenerContainer
MessageListener
2.3.3 非注解式消费监听器
SpringKafka的消费者是由一个消费监听器容器ListenerConatiner 去承载的,容器对应一个配置文件为ContainerProperties ,ContainerProperties 继承自消费者配置类ConsumerProperties ,并且承载了消息监听器的设置
ContainerProperties
ConsumerProperties
首先介绍非注解式的消息监听器,类似于ProducerFactory ,消费者需要创建一个ConsumerFactory
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
然后建立监听器容器工厂ConcurrentKafkaListenerContainerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
有了容器工厂之后,就可以通过注册bean 的方式生成一个MessageListenerContainer
@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProperties = new ContainerProperties("numb");
containerProperties.setMessageListener(
(MessageListener<String, String>) data -> System.out.println("收到消息: " + data.value()));
return new KafkaMessageListenerContainer(consumerFactory, containerProperties);
}
在这个kafkaMessageListenerContainer 中,通过ContainerProperties 配置了消费的topic和messageListener。之后启动项目后,spring会将kafkaMessageListenerContainer 注册到ConcurrentKafkaListenerContainerFactory 中,这样获取到数据后会自动调用消息监听器进行数据处理。
测试消费者消费数据
@Test
public void test_send_and_consume() {
ExecutorService threadPool = Executors.newCachedThreadPool();
threadPool.submit(() -> {
while (true) {
kafkaTemplate.send(KafkaConsts.TOPIC_TEST, UUID.randomUUID().toString(), "kv");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发送完成");
}
});
while (true);
}
输出:
发送完成
发送成功
收到消息: kv
发送完成
发送成功
收到消息: kv
2.3.4 注解式消费监听器@KafkaListener
之前配置了容器监听器工厂ConcurrentKafkaListenerContainerFactory 之后,还需要用代码配置MessageListenerContainer , 指定消费的topic、消息监听器处理等。其实上面这步完全可以通过注解@KafkaListener 实现。
@Component
@Slf4j
public class MessageHandler {
@KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "kafkaListenerContainerFactory", id = "consumer_numb"
)
public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
String message = (String) record.value();
log.info("收到消息: {}", message);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
acknowledgment.acknowledge();
}
}
}
@KafkaListener的主要属性
-
id:监听器的id -
groupId:消费组id -
idIsGroup:是否用id作为groupId,如果置为false,并指定groupId时,消费组ID使用groupId;否则默认为true,会使用监听器的id作为groupId -
topics:指定要监听哪些topic(与topicPattern、topicPartitions 三选一) -
topicPattern: 匹配Topic进行监听(与topics、topicPartitions 三选一) -
topicPartitions: 显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
-
containerFactory:指定监听器容器工厂 -
errorHandler: 监听异常处理器,配置BeanName -
beanRef:真实监听容器的BeanName,需要在 BeanName前加 “__” -
clientIdPrefix:消费者Id前缀 -
concurrency: 覆盖容器工厂containerFactory的并发配置
|