IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Rabbitmq死信队列 -> 正文阅读

[大数据]Rabbitmq死信队列

Rabbitmq死信队列

消息进入死信队列的条件
1.消息到达超时时间
2.队列长度到达限制
3.消息拒绝签收,不把消息放入原队列中

direct方式

消息超时测试

1.RabbitMQ配置类,队列消息过期时间设置5秒

@Configuration
public class DirectRabbitConfig {
    //死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    //死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }
    
    //绑定死信队列和死信交换机
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }


    //队列
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 队列中的消息未被消费则5秒后过期
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信队列抛出异常重定向队列的routingKey(TKEY_R)
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }
    //Direct交换机
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
    //绑定  将队列和交换机绑定
    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}

2.发送消息

@Controller
public class SendController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("sendMessage")
    @ResponseBody
    public String sendMessage(String message){
        rabbitTemplate.convertAndSend("directExchange", "routingkey001", message);
        return "ok";
    }
}

这里消息没有接收,5秒后超时,消息进入死信队列
在这里插入图片描述

队列长度到达限制

1.修改RabbitMQ配置类,队列长度设置为5

@Configuration
public class DirectRabbitConfig {


    //死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    //死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }
    //绑定死信队列和死信交换机
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }


    //队列
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-max-length", 5); //最多存入5个消息
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信队列抛出异常重定向队列的routingKey(TKEY_R)
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }
    //Direct交换机
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
    //绑定  将队列和交换机绑定
    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}

这里队列中消息数量超过限制后,多余的消息进入死信队列中
在这里插入图片描述

消费端拒绝签收,不把消息放入原有队列

发送端代码

1.RabbitMQ配置类

@Configuration
public class DirectRabbitConfig {


    //死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    //死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }
    //绑定死信队列和死信交换机
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }


    //队列
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信队列抛出异常重定向队列的routingKey(TKEY_R)
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }
    //Direct交换机
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
    //绑定  将队列和交换机绑定
    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}

2.发送消息


@Controller
public class SendController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("sendMessage")
    @ResponseBody
    public String sendMessage(String message){
        Message message1 = new Message(message.getBytes());
        rabbitTemplate.convertAndSend("directExchange", "routingkey001", message1);
        return "ok";
    }

}

接收端代码
1.配置文件,启动手动签收

spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

2.RabbitMQ配置类

@Configuration
public class DirectRabbitConfig {


    //死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    //死信交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }
    //绑定死信队列和死信交换机
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }


    //队列
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信队列抛出异常重定向队列的routingKey(TKEY_R)
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }
    //Direct交换机
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
    //绑定  将队列和交换机绑定
    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}

3.接收消息

@Component
public class ReceiveMessage {

    @RabbitListener(queues = "directQueue")//监听的队列名称 directQueue
    public void process(Message message,Channel channel) throws IOException {
        //消息编号
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            System.out.println("directReceiver消费者收到消息  : " + message);
            int a=10/0;//制造异常
            channel.basicAck(deliveryTag,true);//手动签收
        }catch (Exception e){
            //参数3:true将消息放回原来队列中,false不把消息放入原队列中
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

消费消息异常后,没有签收消息,将消息不放回原有队列,这时消息被放入到私信队列中
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-07 21:47:01  更:2021-08-07 21:47:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 19:23:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码