IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ 交换机、死信、SpringBoot整合以及集群的搭建 -> 正文阅读

[大数据]RabbitMQ 交换机、死信、SpringBoot整合以及集群的搭建

1、交换机 Exchanges

1.1、Exchanges

交换机(Exchanges)是什么?

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

交换机(Exchanges)的类型

  • 直接(direct)
  • 主题(topic)
  • 标题(headers)
  • 扇出(fanout)

无名 exchange

channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话。

1.2、临时队列

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

创建临时队列的方式如下:

String queueName = channel.queueDeclare().getQueue();

1.3、绑定(bindings)

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队
列进行了绑定关系。
在这里插入图片描述
1.4、扇出(fanout)

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型。而交换机和临时队列的关系如下。
在这里插入图片描述
简单理解的来说,也就是一条消息发送经过交换机之后可以由多个消费者进行消费消息,而在生活中比较常见的例子就是微信群或者qq群一个人发的消息其他的群成员都可以接收到。下面由代码进行分析,这里需要两个消费者绑定到交换机进行接收消息。两个消费者的代码都一致,这里就给出一个的代码。另外就是这里的RabbitmqUtil是之前进行抽离出来的生产信道的工具类,可以参考该博文 :消息中间件之RabbitMQ的安装及消息发送接收

    // 交换机
    public static final String EXCHANGE = "log";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE, "fanout");
        // 声明临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 进行绑定
        channel.queueBind(queueName, EXCHANGE, "");
        System.out.println("等待接收");

        // 接收
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费中断");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }

之后进行生产者发送消息,同样的绑定到这个交换机。

    // 交换机
    public static final String EXCHANGE = "log";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE, "fanout");

        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            // 消息内容
            String message = sc.next();
            channel.basicPublish(EXCHANGE, "", null, message.getBytes());
            System.out.println("发送成功");
        }
    }

之后运行起来,进行发送消息,可以看到效果,在两个消费者当中都会接收到消息,这也就是fanout扇出。
在这里插入图片描述
1.5、直接(Direct)

队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码,绑定之后的意义由其交换类型决定

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

而对于直接类型也就是发送消息绑定的key是唯一的,也就是发送的消息只能由一个绑定的key值的交换机进行接收。

消费者代码如下:并且这里我们对一个队列可以进行绑定多个key值,也就是对console绑定了info和warning。

    public static final String EXCHANGE = "direct";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE, "direct");
        // 声明队列
        channel.queueDeclare("console", true, false, false, null);
        // 进行绑定
        channel.queueBind("console", EXCHANGE, "info");
        channel.queueBind("console", EXCHANGE, "warning");
        System.out.println("等待接收");

        // 接收
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费中断");
        };
        channel.basicConsume("console", true, deliverCallback, cancelCallback);
    }

同样的另一个消费者代码也是一致,只需要将队列名称和绑定的key值进行修改一下即可,最后我们在浏览器当中可以看到。
在这里插入图片描述
最后就是生产者了,这里我们将消息发哦是那个给到info这个key值,也就是发给第一个消费者,查看效果。

    public static final String EXCHANGE = "direct";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            // 消息内容
            String message = sc.next();
            channel.basicPublish(EXCHANGE, "info", null, message.getBytes());
            System.out.println("发送成功");
        }
    }

1.6、主题(Topic)

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是需要注意的:

  • *(星号)可以代替一个单词
  • #(井号)可以替代零个或多个单词

首先我们在代码编写之前给定一些匹配规则,如下图所示,(这是代码生成后的交换机信息,这里只是为了理解就先把截图放在前面了)
在这里插入图片描述
之后我们使用代码进行演示,

    public static final String EXCHANGE = "topic";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE, "topic");
        // 声明队列
        channel.queueDeclare("T1", true, false, false, null);
        // 进行绑定
        channel.queueBind("T1", EXCHANGE, "*.error");
        channel.queueBind("T1", EXCHANGE, "*.info.*");
        System.out.println("T1 等待接收");

        // 接收
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
            System.out.println("绑定值:" + message.getEnvelope().getRoutingKey());
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费中断");
        };
        channel.basicConsume("T1", true, deliverCallback, cancelCallback);
    }

