IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ第二话 -- Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现 -> 正文阅读

[Java知识库]RabbitMQ第二话 -- Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现

本文主要分享RabbitMQ exchange类型的功能和使用、RabbitMQ延时队列、一个springboot服务发送消息到多虚拟主机

1.RabbitMQ exchange

exchange交换机,负责分发消息,为解决消息不同的业务场景,也提供了不同的交换机类型。

  • 基于springboot2.5.6
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • yaml配置
spring:
  rabbitmq:
    addresses: 192.168.0.221:5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        #手动确认 当有自动确认机制 又手动ACK会报406错误
        acknowledge-mode: manual

2.DirectExchange

默认交换机,一对一,默认下根据队列名下发,有routerKey时根据key下发

2.1 java中实现

  • 创建队列与绑定
@Bean
public boolean createDirectQueue(@Autowired ConnectionFactory connectionFactory) {
    String exchange = "direct_exchange";
    String queue = "direct_queue";
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.declareExchange(new DirectExchange(exchange));
    /**
     * Queue可传递参数说明
     * @params1 :队列名称
     * @params2 :队列是否持久化(如果是,则重启服务不会丢失)
     * @params3 :是否是独占队列(如果是,则仅限于此连接)
     * @params4 :是否自动删除(最后一条消息消费完毕,队列是否自动删除)
     */
    rabbitAdmin.declareQueue(new Queue(queue));
    rabbitAdmin.declareBinding(new Binding(queue, Binding.DestinationType.QUEUE,
            exchange, "", null));
    return true;
}

@GetMapping("/send")
public String send() {
    JSONObject object = new JSONObject();
    object.put("hello", "22222");
    //如有设置routingKey则需要指定key
    rabbitTemplate.convertAndSend("direct_exchange", "", object.toJSONString());
    return "success";
}
  • 消费者
@RabbitHandler
@RabbitListener(queues = {"direct_queue"})
public void onMessage(Message message, Channel channel) throws Exception {
    String msgBodyString = new String(message.getBody());
    JSONObject json = JSONObject.parseObject(msgBodyString);
    log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());
    //进行消费
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

2.2 测试日志

调用接口发送消息到RabbitMQ,查看控制台日志

c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"22222"},direct_queue

消费确认后就会被删除,只能消费一次

3.FanoutExchange

广播消息,所有队列都发,所有队列消费完毕后删除,如无消费者会保存在队列(开启持久化)等待一个消费者消费后删除

3.1 java中实现

  • 创建队列与绑定、发送消息
@Bean
public boolean createFanoutQueue(@Autowired ConnectionFactory connectionFactory) {
    String exchange = "fanout_exchange";
    String queue = "fanout_queue";
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.declareExchange(new FanoutExchange(exchange));
	//该交换机为广播消息创建两个不同的队列
    for (int i = 0; i < 2; i++) {
        rabbitAdmin.declareQueue(new Queue(queue + i));
        rabbitAdmin.declareBinding(new Binding(queue + i, Binding.DestinationType.QUEUE,
                exchange, "", null));
    }
    return true;
}

@GetMapping("/send")
public String send() {
    JSONObject object = new JSONObject();
    object.put("hello", "22222");
    //如有设置routingKey则需要指定key
	//fanout
    rabbitTemplate.convertAndSend("fanout_exchange", "", object.toJSONString());
    return "success";
}
  • 消费者
@RabbitHandler
@RabbitListener(queues = {"fanout_queue0", "fanout_queue1"})
public void fonoutOnMessage(Message message, Channel channel) throws Exception {
    String msgBodyString = new String(message.getBody());
    JSONObject json = JSONObject.parseObject(msgBodyString);
    log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());
    //进行消费
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

3.2 测试日志

c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"22222"},fanout_queue1
c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"22222"},fanout_queue0

一条消息会广播到两个队列

4.TopicExchange

通配符模糊匹配routerKey,满足条件就发送

4.1 java中实现

  • 创建队列与绑定、发送
@Bean
public boolean createTopicQueue(@Autowired ConnectionFactory connectionFactory) {
    String exchange = "topic_exchange";
    String queue = "topic_queue";
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.declareExchange(new TopicExchange(exchange));
	//匹配前缀的队列
    rabbitAdmin.declareQueue(new Queue(queue + "_before"));
    rabbitAdmin.declareBinding(new Binding(queue + "_before", Binding.DestinationType.QUEUE,
            exchange, "*." + queue, null));
	//匹配后缀的队列
    rabbitAdmin.declareQueue(new Queue(queue + "_after"));
    rabbitAdmin.declareBinding(new Binding(queue + "_after", Binding.DestinationType.QUEUE,
            exchange, queue + ".*", null));
    return true;
}

