IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ——持久化、发布确认 -> 正文阅读

[Java知识库]RabbitMQ——持久化、发布确认

本来应该昨天更新的,但是使用浏览器访问服务器的RabbitMQ的web端口时chrome显示不是私密链接不让登录Edge也是相同问题,百度找了很多还是无法解决,原因是服务器没有安装SSL证书,直接使用ip访问。

1. 持久化

当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息。为了保证消息不丢失需要将队列和消息都标记为持久化。

1.1 实现持久化

  1. 队列持久化:在创建队列时将channel.queueDeclare();第二个参数改为true。
  2. 消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。
/**
 * @Description 持久化MQ
 * @date 2022/3/7 9:14
 */
public class Producer3 {

    private static final String LONG_QUEUE = "long_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 持久化队列
        channel.queueDeclare(LONG_QUEUE,true,false,false,null);

        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            // 持久化消息
            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:'" + msg + "'成功");
        }
    }

}

但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

1.2 不公平分发

轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

在消费者处修改channel.basicQos(1);表示开启不公平分发

/**
 * @Description 不公平分发消费者
 * @date 2022/3/7 9:27
 */
public class Consumer2 {
    private static final String LONG_QUEUE = "long_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模拟并发沉睡三十秒
            try {
                Thread.sleep(30000);
                System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        // 设置不公平分发
        channel.basicQos(1);

        channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消费者取消消费");
                });
    }
}

1.3 测试不公平分发

测试目的:是否能实现能者多劳。
测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

先启动生产者创建队列,再分别启动两个消费者。

生产者按照顺序发四条消息:
在这里插入图片描述
睡眠时间短的线程A接收到了三条消息
在这里插入图片描述
而睡眠时间长的线程B只接收到的第二条消息:
在这里插入图片描述
因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。
实验成功!

1.4 预取值

消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

1.4.1 代码测试

测试方法:

  1. 新建两个不同的消费者分别给定预期值5个2。
  2. 给睡眠时间长的指定为5,时间短的指定为2。
  3. 假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

代码根据上述代码修改预期值即可。

2. 发布确认

发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

需要注意的是需要开启队列持久化才能使用确认发布。
开启方法:channel.confirmSelect();

2.1 单个确认发布

是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

/**
 * @Description 确认发布——单个确认
 * @date 2022/3/7 14:49
 */
public class SoloProducer {

    private static final int MESSAGE_COUNT = 100;

    private static final String QUEUE_NAME = "confirm_solo";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));

            // 单个发布确认
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("发送消息:" + i);
            }
        }

        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }

}

2.2 批量确认发布

一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

/**
 * @Description 确认发布——批量确认
 * @date 2022/3/7 14:49
 */
public class BatchProducer {

    private static final int MESSAGE_COUNT = 100;

    private static final String QUEUE_NAME = "confirm_batch";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 设置一个多少一批确认一次。
        int batchSize = MESSAGE_COUNT / 10;
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));

            // 批量发布确认
            if (i % batchSize == 0){
                if (channel.waitForConfirms()){
                    System.out.println("发送消息:" + i);
                }
            }

        }

        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }

}

显然效率要比单个确认发布的高很多。

2.3 异步确认发布

在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

/**
 * @Description 确认发布——异步确认
 * @date 2022/3/7 14:49
 */
public class AsyncProducer {

    private static final int MESSAGE_COUNT = 100;

    private static final String QUEUE_NAME = "confirm_async";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();

        // 记录开始时间
        long beginTime = System.currentTimeMillis();

        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
            System.out.println("确认成功消息:" + deliveryTab);
        };

        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            System.out.println("未确认的消息:" + deliveryTab);
        };

        // 消息监听器
        /**
         * addConfirmListener:
         *                  1. 确认成功的消息;
         *                  2. 确认失败的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));

        }


        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }

}

2.4 处理未确认的消息

最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。

处理方式:

  1. 记录要发送的全部消息;
  2. 在发布成功确认处删除;
  3. 打印未确认的消息。

使用一个哈希表存储消息,它的优点:

  1. 可以将需要和消息进行关联;
  2. 轻松批量删除条目;
  3. 支持高并发。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/**
 * @Description 异步发布确认,处理未发布成功的消息
 * @date 2022/3/7 18:09
 */
public class AsyncProducerRemember {
    private static final int MESSAGE_COUNT = 100;

    private static final String QUEUE_NAME = "confirm_async_remember";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();

        // 线程安全有序的一个hash表,适用与高并发
        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();

        // 记录开始时间
        long beginTime = System.currentTimeMillis();

        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
            //2. 在发布成功确认处删除;
            // 批量删除
            if (multiple){
                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                confirmMap.clear();
            }else {
                // 单独删除
                map.remove(deliveryTab);
            }
            System.out.println("确认成功消息:" + deliveryTab);
        };

        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            // 3. 打印未确认的消息。
            System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
        };

        // 消息监听器
        /**
         * addConfirmListener:
         *                  1. 确认成功的消息;
         *                  2. 确认失败的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 1. 记录要发送的全部消息;
            map.put(channel.getNextPublishSeqNo(),msg);
        }


        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

2.5 总结

显然来说,异步处理除了在编码处有些麻烦,在处理时间效率和可用性上都是比单处理和批处理好很多。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-08 22:14:58  更:2022-03-08 22:16:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 10:23:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码