IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ 高级特性 -> 正文阅读

[大数据]RabbitMQ 高级特性

一,消息可靠性投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
  • confifirm 确认模式
  • return 退回模式
rabbitmq 整个消息投递的路径为:
producer ---> rabbitmq broker ---> exchange ---> queue ---> consumer
rabbitmq broker:简单来说就是消息队列服务器实体。中文意思:中间件。接受客户端连接,实现AMQP消息队列和路由功能的进程。一个broker里可以开设多个vhost,用作不同用户的权限分离。

  • 消息从 producer exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange queue 投递失败则会返回一个 returnCallback

通过 confirmCallback 判定 消息是否成功到达exchange

因为returnCallback 是?启动消息失败返回,比如路由不到队列时触发回调?

?我们将利用这两个 callback 控制消息的可靠性投递

1,confirm确认模式代码实现

代码实现:

? ? ? ? 1,创建maven 工程,消息的生产者工程,项目模块名称:rabbitmq-producer-spring

? ? ? ? 2,添加依赖

<dependencies> 
    <dependency> 
        <groupId>org.springframework</groupId> 
        <artifactId>spring‐context</artifactId> 
        <version>5.1.7.RELEASE</version> 
    </dependency> 
    <dependency> 
        <groupId>org.springframework.amqp</groupId> 
        <artifactId>spring‐rabbit</artifactId> 
        <version>2.1.8.RELEASE</version> 
    </dependency> 
    <dependency> 
        <groupId>junit</groupId> 
        <artifactId>junit</artifactId> 
        <version>4.12</version> 
    </dependency> 
    <dependency> 
        <groupId>org.springframework</groupId> 
        <artifactId>spring‐test</artifactId> 
        <version>5.1.7.RELEASE</version> 
    </dependency> 
</dependencies> 
<build> 
    <plugins> 
        <plugin> 
            <groupId>org.apache.maven.plugins</groupId> 
            <artifactId>maven‐compiler‐plugin</artifactId> 
            <version>3.8.0</version> 
        <configuration> 
            <source>1.8</source> 
            <target>1.8</target> 
        </configuration> 
        </plugin> 
    </plugins> 
</build>

3,在 resources 目录下创建 rabbitmq.properties 配置文件,添加链接RabbitMQ相关信息

rabbitmq.host=172.16.98.133 
rabbitmq.port=5672 
rabbitmq.username=guest 
rabbitmq.password=guest 
rabbitmq.virtual‐host=/
4. resources 目录下创建 spring-rabbitmq-producer.xml 配置文件,添加以下配置
<?xml version="1.0" encoding="UTF‐8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans     http://www.springframework.org/schema/beans/spring‐beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring‐context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring‐rabbit.xsd"> 

<!‐‐加载配置文件‐‐> 
<context:property‐placeholder location="classpath:rabbitmq.properties"/> 

<!‐‐ 定义rabbitmq connectionFactory 1. 设置 publisher‐confirms="true" ‐‐> <rabbit:connection‐factory id="connectionFactory" 
                           host="${rabbitmq.host}"
                           port="${rabbitmq.port}" 
                           username="${rabbitmq.username}"                                 
                           password="${rabbitmq.password}" 
                           virtual‐host="${rabbitmq.virtual‐host}" 
                           publisher‐confirms="true" /> 

<!‐‐定义管理交换机、队列‐‐> 
<rabbit:admin connection‐factory="connectionFactory"/> 

<!‐‐定义rabbitTemplate对象操作可以在代码中方便发送消息‐‐> 
<rabbit:template id="rabbitTemplate" connection‐factory="connectionFactory"/> 

<!‐‐2. 消息可靠性投递(生产端)‐‐> 
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>     
   <rabbit:direct‐exchange name="test_exchange_confirm"> 
        <rabbit:bindings> 
            <rabbit:binding queue="test_queue_confirm" key="confirm"> 
            </rabbit:binding> 
        </rabbit:bindings> 
    </rabbit:direct‐exchange> 
</beans>

或者配置yml 文件

5, 编写测试代码

@RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = "classpath:spring‐rabbitmq‐producer.xml") 
public class ProducerTest { 

@Autowired 
private RabbitTemplate rabbitTemplate; 
/**
* 确认模式: 
* 步骤: 
* 1. 确认模式开启:ConnectionFactory中开启publisher‐confirms="true" 
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数 
*/ 

@Test 
public void testConfirm() { 

    //2. 定义回调 ** 
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { 

        /***
         * @param correlationData 相关配置信息 
         * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 
         * @param cause 失败原因 
        */ 
    @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { 

        System.out.println("confirm方法被执行了...."); 

        if (ack) { 
            //接收成功 
            System.out.println("接收成功消息" + cause); 
        } else { 
            //接收失败 
            System.out.println("接收失败消息" + cause); 
            //做一些处理,让消息再次发送。 
        } 
    } 
}); 

//3. 发送消息 
rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm...."); 
    } 
}


@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class confirmTest4 implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 确认模式:
     * 步骤:
     * 1,确认模式开启:在yml 配置文件里配置 publisher-confirms: true
     * 2, 在rabbitTemplate 定义ConfirmCallBack 回调函数
     */

    @Test
    public void testConfirms() {
        rabbitTemplate.setConfirmCallback(new confirmTest4());

        Map<String, String> msg = new HashMap<>();
        msg.put("pageId", "5abefd525b05aa293098fca6");
        //转成json串
        String jsonString = JSON.toJSONString(msg);
        rabbitTemplate.convertAndSend("ex_routing_cms_postpage", "5abefd525b05aa293098fca6", jsonString);
    }
    /**
     *
     * @param correlationData 相关配置信息
     * @param ack exchange交换机 是否成功收到了消息,true成功,false代表失败
     * @param cause 失败原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm 方法开始执行..............");

        if (ack) {
            //接收成功
            System.out.println("接收成功的消息:" + cause);
        } else {
            //接收失败
            System.out.println("接收失败消息:" + cause);
            //接收失败后,我们做一些处理,让消息再次发送,达到消息可靠性传递
        }
    }

其中重写confirm 方法里的参数,

? ? ? ? correlationData:消息唯一标识

? ? ? ? ack:确认结果

????????cause:失败原因

?这是发送消息到queue 成功后,展示的内容,

但是如果发送失败是这样的

?当发送失败是可以配置return 退回模式代码。

2,?return退回模式代码实现

回退模式:? 当消息发送给EXchange 后,Exchange 路由到Queue 失败是才会执行ReturnCallBack,具体实现如下:

? ? ? ? 1,在Spring-rabbitmq-producer.xml 配置文件,在rabbit:connection-factory 节点添加配置

publisher‐returns="true"

或者 在application.yml 中设置

设置交换机处理失败消息的模式,两种方法,一种写在代码里,一种配置文件

1, 配置文件

?2,代码方式:

/**
* 步骤:
* 1. 开启回退模式:publisher‐returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/
@Test
public void testReturn() {

    //设置交换机处理失败消息的模式
    rabbitTemplate.setMandatory(true);

    //2.设置ReturnCallBack
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

    /**
    *
    * @param message 消息对象
    * @param replyCode 错误码
    * @param replyText 错误信息
    * @param exchange 交换机
    * @param routingKey 路由键
    */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
    String exchange, String routingKey) {
        
        System.out.println("return 执行了....");
        System.out.println(message);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);
        //处理
    }
});

    //3. 发送消息
    rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message
    confirm....");
}

package com.xuecheng.manage_cms.returns;

import com.alibaba.fastjson.JSON;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.HashMap;
import java.util.Map;


@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ReturnTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;



    @Test
    public void testReturn() {

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message 消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange 交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了..........");
                System.out.println("message = " + message);
                System.out.println("replyCode = " + replyCode);
                System.out.println("replyText = " + replyText);
                System.out.println("exchange = " + exchange);
                System.out.println("routingKey = " + routingKey);

            }
        });

        Map<String, String> msg = new HashMap<>();
        msg.put("pageId", "5abefd525b05aa293098fca6");
        //转成json串
        String jsonString = JSON.toJSONString(msg);
        rabbitTemplate.convertAndSend("ex_routing_cms_postpage", "5abefd525b05aa293098fca61", jsonString);
    }
}

设置routingKey为一个不符合规则的key,观察控制台打印结果。

returnedMessage方法中参数

????????消息主体message : message
? ? ? ? 消息主体 message : replyCode
? ? ? ? 描述:replyText
? ? ? ? 消息使用的交换器 exchange : exchange
? ? ? ? 消息使用的路由键 routing : routingKey

总结

对于确认模式

