IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> rabbitMQ使用【springboot整合】 -> 正文阅读

[Java知识库]rabbitMQ使用【springboot整合】

1、 MQ模型

1.1 简单队列模型

简单队列模型
publisher -----> queue ----> consumer
    
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息

1.2 发布/订阅模型

在这里插入图片描述

publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给exchange(交换机)
exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
   
    exchange种类:
    	- Fanout:广播,将消息交给所有绑定到交换机的队列
    	- Direct:定向,将消息交给符合routingKey的队列
    	- Topic:主题(使用通配符),将消息交给符合routing pattern(路由模式)的队列

consumer:消费者,订阅队列,消费消息
queue:队列,接收消息,缓存消息

2、 SpringAMQP的使用

SpringAMQP地址:https://spring.io/projects/spring-amqp

2.1 Basic Queue 简单队列模型

  1. 导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置MQ地址
spring:
  rabbitmq:
    host: 192.168.10.128 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: root # 用户名
    password: root # 密码
  1. 实体类与Controller类
@Data
public class MqMessage {
    private String message; // 消息
    private String queueName; // 队列名称
    private String exchange; // 交换机名称
    private String routingKey; // 路由键
}

    @PostMapping("/send")
    public String sendMsgService(@RequestBody MqMessage msg) {
        return msgService.sendMsg(msg);
    }
  1. impl 消息发送
@Service
public class MsgServiceImpl implements MsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;
//    @Autowired
//    AmqpTemplate amqpTemplate; //也可以使用这个,一样的用

    @Override
    public String sendMsg(MqMessage msg) {
        try {
        	// 注意:这样写要先在rabbitmq中先创建对应队列!
            rabbitTemplate.convertAndSend(msg.getRoutingKey(), msg.getMessage()); // 该两个参数的方法是使用routingKey参数做队列名了;
            return "Success";
        } catch (Exception e) {
            e.printStackTrace();
            return e.getMessage();
        }
    }
}
  1. 调用
    在这里插入图片描述
    调用成功后可在rabbitmq的控制台看到刚发送到MQ的消息:
    在这里插入图片描述
  2. 消息消费(配置文件内容与publisher一样)
@Configuration
public class ConsumerListen {
 
    @RabbitListener(queues = "my.queue") // 必须先在控制台新建该名称队列或【可用下面自动创建队列的方式queuesToDeclare】
    public void listenSimple(String msg) throws InterruptedException {
        System.out.println("接收到的消息:" + msg);
    }
    
    /**
     * 使用@RabbitListener自动创建队列
     */
    @RabbitListener(queuesToDeclare = @Queue("myQueue")) // 自动创建队列
    public void myQueue(String msg) throws InterruptedException {
        System.out.println("myQueue接收到的消息:" + msg);
    }
}

另一种写法(未去验证):

@Component
@RabbitListener(queues = "my.queue")//监听的队列名称 my.queue
public class DirectReceiver {
 
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("my.queue消费者收到消息  : " + testMessage.toString());
    } 
}

2.2 Work Queue模型

在这里插入图片描述
让多个消费者绑定到一个队列,共同消费队列中的消息。
场景:当消息处理比较耗时,生产速度远大于消耗速度,就可以使用此模型

			...
            for (int i = 0; i < 50; i++) {
                //发送多条消息
                rabbitTemplate.convertAndSend(msg.getRoutingKey(), msg.getMessage() + " " + i);
            }

	消费者:(多个消费者加监听simple.queue队列)
	消费者1
    @RabbitListener(queues = "my.queue")
    public void listenSimple(String msg) throws InterruptedException {
        System.out.println("1接收到的消息:" + msg);
        Thread.sleep(1000);
    }

    @RabbitListener(queues = "my.queue")
    public void listenSimple2(String msg) throws InterruptedException {
        System.out.println("2接收到的消息:" + msg);
    }

上面会有一个问题:消费者2很快就处理了25条信息,但是消费者1还在慢慢处理自己的消息。
出现的原因:消息是平均分配给每个消费者,没有考虑到消费者的处理能力
解决的办法:能者多劳
修改consumer的application.yml配置文件中添加配置

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

2.3 发布/订阅模型

在这里插入图片描述

2.3.1 Fanout模式(广播模式)

在这里插入图片描述
消息发送流程

  • 可以有多个队列
  • 每个队列绑定到exchange 生产者生产消息,发送到交换机上
  • 交换机把消息发送到绑定的所有队列
  • 订阅队列的消费者,消费消息

