IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ如何确保消息不丢失 -> 正文阅读

[大数据]RabbitMQ如何确保消息不丢失

消息丢失分两部分:

1.生产者发送消息到队列时,因外部原因如MQ突然宕机,导致新进来的数据还未保存到本地磁盘

2.消费者消费消息失败,例如多个消费者,其中某个消费者在处理消息时突然宕机,导致该消息未成功消费等。

解决这两种问题的方法:

防止生产者发送到队列时消息丢失,可采用发布确认模式

1.单个消息确认,能够保证消息不丢失,但吞吐量会下降

//批量发送消息的个数
public static final int MESSAGE_COUNT = 1000;

public static void publishMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列的声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    //开启发布确认
    channel.confirmSelect();
    //开始时间
    Long begin = System.currentTimeMillis();
    //批量发布消息
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i+ "";
        channel.basicPublish("",queueName,null,message.getBytes());
        //单个消息马上进行确认
        boolean flag = channel.waitForConfirms();//等待确认返回值
        if(flag){
            System.out.println("消息发送成功");
        }
    }
    //结束时间
    Long end = System.currentTimeMillis();
    System.out.println("发布"+ MESSAGE_COUNT +"单个确认消息,耗时:"+(end - begin));
}

2.批量消息确认,不能保证消息不丢失,但吞吐量相比较单个确认要高

//批量发送消息的个数
public static final int MESSAGE_COUNT = 1000;

public static void publicMessageBatch() throws Exception{
    Channel channel = RabbitMqUtils.getChannel();
    //声明队列
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    //开启发布确认
    channel.confirmSelect();
    //开始时间
    Long begin = System.currentTimeMillis();
    //批量确认消息大小
    int batch = 100;
    //批量发布消息 批量消息确认
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i+ "";
        channel.basicPublish("",queueName,null,message.getBytes());

        //判断达到100调消息的时候,批量确认与粗
        if(i%100 == 0){
            //发布确认
            channel.waitForConfirms();
            System.out.println("消息发送成功");
        }
    }
    //结束时间
    Long end = System.currentTimeMillis();
    System.out.println("发布"+ MESSAGE_COUNT +"批量发布确认消息,耗时:"+(end - begin));
}

3.异步消息确认,能够保证消息不丢失,吞吐量也高,但相对的复杂度比较高

//批量发送消息的个数
public static final int MESSAGE_COUNT = 1000;

public static void publishMessageAsync() throws  Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    //开启发布确认
    channel.confirmSelect();

    //并发队列存储所有要发送的消息
    /**
     *  线程安全有序的一个哈希表  适用于高并发的情况下
     *  1.轻松的将序号于消息进行关联
     *  2.轻松的批量删除条目  只要给到序号
     *  3.支持高并发
     */
    ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();

    //开始时间
    Long begin = System.currentTimeMillis();

    //消息确认成功 回调函数式
    ConfirmCallback ackCallBack = (deliveryTag,multiple) -> {
        //2.删除掉已经确认的消息 剩下的就是未确认的消息
        if(multiple){//判断是否为批量确认
            ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap(deliveryTag);
            confirmed.clear();
        }else {//单个确认
            outstandingConfirms.remove(deliveryTag);
        }
        System.out.println("已确认的消息" + deliveryTag);
    };
    //消息确认失败 回调函数

    /**
     * 参数:
     *  1.消息的标记
     *  2.是否为批量确认
     */
    ConfirmCallback nackCallBack = (deliveryTag,multiple) -> {
        //3.大一一下未确认的消息
        String message = outstandingConfirms.get(deliveryTag);
        System.out.println("未确认的消息:" + message + "未确认消息序号" + deliveryTag);
    };
    //准备消息监听器 监听哪些消息成功,那些失败
    /**
     * 参数说明:
     *  1.监听哪些消息成功
     *  2.监听哪些消息失败
     */
    channel.addConfirmListener(ackCallBack,nackCallBack);//确认监听器,异步通知

    //批量发布消息
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = "消息"+i;
        channel.basicPublish("",queueName,null,message.getBytes("UTF-8"));
        //1.此处记录下所有要发送的消息
        outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
    }

    //结束时间
    Long end = System.currentTimeMillis();
    System.out.println("发布"+ MESSAGE_COUNT +"异步发布确认消息,耗时:"+(end - begin));
}

防止消息在消费时丢失,可采用消息手动应答。消息在手动应答时是不丢失的,会重新放回到队列中重新消费

//消息再手动应答时是不丢失,放回到队列中重新消费
public class Consumer1 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");

        //消息回调接口
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            //沉睡1秒
            SleepUtils.sleep(1);
            System.out.println("接收到的消息:" + new String(message.getBody(),"UTF-8"));
            //手动应答代码
            /** 参数:
             * 1.消息的标记
             * 2.是否批量应答:false 不批量应答信道中的消息  true:批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //接收消息采用手动应答
        boolean autoAck = false;
        //设置消息分发类型:0轮询分发  1不公平分发
        //int prefetchCount = 1;
        //设置预取值:当prefetchCount > 1的时候,为预取值
        int prefetchCount = 2;
        channel.basicQos(prefetchCount);
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        }));
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-13 17:31:50  更:2021-07-13 17:33:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 9:21:17-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码