//topic 发送代码 同一个exchange 不同的routingKey
rabbitTemplate.convertAndSend("topic_exchange", "video.topic_queue", object.toJSONString());
rabbitTemplate.convertAndSend("topic_exchange", "topic_queue.music", object.toJSONString());

4.2 测试日志

c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"22222"},topic_queue_before
c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"22222"},topic_queue_after

经测试,均能正常消费

5.HeadersExchange

可以配置多个key-value形式的密钥,生产者在头部配置任意一个key-value即可发送到对应的队列

5.1 java中实现

  • 创建队列与绑定、发送
@Bean
public boolean createHeaderQueue(@Autowired ConnectionFactory connectionFactory) {
    String exchange = "header_exchange";
    String queue = "header_queue";
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.declareExchange(new HeadersExchange(exchange));
	//创建两组密钥
    Map<String, Object> map = new HashMap<>();
    map.put("header_key1", "12345");
    map.put("header_key2", "123456");

    rabbitAdmin.declareQueue(new Queue(queue));
    rabbitAdmin.declareBinding(
            BindingBuilder.bind(new Queue(queue))
                    .to(new HeadersExchange(exchange))
                    //任意匹配一对 whereAny还提供了单个key 多个key匹配的方法
                    .whereAny(map).match());
    return true;
}

//发送 header 头部的key-value必须是完全匹配的
object.put("hello", "header_key1");
rabbitTemplate.convertAndSend("header_exchange", null,
        MessageBuilder.withBody(object.toJSONString().getBytes())
                .setHeader("header_key1", "12345").build());
object.put("hello", "header_key2");
rabbitTemplate.convertAndSend("header_exchange", null,
        MessageBuilder.withBody(object.toJSONString().getBytes())
                .setHeader("header_key2", "123456").build());

5.2 测试日志

c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"header_key1"},header_queue
c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"header_key2"},header_queue

能正常消费到发送的内容

6.RabbitMQ延时队列

6.1 延时队列原理说明

RabbitMQ提供消息过期队列,一条消息在指定时候后没有被消费掉则会被定义为过期,过期的消息可以通过配置转到其他的交换机去,如不配置则直接抛掉。
那么开发实现过程为:

  • 新建delayed_exchange_ttldelayed_queue_ttl队列,通过配置设置消息存活时间和过期以后存放的死信队列。发送需要延迟的消息到该交换机,该交换机下的队列无消费者。

  • 成为死信一般有以下几种情况:

    消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
    消息的TTL-存活时间已经过期
    队列长度限制被超越(队列满)

  • 新建delayed_exchangedelayed_queue队列,存放已过期消息的死信队列,延时消息的消费者监听该队列

6.2 Java中实现

  • 创建与绑定、发送
@Bean
public boolean createDelayedQueue(@Autowired ConnectionFactory connectionFactory) {
    String exchange = "delayed_exchange";
    String queue = "delayed_queue";
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    //创建一个有定时过期时间的队列
    rabbitAdmin.declareExchange(new DirectExchange(exchange + "_ttl"));
    //队列的配置
    Map<String, Object> map = new HashMap<>();
    //过期时间
    map.put("x-message-ttl", 30000);
    //过期消息转达的死信交换机
    map.put("x-dead-letter-exchange", "delayed_exchange");
    //死信队列的routingKey
    map.put("x-dead-letter-routing-key", "delayed_exchange_key");
    rabbitAdmin.declareQueue(new Queue(queue + "_ttl", true, false, true, map));
    rabbitAdmin.declareBinding(new Binding(queue + "_ttl", Binding.DestinationType.QUEUE,
            exchange + "_ttl", "", null));

    //创建真正的消费者队列
    rabbitAdmin.declareExchange(new DirectExchange(exchange));
    rabbitAdmin.declareQueue(new Queue(queue));
    rabbitAdmin.declareBinding(new Binding(queue, Binding.DestinationType.QUEUE,
            exchange, "delayed_exchange_key", null));
    return true;
}

//发送代码
log.info("发送消息:{}",new Date().getTime());
        rabbitTemplate.convertAndSend("delayed_exchange_ttl", null, object.toJSONString());
  • 最终队列的图如下,ttl队列中的消息达到指定时间后会转存到delayed_queue队列去
    在这里插入图片描述
    在这里插入图片描述
  • 消费者代码