而对于另外一个消费者我们只需要对topic主题匹配规则进行更改一下即可,然后我们在生产者当中加上多条消息进行发送,进行查看那个消费者会接收到消息进行消费。

    public static final String EXCHANGE = "topic";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        Map<String, String> map = new HashMap();
        map.put("info", "info");
        map.put("error", "error");
        map.put("error.info", "error.info");
        map.put("info.error", "info.error");
        map.put("a", "a");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            String message = entry.getValue();
            channel.basicPublish(EXCHANGE, key, null, message.getBytes());
            System.out.println("发送成功");
        }
    }

2、死信队列

2.1、死信是什么?

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

2.2、死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

在这里插入图片描述
2.3、消息 TTL 过期

首先在这里进行演示说明:使用到的是两个消费者进行接收消息,其中一个进行对普通消息对死信消息的转发,另一个为接收死信队列的消息,首先是消费者1:

    // 普通交换机
    public static final String NORMAL_EXCHANGE = "normalExhange";
    // 死信交换机
    public static final String DEAD_EXCHANGE = "deadExchange";
    // 普通队列
    public static final String NORMAL_QUEUE = "normalQueue";
    // 死信队列
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // 声明交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
        channel.exchangeDeclare(DEAD_EXCHANGE, "direct");
        // 声明队列
        Map<String, Object> map = new HashMap<>();
        // 过期时间
        // map.put("x-message-ttl",100000);
        // 指定对应的死信交换机
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置指定的死信key
        map.put("x-dead-letter-routing-key", "dead");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 进行交换机和队列的绑定
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
        // 接收
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费中断");
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
    }

之后新加一个生成者进行发送消息给到普通交换机,并且设置消息延时。

    public static final String NORMAL_EXCHANGE = "normalExhange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // 发一个死信消息
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            String message = i + "";
            channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes());
        }
        System.out.println("发送成功");
    }

之后run一下这个消费者可以在web端上面看到对应的交换机和队列就都创建好了,之后我们停止这个消费者,用来模拟消费者宕机,之后再run生成者,在这里我们可以看到由于消费者的服务停掉了,这时的普通交换机就会把消息转发给到死信队列,再可以添加一个消费者对死信队列的消息进行接收:

    // 死信队列
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // 接收
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费中断");
        };
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
    }

2.4、队列达到最大长度

首先我们在消费者1当中需要给map添加一个最大长度的限制,

	map.put("x-max-length",6);

并且在生产者当中不需要给定设置延时的设置,之后同样的先启动消费者1,生成队列,之后停掉消费者1,再启动生产者进行发送消息,可以看到,在普通队列当中只能够存6条消息,多出来的4条就被存在死信队列。
在这里插入图片描述
2.5、消息被拒

我们在消费者当中给添加一个消息拒绝,这里拒绝消息为5的这一条消息。

在这里插入图片描述
在这里插入图片描述
之后我们都run起来,在死信队列当中进行查看死信队列的消息,可以看到有一条消息,也就是被拒的5。
在这里插入图片描述
3、延迟队列

3.1、延迟队列是什么?

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

3.2、RabbitMQ和SpringBoot的整合

其中SpringBoot可参考:SpringBoot详解
Swagger可参考:Swagger2详解
在这里插入图片描述
我们在Springboot当中对RabbitMQ进行集成,首先导入相关依赖;

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:演示代码架构图:
在这里插入图片描述
首先添加配置类:

package com.lzq.rabbit.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {
    // 普通交换机
    public static final String X_EXCHANGE = "xExchange";
    // 死信交换机
    public static final String Y_DEAD_EXCHANGE = "yExchange";
    // 普通队列
    public static final String QUEUE_A = "Q_A";
    public static final String QUEUE_B = "Q_B";
    // 死信队列
    public static final String DEAD_QUEUE_C = "Q_C";

    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_EXCHANGE);
    }

    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YC");
        //声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YC");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    @Bean("queueC")
    public Queue queueC() {
        return QueueBuilder.durable(DEAD_QUEUE_C).build();
    }

    // 进行绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    @Bean
    public Binding queueCBindingY(@Qualifier("queueC") Queue queueC, @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueC).to(yExchange).with("YC");
    }
}

之后添加一个controller用来发送请求进行模拟发送消息:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/{message}")
    public void sendMsg(@PathVariable String message){
        System.out.println(message+""+new Date().toString());
        // 发送消息
        rabbitTemplate.convertAndSend("xExchange","XA","消息来自10s :"+message);
        rabbitTemplate.convertAndSend("xExchange","XB","消息来自40s :"+message);
    }