1、声明队列和交换机(publish)

@Component
public class RabbitQueueConfig {
    //声明队列1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    //声明队列2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("amqp.fanout");
    }
}

2、把队列绑定到交换机上

//把队列1和队列2绑定到广播交换机上 
    @Bean
    public Binding bindingQueue1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding bindingQueue2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

3、发送消息

    @Override
    public String fanoutPublisher(MqMessage msg) {
        try {
            rabbitTemplate.convertAndSend(msg.getExchange(), "", msg.getMessage());
            return "Success";
        } catch (Exception e) {
            e.printStackTrace();
            return e.getMessage();
        }
    }

4、消费消息

    /**
     * fanout广播模式(生产者端将队列与交换器绑定:com.rabbit.provider.config.RabbitQueue)
     */
    // 消费者监听队列,并消费队列上的消息
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("fanout.queue1接收到的消息:" + msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("fanout.queue2接收到的消息:" + msg);
    }

2.3.2 direct模式(定向模式)

在这里插入图片描述
在fanout模式中,一条消息会被发布到所有订阅的队列,在某些场景下,我们希望不同的消息被不同的队列消费,就可以使用direct模式了

direct模式:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • publisher 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息

1、使用注解的方式,声明队列和交换机

//声明队列和交换机,并把队列绑定到交换机上
 
    //监听routingKey=red 、blue 的消息
    @RabbitListener(bindings = @QueueBinding(
        //队列
        value = @Queue(name = "direct.queue1"),
        //交换机
        exchange = @Exchange(name = "amqp.direct",type = ExchangeTypes.DIRECT),
        //路由key
        key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        log.info("direct.queue1接收到的消息:{}",msg);
    }
 
    //监听routingKey=yellow 、blue 的消息
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "amqp.direct",type = ExchangeTypes.DIRECT),
        key = {"yellow","blue"}
    ))
    public void listenDirectQueue2(String msg){
        log.info("direct.queue2接收到的消息:{}",msg);
    }

2、声明发布者发送消息

    @Override
    public String directPublisher(MqMessage msg) {
        try {
            //将消息发送到交换机上
            rabbitTemplate.convertAndSend(msg.getExchange(), "red", "red key " + msg.getMessage());  //只有队列1接收到
            rabbitTemplate.convertAndSend(msg.getExchange(), "yellow", "yellow key " + msg.getMessage());  // 只有队列2接收到
            rabbitTemplate.convertAndSend(msg.getExchange(), "blue", "blue key " + msg.getMessage());   // 队列1和队列2都可以接收到
            return "Success";
        } catch (Exception e) {
            e.printStackTrace();
            return e.getMessage();
        }
    }

direct交换机和fanout交换机的差异:

  • fanout交换机将消息路由给每个与其绑定的队列
  • direct交换机根据routingkey判断路由给那个队列
  • 如果多个队列具有相同的routingkey,则与fanout功能相似

2.3.3 Topic模式(主题模式)

在这里插入图片描述
Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
1、使用注解的方式,声明队列和交换机

    @RabbitListener(bindings = @QueueBinding(
           value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "amqp.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        log.info("topic.queue1接收到的消息:{}",msg);
    }
 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "amqp.topic",type = ExchangeTypes.TOPIC),
            key = "*.new"
    ))
    public void listenTopicQueue2(String msg){
        log.info("topic.queue2接收到的消息:{}",msg);
    }

2、声明发布者发布消息

    @Override
    public String topicPublisher(MqMessage msg) {
        //将消息发送到交换机上
        rabbitTemplate.convertAndSend(msg.getExchange(), "china.new", "china.new ," + msg.getMessage()); // 队列1、2接收到
        rabbitTemplate.convertAndSend(msg.getExchange(), "china.new.now", "china.new.now ," + msg.getMessage()); // 队列1接收到
        rabbitTemplate.convertAndSend(msg.getExchange(), "topic.new", "topic.new ," + msg.getMessage());   //队列2接收到
        return "Success";
    }

消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在的问题:数据体积过大、有安全漏洞、可读性差
解决问题:使用json转换器
配置json转换器
在publisher和consumer中引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

在启动类配置转换bean

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