@RabbitHandler
@RabbitListener(queues = {"delayed_queue"})
public void onMessage(Message message, Channel channel) throws Exception {
    String msgBodyString = new String(message.getBody());
    JSONObject json = JSONObject.parseObject(msgBodyString);
    log.info("消费消息:{},{},{}", json,
            message.getMessageProperties().getConsumerQueue(),new Date().getTime());
    //进行消费
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

6.3 测试日志

c.e.rabbitmq.controller.Controller       : 发送消息:1655967316743
c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"hello":"1234567"},delayed_queue,1655967344564

总体来说是实现了延时功能,但是延时的时间有误差,只适合不是很重要的场景使用

需要注意的一个点:
队列一经创建,不可再次修改。
至于延时时间动态的,可以先注解@Autowired ConnectionFactory connectionFactory,然后再通过生产者声明创建指定时间的队列。

7.多虚拟主机实现

使用场景:业务服务需要发送消息到用户和订单服务,但是这两个服务又是不一样的虚拟主机下。这种情况就只能通过代码进行连接了。

7.1 生产者

配置我就省略了,反正不用自动装配就行

@Bean("orderPushRabbit")
public RabbitTemplate orderPushRabbit() {
    //Mq连接信息
    CachingConnectionFactory rabbitFactory = new CachingConnectionFactory();
    rabbitFactory.setAddresses(amqpAddress);
    rabbitFactory.setChannelCacheSize(Runtime.getRuntime().availableProcessors() * 2);
    rabbitFactory.setUsername(amqpUserName);
    rabbitFactory.setPassword(amqpPassword);
    rabbitFactory.setVirtualHost(vhost);
    // 初始化RabbitTemplate
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitFactory);
    MessageConverter serializerMessageConverter = new SerializerMessageConverter();
    rabbitTemplate.setMessageConverter(serializerMessageConverter);
    //设置exchange信息和routingKey
    rabbitTemplate.setExchange(exchange);
    rabbitTemplate.setRoutingKey(routingKey);  
    return rabbitTemplate;
}

如果有多个虚拟主机的复制上述配置即可,需要注意配置的不同即可。
生产者代码中使用

@Autowired
@Qualifier("orderPushRabbit")
RabbitTemplate PushRabbit;

@Autowired
@Qualifier("userPushRabbit")
RabbitTemplate userPushRabbit;

7.2 消费者

根据虚拟主机声明不同的连接工厂

@Bean(name = "orderConnectionFactory")
public ConnectionFactory orderConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(amqpAddress);
    connectionFactory.setUsername(amqpUserName);
    connectionFactory.setPassword(amqpPassword);
    connectionFactory.setVirtualHost(vhost); // /user
    return connectionFactory;
}

@Bean("orderListenerContainer")
@Primary
public RabbitListenerContainerFactory orderListenerContainer(@Qualifier("orderConnectionFactory") ConnectionFactory orderConnectionFactory) {
    SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
    //设置对应虚拟主机的连接工厂
    container.setConnectionFactory(orderConnectionFactory);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    container.setPrefetchCount(prefetch);
    //手动确认
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setMessageConverter(new SimpleMessageConverter());
    return container;
}

以此类推,如果有多个虚拟主机,则有多份上述的配置。
@Primary注解标识在没有声明监听工厂时默认使用的

消费者代码

//containerFactory 指定队列的监听工厂
@RabbitListener(queues = {"order_exchange"}, containerFactory = "orderListenerContainer")
public void onMessage(Message message, Channel channel) throws Exception {
    String msgBodyString = new String(message.getBody());
    JSONObject json = JSONObject.parseObject(msgBodyString);
    log.info("消费消息:{}", json);
    //进行消费
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

8.总结

上面有说明死信的条件,如没有死信队列,可能会导致死循环。
消息确认不通过或者异常会重新进入到队列头部,接下来消费者又会消费此条消息由此造成死循环。
因为消费者最好遵循以下几点:
1.捕获消息处理过程中的异常,一定要自己手动确认还是不确认(由其他消费者消费)

2.重试次数记录,如果规范消息协议应该是有次数字段的,消费者根据重试次数来做异常消息日志等操作

3.根据配置自动丢到死信队列,前提是创建队列时需要指定死信队列和key

listener:
  simple:
    concurrency: 5
    prefetch: 10
    retry:
      enabled: true   # 允许消息消费失败的重试
      max-attempts: 3   # 消息最多消费次数3次

以上就是本章的全部内容了。

上一篇:RabbitMQ第一话 – docker安装RabbitMQ以及Springboot集成RabbitMQ
下一篇:RabbitMQ第三话 – RabbitMQ高可用集群搭建

贵有恒何必三更眠五更起,最无益只怕一日曝十日寒

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-26 16:47:16  更:2022-06-26 16:48:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:43:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码