IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SpringBoot - Kafka的集成与使用详解 (消费者) -> 正文阅读

[大数据]SpringBoot - Kafka的集成与使用详解 (消费者)

六、消费者1:指定 topic、partition、offset

1,使用 topics 指定 topic
(1)监听器主要是使用 @KafkaListenter 注解即可,而通过 topics 参数设置监听的 topic(可监听多个,用逗号隔开):
其他参数介绍:id(消费者 ID)、 groupId(消费组 ID)

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1", topics = {"topic1","topic2"})
    public void listen1(String data) {
        System.out.println(data);
    }
}

(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边都能够收到:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", 0, "key1", "message1");
        kafkaTemplate.send("topic1", 1, "key2", "message2");
        kafkaTemplate.send("topic2", 0, "key3", "message3");
        kafkaTemplate.send("topic2", 1, "key4", "message4");
    }
}

2,使用 topicPartitions 指定 topic、parition、offset

(1)topicPartitions 可配置更加详细的监听信息,比如下面代码同样是同时监听 topic1 和 topic2,不同在于这次:
监听 topic1 的 0 号分区
监听 topic2 的 0 号和 1 号分区(其中 1 号分区的初始偏移量为 100)

注意:topics 和 topicPartitions 不能同时使用。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1",topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0" }),
            @TopicPartition(topic = "topic2", partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
    public void listen1(String data) {
        System.out.println(data);
    }
}

(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边只会收到指定的消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", 0, "key1", "message1");
        kafkaTemplate.send("topic1", 1, "key2", "message2");
        kafkaTemplate.send("topic2", 0, "key3", "message3");
        kafkaTemplate.send("topic2", 1, "key4", "message4");
    }
}

之前的样例中消费者这边都直接获取消息内容并使用,如果我们还想要获取分区信息、消息头等其他内容的话,有如下两种方式。

七、消费者2:获取消息头和消息体

1,使用 ConsumerRecord 类方式

(1)使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(ConsumerRecord<String, Object> record) {
        //把ConsumerRecord里面所包含的内容打印到控制台中
        System.out.println(record);
    }
}

(2)这里我们简单的发送一条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
    }
}

(3)可以看到控制台输出的内容如下:

在这里插入图片描述

2,使用注解方式获取

(1)如果我们监听方法需要获取该消息非常多的字段时,也可以通过如下这种注解的方式:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen2(@Payload String data,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        System.out.println(data);
        System.out.println(topic);
        System.out.println(partition);
        System.out.println(key);
        System.out.println(ts);
    }
}

(2)这里我们简单的发送一条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
    }
}

由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。

八、消费者3:并发、批量消费

1,批量消费

(1)首先我们在项目 application.properties 文件中添加如下配置,一个设置启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

注意:这里设置 max-poll-records 是 5,并不是说如果没有达到 5 条消息,我们就一直等待。而是说一次 poll 最多返回的记录数为 5。

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=5

(2)接着对消费者监听这边代码稍作修改,改成使用 List 来接收:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<String> data) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
    }
}
  • 如果使用 ConsumerRecord 类接收,则也是使用 List 来接收:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<ConsumerRecord<String, Object>> records) {
        System.out.println("收到"+ records.size() + "条消息:");
        System.out.println(records);
    }
  • 如果使用注解方式获取消息头、消息体,则也是使用 List 来接收:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen2(@Payload List<String> data,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> tss) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
        System.out.println(topics);
        System.out.println(partitions);
        System.out.println(keys);
        System.out.println(tss);
    }
}

(3)我们一次性发送的 23 条数据测试一下:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        for (int i = 0; i < 23; i++) {
            kafkaTemplate.send("topic3", "message-" + i);
        }
    }
}

2,并发消费

(1)为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3:
注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。

# 并发数设为3
spring.kafka.listener.concurrency=3

(2)配置完毕后,消费者监听这边不需要修改:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(String data) {
        System.out.println(data);
    }
}
  • 并发消费和批量消费可以结合同时使用的,消费者监听使用 List 来接收:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<String> data) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
    }
}

(3)上面我们设置 concurrency 为 3,也就是将会启动 3 条线程进行监听。而由于我们创建的 topic 有 4 个 partition(分区),意味着将有 2 条线程都是分配到 1 个 partition,还有 1 条线程分配到 2 个 partition。我们可以通过日志看到每条线程分配到的 partition。