设置 ConnectionFactory publisher-confifirms="true" 开启 确认模式。
使用 rabbitTemplate.setConfifirmCallback 设置回调函数。当消息发送到 exchange 后回调 confiirm方法。在方法中判断 ack ,如果为 true ,则发送成功,如果为 false ,则发送失败,需要处理。
对于退回模式
设置 ConnectionFactory publisher-returns="true" 开启 退回模式。
使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从 exchange 路由到 queue 失败后,如果设置
rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer 。并执行回调函数
returnedMessage

RabbitMQ 中也提供了事务机制,但是性能较差,此处不做讲解。
使用 channel 列方法,完成事务控制:
txSelect(), 用于将当前 channel 设置成 transaction 模式
txCommit() ,用于提交事务
txRollback(), 用于回滚事务

?二,Consumer ACK(消息接收确认

消息消费者如何通知 Rabbit 消息消费成功?

ackAcknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:
? 自动确认: acknowledge=" none "
? 手动确认: acknowledge=" manual "
? 根据异常情况确认: acknowledge=" auto " ,(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被 Consumer 接收到,则自动确认收到,并将相应 message RabbitMQ 的消息 缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck(),手动签收,如果出现异常,则调 用 channel.basicNack() 方法,让其自动重新发送消息

代码实现

1. 创建 maven 工程,消息的消费者工程,项目模块名称: rabbitmq-consumer-spring
2. 添加依赖
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring‐context</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring‐rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring‐test</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven‐compiler‐plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>        
    </plugins>
</build>
3. resources 目录下创建 rabbitmq.properties 配置文件,添加链接 RabbitMQ 相关信息
rabbitmq.host = 172.16.98.133
rabbitmq.port = 5672
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.virtual‐host = /
4. resources 目录下创建 spring-rabbitmq-consumer.xml 配置文件,添加以下配置

<?xml version="1.0" encoding="UTF‐8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring‐beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring‐context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring‐rabbit.xsd">

<!‐‐加载配置文件‐‐>
<context:property‐placeholder location="classpath:rabbitmq.properties"/>

<!‐‐ 定义rabbitmq connectionFactory ‐‐>
<rabbit:connection‐factory id="connectionFactory" 
                           host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.username}"
                           password="${rabbitmq.password}"

                           virtual‐host="${rabbitmq.virtual‐host}"/>

<context:component‐scan base‐package="com.itheima.listener" />

<!‐‐定义监听器容器 添加 acknowledge="manual" 手动‐‐>
<rabbit:listener‐container connection‐factory="connectionFactory" acknowledge="manual"
>
<rabbit:listener ref="ackListener" queue‐names="test_queue_confirm">
</rabbit:listener>
</rabbit:listener‐container>
</beans>

5. 编写 ackListener 监听类实现 ChannelAwareMessageListener 接口

/**
 * Consumer ACK机制:
 * 1. 设置手动签收。acknowledge="manual"
 * 2. 让监听器类实现ChannelAwareMessageListener接口
 * 3. 如果消息成功处理,则调用channel的 basicAck()签收
 * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
 */
@Component
public class ackListener implements ChannelAwareMessageListener {

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "ex_routing_cms_postpage",type = "direct"),
            value = @Queue(value = "queue_cms_postpage_01",durable = "true"),
            key = "#.#"
    ))

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1, 接收转换消息
            System.out.println("接收消息内容:"+new String(message.getBody()));
            //2, 处理业务逻辑
            System.out.println("处理业务逻辑");
            //int i = 2/0 ; // 代码运行错误
            //3,手动签收
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("代码执行错误了。。。。。。");
            //4,代码出现错误后,设置拒绝签收,重新发送
            //第三个参数:requeue: 消息重回队列,如何设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag,true,true);
            //了解
            //channel.basicReject(deliveryTag,true);
        }
    }
}
上面的
? ? @RabbitListener(bindings = @QueueBinding(
? ? ? ? ? ? exchange = @Exchange(value = "ex_routing_cms_postpage",type = "direct"),
? ? ? ? ? ? value = @Queue(value = "queue_cms_postpage_01",durable = "true"),
? ? ? ? ? ? key = "#.#"
? ? ))
是监听注解
消息发送,通过ack 可以知道?Rabbit 消息消费成功。
如果消费者接受消息后,执行代码失败,可让消息重回队列,重新发送消息给消费端。

小结:

  • rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
  • 如果出现异常,则在catch中调用 basicNackbasicReject,拒绝消息,让MQ重新发送消息。
如何保证消息的高可靠性传输?
1. 持久化
? exchange 要持久化
? queue 要持久化
? message 要持久化
2. 生产方确认 Confifirm
3. 消费方确认 Ack
4.Broker 高可用