最后添加一个消费者

    @RabbitListener(queues = "Q_C")
    public void receiveC(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.out.println("死信:" + msg + "" + new Date().toString());
    }

发送请求:http://localhost:8080/send/12345

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

新增一个队列QD和X Y交换机进行绑定,新增代码:

    public static final String QUEUE_D = "Q_D";
    
    @Bean("queueD")
    public Queue queueD() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_D).withArguments(args).build();
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueD).to(xExchange).with("XC");
    }

添加一个接口进行测试:

    @GetMapping("/sendLetter/{message}/{ttlTime}")
    public void sendLetterMsg(@PathVariable String message,@PathVariable String ttlTime){
        // 发送消息
        rabbitTemplate.convertAndSend("xExchange","XC","消息来自10s :"+message+", 时长:" + ttlTime,msg->{
            // 设置时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
        log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
    }

4、发布确认——高级篇

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

4.1、发布确认 springboot 版本

这里使用简单的发布确认,首先添加配置类:

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE = "confirmExchange";
    public static final String CONFRIM_QUEUE = "confirmQueue";
    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFRIM_QUEUE).build();
    }
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

新开一个接口请求:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/confirm/{message}")
    public void confirm(@PathVariable String message){
        rabbitTemplate.convertAndSend("confirmExchange","key1","消息来自:"+message);
    }

加上一个消费者:

@Slf4j
@Component
public class ConfirmConsum {
    @RabbitListener(queues = ConfirmConfig.CONFRIM_QUEUE)
    public void receive(Message message, Channel channel) throws Exception {
        log.info("当前时间:{},收到信息{}", new Date().toString(), message);
    }
}

添加一个回调接口:并且在配置文件当中需要添加:

spring.rabbitmq.publisher-confirm-type=correlated

之后编写接口

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机不管是否收到消息的一个回调方法
     * 消息相关数据
     * ack 交换机,是否收到消息
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }
}

进行发送消息,当修改掉控制器当中的交换机的名称,进行发送消息,交换机不存在,也就导致了消息的丢失。
在这里插入图片描述
4.2、消息回退

Mandatory 参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

在回调接口当中重写returnedMessage方法,

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setMandatory(true);
        //设置回退消息交给谁处理
        rabbitTemplate.setReturnsCallback(this);

    }

    /**
     * 交换机不管是否收到消息的一个回调方法
     * 消息相关数据
     * ack 交换机,是否收到消息
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println(returnedMessage.getMessage());
        log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",
                returnedMessage.getMessage().getBody(), returnedMessage.getExchange(),
                returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
    }
}

在发送消息的接口当中修改掉routingkey的值,模拟交换机发送消息到不了队列,运行代码查看结果:
在这里插入图片描述
4.3、备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

5、RabbitMQ——幂等性、优先级、惰性

5.1、幂等性

幂等性是什么?

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决思路

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作

  • 唯一 ID+指纹码机制,利用数据库主键去重
  • 利用 redis 的原子性去实现

唯一 ID+指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

Redis 原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

5.2、优先级队列

使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

添加优先级

在代码当中的消费者当中可以进行指定,也就是通过map进行设置:

Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);

实战

添加消费者:

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        //设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
        Map<String, Object> params = new HashMap();
        params.put("x-max-priority", 10);
        channel.queueDeclare(QUEUE_NAME, true, false, false, params);

        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

添加生产者:

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        //给消息赋予一个 priority 属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            if (i == 5) {
                channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
            } else {
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
            System.out.println("发送消息完成:" + message);
        }
    }

进行测试,在生产者发送所有消息之后再运行消费者进行消费消息,可以看到5的消息设置了优先级,也就是说5会被先消费。

5.3、惰性队列

使用场景

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

两种模式

队列具备两种模式:default 和 lazy。默认的为default 模式,在3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。

在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

内存开销对比

在这里插入图片描述
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅 占用 1.5MB,这是因为惰性队列的数据存在了磁盘上了。

6、RabbitMQ集群搭建

在前面已经有了一台虚拟机是可以进行使用的了,我们只需要对这台虚拟机进行克隆出来两个作为节点虚拟机即可。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-09 10:18:18  更:2021-08-09 10:20:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/23 6:20:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码