在这里插入图片描述

通常来说 KafkaListener 要做的事只是监听 Topic 中的数据并消费,如果在 KafkaListener 中还需要对异常进行 try catch 捕获并处理的话,则会显得代码块非常臃肿不利于维护。
? 好在 spring-kafka 为我们提供了专门的异常处理器(ConsumerAwareListenerErrorHandler),通过异常处理器,我们可以处理 consumer 在消费时发生的异常。

九、消费者4:异常处理器

1,注册异常处理器

(1)注册一个异常处理器就是新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,使用 @Bean 注入(BeanName 默认就是方法名):

@Configuration
public class KafkaInitialConfiguration {
 
    //异常处理器
    @Bean
    public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                                      Consumer<?, ?> consumer) {
                System.out.println("--- 发生消费异常 ---");
                System.out.println(message.getPayload());
                System.out.println(exception);
                return null;
            }
        };
    }
}

(2)当然我们也可以使用下面这种 lambda 表达式写法,简化代码:

@Configuration
public class KafkaInitialConfiguration {
 
    //异常处理器
    @Bean
    public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("--- 发生消费异常 ---");
            System.out.println(message.getPayload());
            System.out.println(exception);
            return null;
        };
    }
}

2,使用异常处理器

? 异常处理器注册好之后,将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"}, errorHandler = "myConsumerAwareErrorHandler")
    public void listen1(String data) throws Exception {
        throw new Exception("模拟一个异常");
    }
}

3,开始测试

(1)编写测试方法,发送一条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", "hangge.com");
    }
}

(2)可以看到异常处理器已经能正常使用了:

附:批量消费异常处理器

? 批量消费异常处理器和之前单消息消费异常处理器差不多,上面异常处理器代码可以完全不用改动直接使用,只不过传递过来的数据都是 List 集合方式:

消息过滤器可以让消息在抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据交由 KafkaListener 处理,不需要的消息则会过滤掉。

十、消费者5:消息过滤器

1,配置消息过滤器

? 配置消息过滤器十分简单,只需要为监听容器工厂配置一个 RecordFilterStrategy(消息过滤策略),返回 true 的时候消息将会被抛弃,返回 false 时,消息则能正常抵达监听容器。

@Configuration
public class KafkaInitialConfiguration {
 
    // 监听器工厂
    @Autowired
    private ConsumerFactory consumerFactory;
 
    // 配置一个消息过滤策略
    @Bean
    public ConcurrentKafkaListenerContainerFactory myFilterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略(将消息转换为long类型,判断是奇数还是偶数,把所有奇数过滤,监听器只接收偶数)
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                long data = Long.parseLong((String) consumerRecord.value());
                if (data % 2 == 0) {
                    return false;
                }
                //返回true将会被丢弃
                return true;
            }
        });
        return factory;
    }
}

2,使用消息过滤器

? 带有消息过滤策略的容器工厂注册好之后,将这个容器工厂的 BeanName 放到 @KafkaListener 注解的 containerFactory 属性里面。这样消息在抵达监听器之前,会先进行过滤。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"}, containerFactory = "myFilterContainerFactory")
    public void listen1(String data) {
        System.out.println(data);
    }
}

3,开始测试
(1)编写测试方法,连续发送 5 条消息(消息内容分别是 0 到 4 这个 5 个数字):

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        for (int i = 0; i < 5; i++) {
            kafkaTemplate.send("topic3", String.valueOf(i));
        }
    }
}

在实际开发中,我们常常需要使用转发功能实现业务解耦。比如:应用 A 从 topic1 获取到消息,经过处理后转发到 topic2。应用 B 监听 topic2 获取消息再次进行处理。

十一、消费者6:消息转发

(1)Spring-Kafka 只需要通过一个 @SendTo 注解即可以实现消息的转发,被注解方法的 return 值即转发的消息内容:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic1"})
    @SendTo("topic2")
    public String listen1(String data) {
        System.out.println("业务A收到消息:" + data);
        return data + "(已处理)";
    }
 
    // 消费监听
    @KafkaListener(topics = {"topic2"})
    public void listen2(String data) {
        System.out.println("业务B收到消息:" + data);
    }
}