三,消费端限流

?

? ? ? ? 代码实现:

? ? ? ? 1, 编写一个监听类,保证当前的监听类的消息处理机制是ACK(手动方式)

@Component 
public class QosListener implements ChannelAwareMessageListener { 

    @Override 
    public void onMessage(Message message, Channel channel) throws Exception { 

        Thread.sleep(1000); 
        
        //1.获取消息 
        System.out.println(new String(message.getBody())); 

        //2. 处理业务逻辑 
        //3. 签收 
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); 
    } 
}

2,在配置文件的listener-container 配置属性中添加配置(或者用@Confirguration)

<rabbit:listener‐container connection‐factory = "connectionFactory" acknowledge = "manual"
prefetch = "1" >
配置说明:
perfetch = 1, 表示消费端每次从 mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下 一条消息。


/**
 * Consumer ACK机制:
 * 1. 设置手动签收。acknowledge="manual"
 * 2. 让监听器类实现ChannelAwareMessageListener接口
 * 3. 如果消息成功处理,则调用channel的 basicAck()签收
 * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
 */
@Component
public class ackListener implements ChannelAwareMessageListener {

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "ex_routing_cms_postpage",type = "direct"),
            value = @Queue(value = "queue_cms_postpage_01",durable = "true"),
            key = "#.#"
    ))
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            System.out.println("一个进来了");
            Thread.sleep(10000);
            System.out.println("一个结束了");
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

总结:

rabbit:listener-container 中配置 prefetch 属性设置消费端一次拉取多少消息
消费端的确认模式一定为手动确认。 acknowledge="manual"

四,TLL(TIME TO LIVE)

存活时间/过期时间: 当消息到达存活时间后,还没有被消费,会被自动清除.

RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue) 设置过期时间.

代码实现:

????????设置队列的过期时间:

? ? ? ? 1,在消息的生产方,在spring-rabbitmq-producer.xml配置文件中,添加如下配置

<!‐‐ttl‐‐>
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
    <!‐‐设置queue的参数‐‐>
    <rabbit:queue‐arguments>
        <!‐‐x‐message‐ttl指队列的过期时间‐‐>
        <entry key="x‐message‐ttl" value="100000" value‐type="java.lang.Integer"/>
    </rabbit:queue‐arguments>
</rabbit:queue>


<rabbit:topic‐exchange name="test_exchange_ttl" >
    <rabbit:bindings>
        <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic‐exchange>

?2 编写发送消息测试方法

@Test 
public void testTtl() { 

    for (int i = 0; i < 10; i++) { 
    // 发送消息 
        rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");         
    } 
}
测试结果:当消息发送成功后,过 10s 后在 RabbitMQ的管理控制台会看到消息会自动删除。

设置单个消息的过期时间

? ? ? ? 编写代码测试,并且设置队列的过期时间为100s,单个消息的过期时间为5s,

@Test 
public void testTtl() { 

    // 消息后处理对象,设置一些消息的参数信息 
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { 

        @Override 
        public Message postProcessMessage(Message message) throws AmqpException { 
    
        //1.设置message的信息 
        message.getMessageProperties().setExpiration("5000");

        //消息的过期时间 
        //2.返回该消息 
        return message; 
    } 
};

    //消息单独过期 
    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor); 

    for (int i = 0; i < 10; i++) { 
        if(i == 5){ 
            //消息单独过期 
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor); 
        }else{
            //不过期的消息 
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl...."); 
    } 
} 
}    

消息过期就是在发送消息后面加个参数,??MessagePostProcessor 类, 这个类里配置过期时间,那么这条消息过期时间就不一样

如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
????????
????????队列过期后,会将队列所有消息全部移除。
????????
????????消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉 )

总结:

设置队列过期时间使用参数: x-message-ttl ,单位: ms( 毫秒 ) ,会对整个队列消息统一过期。
设置消息过期时间使用参数: expiration 。单位: ms( 毫秒 ),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。

五,死信队列

死信队列,英文缩写: DLX Dead Letter Exchange(死信交换机) 当消息成为 Dead message 后,可以被重新 发送到另一个交换机,这个交换机就是 DLX

死信队列:没有被及时消费的消息存放的队列

?消息成为死信的三种情况:

? ? ? ? 1,队列消息长度到达极限

? ? ? ? 2,消费者拒接消费消息

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-09 10:18:18  更:2021-08-09 10:18:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 19:49:39-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码