3、在使用SpringAMQP中存在的问题


  • 消息可靠性问题 如何确保发送的消息至少被消费一次
  • 延迟消息问题 如何实现消息的延迟投递
  • 高可用问题 如何避免单点的MQ故障导致的不可用问题
  • 消息堆积 如何解决数百万消息堆积,无法及时消费问题

3.1 消息可靠性

#消息发送的流程
publisher—>exchange–>queue–>consumer
其中每一步都可能导致消息丢失,常见的丢失原因有:

  • 发送时丢失
    • 生产者发送消息为送达exchange
    • 消息到达exchange没有到queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后没有消费就宕机了

解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

3.1.1 生产者确认机制

这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机,但是没有路由到队列,返回ack及失败的原因

1、 导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、 在publisher和consumer服务中添加application.yml配置文件

spring:
  rabbitmq:
    host: 192.168.10.128 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: root # 用户名
    password: root # 密码
    publisher-confirm-type: correlated
    publisher-returns : true
    template:
    	mandatory: true
  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple: 同步等待confirm结果直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

3、 定义return回调(ReturnCallback)
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
 
    // 为RabbitTemplate设置路由到队列失败时调用的方法
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey)
                -> log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                replyCode,replyTest,exchange,routingKey,message));
    }
}

4、定义ConfirmCallback
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

    @org.junit.Test
    public void testSendMessage(){
        //消息内容
        String message = "hello, spring amqp! ".concat(LocalDateTime.now().toString());
        //设置回调函数策略
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(confirm -> {
                    //连接MQ正常
                    if(confirm.isAck()){
                        //正常ack情况 把消息发送到了交换机
                        log.debug("消息发送成功ack,ID{}",correlationData.getId());
                    }else{
                        //nack情况  消息没有发送到交换机
                        log.info("消息发送失败-nack,ID{}",correlationData.getId());
                    }
                }
                //连接MQ异常
                , throwable -> log.error("消息发送失败-连接mq异常,ID{}",correlationData.getId()));
        //发送消息
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","DIRECT_ROUTING_KEY",message,correlationData);
    }