(2)编写测试方法,发送 1 条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", "1条测试消息");
    }
}
默认情况下,当项目启动时,监听器就开始工作(监听消费发送到指定 topic 的消息)。如果我们不想让监听器立即工作,想在程序运行的过程中能够动态地开启、关闭监听器,可以借助 KafkaListenerEndpointRegistry 实现,下面通过样例进行演示。

十二、消费者7:动态启动、停止监听器

1,动态地开启、关闭监听器

(1)消费者这边代码没有什么特别的,主要是设置了个消费者 ID(监听器 ID),后面要根据这个 ID 来开启、关闭监听:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"})
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

(2)然后我定义两个 controller 接口分别通过 KafkaListenerEndpointRegistry 来控制监听器的开启、关闭:

注意:
KafkaListenerEndpointRegistry 在 SpringIO 中已经被注册为 Bean,直接注入使用即可。
还需要注意一下启动监听容器的方法,resume 是恢复的意思不是启动的意思。所以我们需要判断容器是否运行,如果运行则调用 resume 方法,否则调用 start 方法。
@RestController
public class KafkaProducer {
 
    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        System.out.println("监听器发送消息!");
        kafkaTemplate.send("topic1", "1条测试消息");
    }
 
    // 开启监听
    @GetMapping("/start")
    public void start() {
        System.out.println("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("myListener1").isRunning()) {
            registry.getListenerContainer("myListener1").start();
        }
        registry.getListenerContainer("myListener1").resume();
    }
 
    // 关闭监听
    @GetMapping("/stop")
    public void stop() {
        System.out.println("关闭监听");
        //判断监听容器是否启动,未启动则将其启动
        registry.getListenerContainer("myListener1").pause();
    }
}

(3)开始测试一下:

  • 启动项目,调用 /test 接口发送一条消息,监听器成功接收到消息。

  • 调用 /stop 接口关闭监听,再次发送一条消息,监听器不会收到消息。

  • 调用 /start 接口开启监听,监听器收到之前发送的消息。

  • 由于监听已经开启,再次发送一条消息,监听器成功接收到消息。

2,禁止监听器自启动

(1)默认情况下,当项目启动的时候,监听器就开始工作。如果想要禁止监听器自启动,首先我们定义一个不自动启动的监听容器工厂:

@Configuration
public class KafkaInitialConfiguration {
 
    // 监听器工厂
    @Autowired
    private ConsumerFactory consumerFactory;
 
    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container =
                new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }
}

(2)然后将这个容器工厂的 BeanName 放到 @KafkaListener 注解的 containerFactory 属性里面。这样该监听器就不会自动启动了。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"},
            containerFactory = "delayContainerFactory")
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

附:定时启动、停止监听器

? 有时我们需要监听器在我们指定的时间点开始工作,或者在我们指定的时间点停止工作。比如:项目需要利用 Kafka 做数据持久化的功能,由于用户活跃的时间为早上 10 点至晚上 12 点,那在这个时间段做一个大数据量的持久化可能会影响数据库性能导致用户体验降低,我们可以选择在用户活跃度低的时间段去做持久化的操作,也就是晚上 12 点后到第二条的早上 10 点前。这个可以结合定时任务来实现。

(1)首先在项目启动类上添加 @EnableScheduling 注解开启定时任务:

@SpringBootApplication
@EnableScheduling
public class KtestApplication {
    public static void main(String[] args) {
        SpringApplication.run(KtestApplication.class, args);
    }
}

(2)下面代码我们创建两个定时任务,一个用来在指定时间点启动 myListener1 这个监听器,另一个在指定时间点停止 myListener1 这个监听器:

@Component
public class MyKafkaSchedule {
    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    //定时器,每天凌晨0点开启监听
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        System.out.println("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("myListener1").isRunning()) {
            registry.getListenerContainer("myListener1").start();
        }
        registry.getListenerContainer("myListener1").resume();
    }
 
    //定时器,每天早上10点关闭监听
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        System.out.println("关闭监听");
        registry.getListenerContainer("myListener1").pause();
    }
}

(3)最后记得禁止这个监听器自启动(关于禁止自动启动的监听容器工厂见文章第 2 点内容):

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"},
            containerFactory = "delayContainerFactory")
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-21 15:32:29  更:2021-08-21 15:34:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 19:06:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码