IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringBoot整合RabbitMQ Direct、Topic、Fanout模式 -> 正文阅读

[Java知识库]SpringBoot整合RabbitMQ Direct、Topic、Fanout模式

1、SpringBoot整合RabbitMQ

1.1、Direct 模式(路由模式)

在这里插入图片描述

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key

  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列

  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息

  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

模式说明:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息- 的 Routing key 完全一致,才会接收到消息

1.2、Topic 模式(通配符模式)

在这里插入图片描述

  • Queue Q1:绑定的routing key是 *.orange.*,因此第二个单词是orange并且含有第三个单词的routing key 都会被匹配到。例如:tomatoes.orange.small
  • Queue Q2:绑定的routing key 分别是*.*.rabbite 和Lazy.# ,因此第三个单词rabbite或以Lazy开头的routing key 都会被匹配

模式说明:

  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符

  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

  • 通配符规则:# 匹配一个或多个词* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

1.3、Fanout模式(广播模式)

在这里插入图片描述

不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

Fanout交换机转发消息是最快的

1.4、代码整合

  1. 添加依赖

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    
  2. 配置整合

    1. yaml配置参数
      spring:
        rabbitmq:
          host: 192.168.0.134
          port: 5672
          username: admin
          password: admin
          virtual-host: /admin
      
    2. 生产者配置文件
      /**
       * @author 
       */
      @Configuration
      public class RabbitMqConfig {
      
          /**
           * topic通配符模式-交换机
           */
          public static final String  TOPIC_EXCHANGE = "topic_exchange";
          /**
           * topic通配符模式-队列
           */
          public static final String TOPIC_QUEUE = "topic_queue";
          /**
           * topic通配符模式-路由键
           * #:匹配所有
           * *:匹配单一单词
           */
          public static final String TOPIC_ROUTING = "topic.routing.#";
      
          /**
           * direct路由模式-交换机
           */
          public static final String  DIRECT_EXCHANGE = "direct_exchange";
          /**
           * direct路由模式-队列
           */
          public static final String DIRECT_QUEUE = "direct_queue";
          /**
           * direct路由模式-路由键
           */
          public static final String DIRECT_ROUTING = "direct.routing";
      
          /**
           * fanout广播模式-交换机
           */
          public static final String  FANOUT_EXCHANGE = "fanout_exchange";
          /**
           * fanout广播模式-队列
           */
          public static final String FANOUT_QUEUE_01 = "fanout_queue_01";
          /**
           * fanout广播模式-队列
           */
          public static final String FANOUT_QUEUE_02 = "fanout_queue_02";
      
          /**
           * 1、声明交换机
           * topic通配符模式,默认持久化,非自动删除
           * @return
           */
          @Bean(TOPIC_EXCHANGE)
          public Exchange topicExchange(){
      
              return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE).durable(true).build();
          }
      
          /**
           * 2、声明队列
           * topic通配符模式
           * @return
           */
          @Bean(TOPIC_QUEUE)
          public Queue topicQueue(){
      
              return QueueBuilder.durable(TOPIC_QUEUE).build();
          }
      
          /**
           * 3、队列与交换机进行绑定
           * topic通配符模式
           * @param queue @Qualifier 将 value 对应的bean 注入到参数中
           * @param exchange @Qualifier 将 value 对应的bean 注入到参数中
           * @return
           */
          @Bean
          public Binding topicQueueExchange(@Qualifier(TOPIC_QUEUE) Queue queue, @Qualifier(TOPIC_EXCHANGE) Exchange exchange){
              return BindingBuilder.bind(queue).to(exchange).with(TOPIC_ROUTING).noargs();
          }
      
      
          /**
           * 1、声明交换机
           * direct路由模式,默认持久化,非自动删除
           * @return
           */
          @Bean(DIRECT_EXCHANGE)
          public Exchange directExchange(){
      
              return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).build();
          }
      
          /**
           * 2、声明队列
           * direct路由模式
           * @return
           */
          @Bean(DIRECT_QUEUE)
          public Queue directQueue(){
      
              return QueueBuilder.durable(DIRECT_QUEUE).build();
          }
      
          /**
           * 3、队列与交换机进行绑定
           * direct路由模式
           * @param queue @Qualifier 将 value 对应的bean 注入到参数中
           * @param exchange @Qualifier 将 value 对应的bean 注入到参数中
           * @return
           */
          @Bean
          public Binding directQueueExchange(@Qualifier(DIRECT_QUEUE) Queue queue, @Qualifier(DIRECT_EXCHANGE) Exchange exchange){
              return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING).noargs();
          }
      
          /**
           * 1、声明交换机
           * 广播模式,默认持久化,非自动删除
           * @return
           */
          @Bean(FANOUT_EXCHANGE)
          public Exchange fanoutExchange(){
      
              return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE).durable(true).build();
          }
      
          /**
           * 2、声明队列
           * 广播模式
           * @return
           */
          @Bean(FANOUT_QUEUE_01)
          public Queue fanout01Queue(){
      
              return QueueBuilder.durable(FANOUT_QUEUE_01).build();
          }
          /**
           * 2、声明队列
           * 广播模式
           * @return
           */
          @Bean(FANOUT_QUEUE_02)
          public Queue fanout02Queue(){
      
              return QueueBuilder.durable(FANOUT_QUEUE_02).build();
          }
      
          /**
           * 3、队列与交换机进行绑定
           * 广播模式
           * @param queue @Qualifier 将 value 对应的bean 注入到参数中
           * @param exchange @Qualifier 将 value 对应的bean 注入到参数中
           * @return
           */
          @Bean
          public Binding fanout01QueueExchange(@Qualifier(FANOUT_QUEUE_01) Queue queue, @Qualifier(FANOUT_EXCHANGE) Exchange exchange){
              return BindingBuilder.bind(queue).to(exchange).with("").noargs();
          }
          /**
           * 3、队列与交换机进行绑定
           * 广播模式
           * @param queue @Qualifier 将 value 对应的bean 注入到参数中
           * @param exchange @Qualifier 将 value 对应的bean 注入到参数中
           * @return
           */
          @Bean
          public Binding fanout02QueueExchange(@Qualifier(FANOUT_QUEUE_02) Queue queue, @Qualifier(FANOUT_EXCHANGE) Exchange exchange){
              return BindingBuilder.bind(queue).to(exchange).with("").noargs();
          }
      }
      
      
    3. 生产者消息推送,需要spring-boot-starter-test测试包,根据实际情况引入
      
      /**
       * @author
       */
      @SpringBootTest
      public class ProducerTest {
      
          //注入 RabbitTemplate
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @Test
          public void sendTopic() {
              rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE, RabbitMqConfig.TOPIC_ROUTING, "topic模式消息");
          }
          @Test
          public void sendDirect() {
              rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING, "direct模式消息");
          }
          @Test
          public void sendFanout() {
              rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE,"", "fanout模式消息");
          }
      }
      
  3. 消费者接收类

    
    @Component
    public class DirectListener {
    
        /**
         * 定义方法进行信息的监听
         * RabbitListener中的参数用于表示监听的是哪一个队列
         */
          @RabbitListener(queues = RabbitMqConfig.DIRECT_QUEUE)
          public void ListenerQueue(Message message){
              System.out.println("direct message:"+new String(message.getBody()));
          }
    }
    
    @Component
    public class Fanout01Listener {
    
        /**
         * 定义方法进行信息的监听
         * RabbitListener中的参数用于表示监听的是哪一个队列
         */
          @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_01)
          public void ListenerQueue(Message message){
              System.out.println("fanout01 message:"+new String(message.getBody()));
          }
    }
    
    @Component
    public class Fanout02Listener {
    
        /**
         * 定义方法进行信息的监听
         * RabbitListener中的参数用于表示监听的是哪一个队列
         */
          @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_02)
          public void ListenerQueue(Message message){
              System.out.println("fanout02 message:"+new String(message.getBody()));
          }
    }
    
    @Component
    public class TopicListener {
    
        /**
         * 定义方法进行信息的监听
         * RabbitListener中的参数用于表示监听的是哪一个队列
         */
          @RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE)
          public void ListenerQueue(Message message){
              System.out.println("topic message:"+new String(message.getBody()));
          }
    }
    
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-07-05 23:25:28  更:2022-07-05 23:27:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 15:14:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码