IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ TTL延迟队列 -> 正文阅读

[大数据]RabbitMQ TTL延迟队列

流程图设计

????????

?创建配置类进行绑定

/**
 * TTL 队列 配置文件类
 */
@Configuration
public class TtlQueueConfig {

    /**
     * 普通交换机的名称
     */
    private static final String X_EXCHANGE = "X";
    /**
     * 死信交换机的名称
     */
    private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    /**
     * 普通队列的名称
     */
    private static final String QUEUE_A = "QA";
    private static final String QUEUE_B = "QB";
    /**
     * 死信队列的名称
     */
    private static final String DEAD_LETTER_QUEUE = "QD";

    /**
     * 声明X交换机
     */
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 声明Y交换机
     */
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明队列 QA 绑定死信交换机Y routingKey=YD  TTL 过期时间10S
     */
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置TTL 单位是ms
        arguments.put("x-message-ttl", 10 * 1000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    /**
     * 声明队列 QB 绑定死信交换机Y routingKey=YD TTL 过期时间40S
     */
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置TTL 单位是ms
        arguments.put("x-message-ttl", 40 * 1000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    /**
     * 死信队列 QD
     */
    @Bean("queueD")
    public Queue queueD() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 队列绑定 QA和X交换机绑定 routingKey=XA
     */
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    /**
     * 队列绑定 QB和X交换机绑定 routingKey=XB
     */
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    /**
     * 队列绑定 QD和Y交换机绑定 routingKey=YD
     */
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

创建死信队列监听消费者

/**
 * 延迟队列TTL 消费者
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    /**
     * 监听死信队列QD 接收消息
     */
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信消息队列:{}", new Date().toString(), msg);
    }
}

创建简单的消息推送接口

/**
 * 返送延迟消息
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 消息推送接口
     *
     * @param message 推送消息
     */
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date().toString(), message);
        //进行消息推送
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
    }

}

启动执行结果

调用接口

http://localhost:8080/ttl/sendMsg/娃哈哈

控制台打印结果

2021-07-24 15:01:40.714  INFO 38495 --- [nio-8080-exec-1] c.s.r.s.controller.SendMsgController     : 当前时间:Sat Jul 24 15:01:40 CST 2021,发送一条信息给两个TTL队列:娃哈哈
2021-07-24 15:01:50.733  INFO 38495 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer        : 当前时间:Sat Jul 24 15:01:50 CST 2021,收到死信消息队列:消息来自ttl为10s的队列:娃哈哈
2021-07-24 15:02:20.725  INFO 38495 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer        : 当前时间:Sat Jul 24 15:02:20 CST 2021,收到死信消息队列:消息来自ttl为40s的队列:娃哈哈

自定义延迟时间队列配置

   /**
     * 普通队列的名称
     */
    private static final String QUEUE_C = "QC";

    //声明QC
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //无需定义过期时间
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }

    /**
     * 队列绑定 QC和X交换机绑定 routingKey=XC
     */
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

自定义时间消息接口

/**
     * 自定义时间消息推送接口
     */
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的信息给一个TTL队列:{}", new Date().toString(), ttlTime, message);
        //进行消息推送
        rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl为" + ttlTime + "ms的队列:" + message, msg -> {
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

启动执行结果

调用接口

http://localhost:8080/ttl/sendExpirationMsg/娃哈哈1/20000
http://localhost:8080/ttl/sendExpirationMsg/娃哈哈2/2000

控制台打印结果

2021-07-24 16:37:05.475  INFO 61239 --- [nio-8080-exec-4] c.s.r.s.controller.SendMsgController     : 当前时间:Sat Jul 24 16:37:05 CST 2021,发送一条时长20000毫秒的信息给一个TTL队列:娃哈哈1
2021-07-24 16:37:10.360  INFO 61239 --- [nio-8080-exec-5] c.s.r.s.controller.SendMsgController     : 当前时间:Sat Jul 24 16:37:10 CST 2021,发送一条时长2000毫秒的信息给一个TTL队列:娃哈哈2
2021-07-24 16:37:25.477  INFO 61239 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer        : 当前时间:Sat Jul 24 16:37:25 CST 2021,收到死信消息队列:消息来自ttl为20000ms的队列:娃哈哈1
2021-07-24 16:37:25.477  INFO 61239 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer        : 当前时间:Sat Jul 24 16:37:25 CST 2021,收到死信消息队列:消息来自ttl为2000ms的队列:娃哈哈2

存在问题

先后发送了两条消息,第一条延迟20s,第二条延迟2s,最终结果是同时到达。因为RabbitMQ只会检查第一条消息是否过期,过期就丢到死信队列进行排队,如果第一条消息延迟时间很长,那么之后的消息也不会得到优先执行。这个问题可以通过RabbitMQ插件得到解决。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-25 11:45:16  更:2021-07-25 11:46:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/30 16:59:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码