测试的时候一直报312,NO_ROUTE错误,经过几番验证,后面重新写了个交换机和队列就解决了,好像一个队列只能被一个交换机绑定!(查看网上说报312都是延迟消息发送,配置参数mandatory要改成false,但咱这里不是延迟消息) 所以先在消费者上创建后并启动:

    /**
     * 声明队列和交换机,并把队列绑定到交换机上
     * 监听routingKey=DIRECT_ROUTING_KEY 的消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue3"),
            exchange = @Exchange(name = "DIRECT_EXCHANGE", type = ExchangeTypes.DIRECT),
            key = {"DIRECT_ROUTING_KEY"}
    ))
    public void listenDirectQueue(String msg) {
        log.info("confim.ack接收到的消息:{}", msg);
    }

3.1.2 消费者确认机制

RabbitMQ是阅后即焚机制,确认消息被消费者消费后会立刻删除。

而RabbitMQ也是可以通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

修改application.yml配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack  默认的是auto模式

本地失败重试机制

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改application.yml配置文件

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1、在consumer服务中定义处理失败消息啊的队列和交换机

//错误交换机
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
//储存错误消息的队列
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
//把队列绑定到交换机上
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2、定义RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

3.1.3 消息持久化

生产者可以确定的将消息投递到mq中,但是消息发送到mq以后,突然宕机,导致消息丢失,因此想要确保消息在mq中,就需要开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

交换机持久化

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}

由SpringAMQP声明的交换机都是持久化的
在这里插入图片描述

队列持久化

    @Bean
    public Queue simpleQueue(){
        // 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("simple.queue").build();
    }
  
    @Bean
    public Queue simpleQueue(){
        //四个参数分别是:队列名,持久化,是否独有,当没有exchange与其绑定时是否自动删除
        return new Queue("fanout.queue2",true,false,false);
    }

由SpringAMQP声明的队列都是持久化的
在这里插入图片描述

消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode

  • 持久化
  • 非持久化
        Message message = MessageBuilder
                .withBody("hello, spring amqp!".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();

SpringAMQP发出的任何消息都是持久化的

3.1.4 延迟消息问题

死信交换机

什么是死信:
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

死信交换机(Dead Letter Exchange,DLX)如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中
在这里插入图片描述
1、在consumer服务中定义死信交换机、死信队列

//声明一个普通队列关联死信交换机
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue")
                .deadLetterExchange("dead.direct")
                .deadLetterRoutingKey("simple")
                .build();
    }
//声明死信交换机
    @Bean
    public DirectExchange dlExchange(){
        return new DirectExchange("dead.direct",true,false); 
    }
//声明死信队列
    @Bean
    public Queue dlQueue(){
        return new Queue("dead.queue",true);
    }
//将死信队列关联到死信交换机上
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
    }
# 消费者确认模式: auto
	acknowledge-mode: auto
    特征: 当消息不能被消费时,会重新入队,再次投递给消费者进行被消费
	default-requeue-rejected: false # 拒绝消息重新入队,如果队列绑定了死信交换机则消息会投递到死信交换机并路由到死信队列
# 本地重试:
	当本地重试次数耗尽时,如果当前队列没有绑定死信交换机或错误队列,则消息丢弃
    如果提供了错误队列,则消息投递到错误队列
    如果队列绑定了死信交换机,则消息以死信的形式存放到死信队列

TTL

(time to live 消息存活时间)
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间
    在这里插入图片描述
队列设置超时时间

1、声明 死信交换机、死信队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}

2、声明一个队列,指定TTL

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交换机
        .deadLetterRoutingKey("ttl")
        .build();
}

3、声明交换机,将ttl.queue队列关联上

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

4、发送消息

@Test
public void testTTLQueue() {
    // 创建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 记录日志
    log.debug("发送消息成功");
}
消息设置超时时间
@Test
public void testSendTTLMessage() throws InterruptedException {
        // 1.消息体
//        String msg = "超时消息...";
        Message msg = MessageBuilder
                .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
                .setExpiration("5000")
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("ttl.direct","ttl", msg);
        log.info("发送消息成功...");
    }
}

延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

安装DelayExchange插件https://www.rabbitmq.com/community-plugins.html

安装DelayExchange

1、下载插件
2、我们是基于docker安装的rabbitmq,所以要把下载的文件挂载到数据卷中

// 查看数据卷的位置
docker volume inspect mq-plugins  

在这里插入图片描述
3、把下载的文件放在数据卷中

/var/lib/docker/volumes/mq-plugins/_data

4、安装插件
进入容器

docker exec -it mq bash

开启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述
安装完成

使用DelayExchange

1、声明DelayExchange交换机

   //延迟队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue",durable = "true"),
            exchange = @Exchange(name = "delay.direct",delayed = "true"),
            key = "delay"
    ))
    public void listenDelayedQueue(String msg){
        log.info("接收到delay.queue的消息:{}",msg);
    }

2、发送消息
发送消息时,一定要携带x-delay属性,指定延迟时间

    @Test
    public void testDelayMsg(){
        Message message = MessageBuilder
                .withBody("hello delay message".getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay",1000)
                .build();
 
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
        log.info("发送消息成功");
    }

3.3 消息堆积问题

消息堆积问题:
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃
解决消息堆积问题的思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
  • 扩大队列容积,提高堆积上限

惰性队列

特征:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

设置惰性队列
1、使用命令行设置lazy-queue

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
  • rabbitmqctl :RabbitMQ的命令行工具 set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • “^lazy-queue$” :用正则表达式匹配队列的名字
  • ‘{“queue-mode”:“lazy”}’ :设置队列模式为lazy模式
  • –apply-to queues:策略的作用对象,是所有的队列
    2、基于@Bean声明lazy-queue
   @Bean
    public Queue lazyQueue(){
        return QueueBuilder.durable("lazy-queue")
                .lazy()   //开启x-queue-mode 为lazy
                .build();
    }

3、基于@RabbitListener声明lazy-queue

    @RabbitListener(queuesToDeclare = @Queue(
            name = "lazy-queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazyQueue(String msg){
        log.info("接收到消息:{}",msg);
    }
}
惰性队列的优点
 - 基于磁盘存储,消息上限高
 - 没有间歇性的page-out,性能比较稳定
惰性队列的缺点
 - 基于磁盘存储,消息时效性会降低
 - 性能受限于磁盘的IO

3.4 高可用问题

搭建集群

集群的分类:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

……
普通集群
镜像集群
仲裁队列
集群扩容
增加仲裁队列副本

Exchange的4种类型:direct、fanout、topic、headers
RabbitMQ的常见队列模型,simple模式、work模式、fanout模式、direct模式、topic模式、headers模式、RPC
simple模式就是上文中的Basic模式

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-09-24 20:42:45  更:2022-09-24 20:46:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 8:48:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码