IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> rabbitMQ 消息队列 -> 正文阅读

[大数据]rabbitMQ 消息队列

为什么要用MQ

MQ有以下几个功能
1.流量消峰
··对流量进行限制,比如系统最多能容纳一万访问,但是到了促销时间,突然来了两万订单系统当然处理不了。但是可以使用消息队列做缓冲,把一秒钟的访问分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单操作。
2.应用解耦
··如果一个消费系统直接调用订单系统,库存系统,会员系统数据库,当其中有一个系统报错整个操作就会中断。如果使用消息队列,消费系统就先调用消息队列,订单系统,库存系统,会员系统的调用操作就由消息队列进行完成,当其中一个系统调用失败,消息队列就会再次调用直到成功。
3.异步处理
··a调用b,a不需要等待b什么时候完成,a可以做自己的事情,b完成之后给MQ的msg发送信息,再由MQ的msg发送给A处理信息。

MQ的分类

ActiveMQ
··优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
··缺点:官方对ActiveMQ的维护越来越少,高吞吐量场景较小使用

Kafka
··优点:性能卓越,单机写入TPS在百万条每秒,最大的优点,就是吞吐量高,时效性ms级可用性非常高,kafka是分布式的,一条数据多个副本,少数机器宕机,不会丢失数据不会导致不可用,消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅仅被消费一次,有优秀的第三方,kafka web管理界面,kafka-Manager ;在日志领域比较成熟,被多家公司和多个开源项目使用,功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
··缺点:kafka单机超过64个队列分区,Load会发生明显的飙升现象,队列越多,load越高,发送消息响应时间长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试,支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新慢。

RocketMQ
··优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为消息堆积导致性能下降,源码是java我们可以自己阅读,定制自己公司的MQ
··缺点:支持的客户端语言不多,目前是java以及c++,其中c++不成熟,社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量的代码。

RabbitMQ
··优点:由于erlang语言的高并发特性,性能较好,吞吐量到万级,MQ功能比较完备,健壮稳定易用,跨平台,支持多种语言,如:Python,Ruby,.NET,Java等,支持ajax,文档齐全,开源提供的管理界面非常棒,用起来很好用,社区活跃度高。
··缺点:商业版需要收费,学习成本较高

RabbitMQ四大核心概念

生产者:
··生产数据发送消息的程序是生产者
交换机:
··交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面他将消息推送到队列中,交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定的队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
队列:
··队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但他们只能存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区,许多生产者可以发送到一个队列,许多消费者可以尝试从一个队列接收数据,这就是我们使用队列的方式。
消费者:
··消费者与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序,请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者,又可以是消费者。

在这里插入图片描述

安装erlang rabbitMQ 打开 web插件 添加用户

点击这里

Work Queues (工作队列)

工作队列的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程的时候,这些工作线程将一起处理这些任务。
在这里插入图片描述

消息应答
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者收到信息并处理该信息之后,告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了。
消息应答有两种:自动应答,手动应答

··自动应答:消息发送成功立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

··手动应答:手动应答有三个方法
1.Channel.basicAck(用于肯定确认)

2.Channel.basicNack(用于否定确认)

3.Channel.basicReject(用于否定确认)
与Channel.basicNack相比少一个参数:Multiple

Multiple(是否批量处理)
Multiple的true和false代表不同意思

true:代表批量应答channel上未应答的消息,比如channel上有传送的tag的消息,5,6,7,8,当前tag是8那么此时5-8的这些还未应答的消息都会被确认收到消息应答

false:同false相比,只会应答tag=8的消息5,6,7这三个消息依然不会被确认收到消息应答。

消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或者TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将其重新排队,如果此时其他消费者可以处理,它将很快将其分发给拎一个消费者,这样,即使某个消费者偶尔死亡,也可以确认不会丢失任何消息。
在这里插入图片描述
RabbitMQ持久化
默认情况下RabbitMQ退出或者由于某种原因崩溃时,它忽视队列和消息,除非告知他不要这样做,确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
在这里插入图片描述
消息持久化
在这里插入图片描述
不公平分发
最开始我们学习到RabbitMQ分发消息采用的轮询分发,但是在某种场合下这种策略并不是很好,比方说两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度很慢,很慢,这个时候我们还是采用轮询分发的话就会让处理速度快的消费者很大一部分时间处于空闲,而处理状态慢的消费者一直在干活,这种分配方式在这种情况下其实不太好,但是RabbitMQ并不知道这种情况它依然很公平的分发。
在这里插入图片描述
预取值
手动设定消息队列分发给消费者的比例,如:消费者1设定是5,消费者2设定是2,如果有七个消息要从消息队列中发送,消费者1就能获取5个,消费者2就能获取2个
在这里插入图片描述
发布确认原理
首先发布确认需要有两个条件,要求队列必须持久化,要求消息必须持久化,有以上两个条件消息也不一定保证不丢失,因为发布消息到消息队列中还没到磁盘上就有可能宕机,所以我们还需要消息确认,生产者发送消息到消息队列,队列确认将消息保存到磁盘上之后告诉消息生产者消息发送成功才算完成操作。

/**
 * 发布确认模式
 *  1.单个确认模式
 *  2.批量确认模式
 *  3.异步批量确认模式
 *
 *  使用的时间比较哪种方式是最好的
 */
public class ConfirmMessage {

    //批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        //单个确认发布 :发一条就确认一条 缺点:发布速度特别慢,需要等待第一条发布确认才发布第二条
//        ConfirmMessage.publishMessageIndividually();//发布1000个单独确认消息,耗时11361ms

        //批量确认发布 :缺点:当发生故障导致发布出现问题时,不知道哪个消息出现问题,必须整批处理保存在内存中,整批重新发送
//        ConfirmMessage.publishMessageBatch();//发布1000个单独确认消息,耗时153ms

        //异步确认发布
        ConfirmMessage.publishMessageAsync();//发布1000异步确认消息,耗时20ms
    }

    //单个确认
    public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            channel.basicPublish("",queueName,null,(i+"").getBytes());
            //单个消息马上进行发布确认
            boolean b = channel.waitForConfirms();
            if (b){
                System.out.println("消息发送成功");
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end - begin)+"ms");
    }

    //批量发布确认
    public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;

        //批量发送消息  批量发布确认
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            channel.basicPublish("",queueName,null,(i+"").getBytes());

            //判断达到100条时候批量确认一次
            if (i%batchSize == 0){
                System.out.println("第"+i+"条");
                channel.waitForConfirms();
            }

        }



        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end - begin)+"ms");

    }

    //异步发布确认
    public static void publishMessageAsync() throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();

        /**
         * 线程安全有序的哈希表,适用于高并发的情况下
         * 1.轻松的将序号与消息关联,
         * 2.轻松的批量删除条目 通过序号删除消息
         * 3.支持高并发
         *
         */
        ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();



        //消息确认成功回调函数
        /**
         * 1.消息的标记
         * 2.是否为标记确认
         */
        ConfirmCallback ackCallback = ( deliveryTag,  multiple) ->{
            //删除掉已经确认的消息 剩下的就是未确认的消息
            System.out.println("确认的消息"+deliveryTag);

            if (multiple){ //批量确认
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
                        outstandingConfirms.headMap(deliveryTag);
                longStringConcurrentNavigableMap.clear();
            }else { //单个确认
                outstandingConfirms.remove(deliveryTag);
            }

        };
        //消息确认失败回调函数
        /**
         * 1.消息的标记
         * 2.是否为标记确认
         */
        ConfirmCallback nackCallback = ( deliveryTag,  multiple) ->{
            //打印一下未确认的有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息"+deliveryTag+"消息内容是:"+message);
        };
        /**
         * 消息监听器 监听哪些消息成功,哪些消息失败
         * 1.监听成功的消息
         * 2.监听失败的消息
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        //开始时间
        long begin = System.currentTimeMillis();



        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = ""+i;
            channel.basicPublish("",queueName,null,message.getBytes());
            //记录下所有要发送的消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }



        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"异步确认消息,耗时"+(end - begin)+"ms");
    }
}

交换机
此前消息生产者发布消息只能由一个消息队列分发到一个消费者消费一次,但是如果我们需要一个消息能够被消费多次,就需要用到我们的交换机。运行步骤:消息生产者发布消息到RabbitMQ的交换机,由交换机去绑定多个消息队列,一个消息队列发布给一个消费者,这样就可以一个消息多次消费。
交换机类型:
··直接(direct)
··主题(topic)
··标题(headers)
··扇出(fanout)
Fanout 发布订阅模式
它是将接收到的所有消息广播到它知道的所有队列中

消息生产者

/**
 * 发消息 交换机
 */
public class EmitLog {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消费者1

public class ReceiveLogs01 {

    //交换机名称
    public static final String exhange_name = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(exhange_name,"fanout");
        //声明一个队列 临时队列
        /**
         * 生产一个临时的队列,队列名称随机
         * 当消费者断开与队列的连接的时候,队列就自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列
         */
        channel.queueBind(queueName,exhange_name,"");
        System.out.println("等待接收消息,把接收到的消息打印在屏幕上");

        //接收消息
        DeliverCallback deliverCallback = ( consumerTag, message)->{
            System.out.println("控制台打印接收到的消息:"+new String(message.getBody()));
        };
        //消费者取消消息时回调的接口
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});

    }

}

消费者2

public class ReceiveLogs02 {

    //交换机名称
    public static final String exhange_name = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(exhange_name,"fanout");
        //声明一个队列 临时队列
        /**
         * 生产一个临时的队列,队列名称随机
         * 当消费者断开与队列的连接的时候,队列就自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列
         */
        channel.queueBind(queueName,exhange_name,"");
        System.out.println("等待接收消息,把接收到的消息打印在屏幕上");

        //接收消息
        DeliverCallback deliverCallback = ( consumerTag, message)->{
            System.out.println("控制台打印接收到的消息:"+new String(message.getBody()));
        };
        //消费者取消消息时回调的接口
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});

    }

}

直接交换机(Direct exchange)
上面我们使用的扇出交换机,相当于广播模式,讲消息广播给所有消费者,而这种交换机并不能给我们带来很大的灵活性,他只能无意识的进行广播。这里我们使用直接交换机来进行替换,这种类型的工作方式是,消息只到它绑定的routingKey队列仲去。

public class ReceiveLogsDirect01 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机 直接交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("console",false,false,false,null);
        //绑定队列
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag ,message)->{
            System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody()));
        };
        channel.basicConsume("console",true,deliverCallback,consumerTag->{});
    }

}
public class ReceiveLogsDirect02 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机 直接交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("disk",false,false,false,null);
        //绑定队列
        channel.queueBind("disk",EXCHANGE_NAME,"error");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag ,message)->{
            System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody()));
        };
        channel.basicConsume("disk",true,deliverCallback,consumerTag->{});
    }

}

消息生产方直接找到direct_logs交换机下的对应RoutingKey,这里与队列名称没有关系

public class DirectLogs {

    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes());
            System.out.println("生产者发出消息:"+message);
        }
    }
}

主题交换机(Topic)
尽管使用direct交换机改进了我们的系统,但是它仍然存在局限性,比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想info.base的消息,那这个时候direct就办不到了,这个时候只能使用topic类型

要求:
发送类型是topic交换机的消息routing_key不能随便写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:stock.sud.nyse , nyse.vmw 这种类型的,当然这个单词列表最多不能超过255个字节;
*(星号)可以替代一个单词
#(井号)可以替代零个或多个单词

topic可以替代扇出交换机和直接交换机

/**
 * 生产者
 */
public class EmitLogTopic {
    //交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        HashMap<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "队列被Q1Q2接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
        for (Map.Entry<String, String> stringStringEntry : bindingKeyMap.entrySet()) {
            String key = stringStringEntry.getKey();
            String value = stringStringEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,key,null,value.getBytes("UTF-8"));
            System.out.println("生产者发出消息的内容:"+value);
        }


    }
}
/**
 * 声明主题交换机以及相关队列
 */
public class ReceiveLogsTopic01 {

    //交换机的名称
    public static final String EXCHANGE_NAME ="topic_logs";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明一个队列
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接收消息");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
            System.out.println("接收队列:"+queueName+"绑定键:"+message.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }

}
/**
 * 声明主题交换机以及相关队列
 */
public class ReceiveLogsTopic02 {

    //交换机的名称
    public static final String EXCHANGE_NAME ="topic_logs";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明一个队列
        String queueName = "Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");

        System.out.println("等待接收消息");
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
            System.out.println("接收队列:"+queueName+"绑定键:"+message.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }

}

死信队列
某些时候由于特殊原因导致queue中的某些信息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要用到RabbitMQ的死信队列机制,当消息消费发生异常的时候,将消息投入死信队列中,还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
在这里插入图片描述
消息生产者

/**
 * 死信队列 生产者代码
 */
public class Producer {

    public static final String NORMAL_EXCHANGE_NAME= "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息 设置TTL时间 单位是ms
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info"+i;
            channel.basicPublish(NORMAL_EXCHANGE_NAME,"zhangsan",null,message.getBytes());
        }
    }

}

消息消费者1 声明死信队列 死信交换机,普通队列 普通交换机

/**
 * 死信队列
 * 消费者1
 */
public class Consumer01 {

    //普通交换机
    public static final String NORMAL_EXCHANGE_NAME= "normal_exchange";
    //死信交换机
    public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通的交换机类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //声明普通队列
        HashMap<String, Object> arguments = new HashMap<>();
        //过期时间
//        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置队列长度
//        arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //绑定普通的交换机与普通队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE_NAME,"zhangsan");
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"lisi");
        System.out.println("等待接收消息。。。");

        DeliverCallback deliverCallback = (consumerTag,message)->{
            String msg = new String(message.getBody());
            //拒接info5
            if (msg.equals("info5")){
                System.out.println("拒绝的消息是:"+msg);
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {
                System.out.println("consumer01接收的消息是"+new String(message.getBody(),"UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }

        };
        //开启手动应答才能拒绝
        channel.basicConsume(NORMAL_QUEUE,false, deliverCallback,consumerTag->{});
    }


}

消费者2消费死信队列消息

/**
 * 死信队列
 * 消费者1
 */
public class Consumer02 {

    //死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();


        DeliverCallback deliverCallback = (consumerTag,message)->{

            System.out.println("consumer02接收的消息是"+new String(message.getBody(),"UTF-8"));

        };
        channel.basicConsume(DEAD_QUEUE,true, deliverCallback,consumerTag->{});
    }


}

延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中国的元素是希望在指定时间到了以后或者之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景例如:
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内没有上传过商品,则发生消息提醒
3.用户注册成功后,如果三天内没有登陆则进行短信提醒
4.用户发起退款,如果三天之内没有得到处理则通知相关运营人员
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

队列TTL配置
在这里插入图片描述
这里开始用的是springBoot绑定设置两个交换机三个队列

/**
 * TTL 配置文件类代码
 */
@Configuration
public class TtlQuquqConfig {

    //普通交换机名称
    public static final String X_EXCHANGE = "X";
    //死信交换机名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列名称
    public static final String DEAD_LETTER_QUEUE = "QD";

    //声明xEXCHANGE
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明yExchagne 别名
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明队列
    @Bean("queueA")
    public Queue queueA(){
        HashMap<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-rounting-key","YD");
        //设置ttl 单位ms
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    //声明队列
    @Bean("queueB")
    public Queue queueB(){
        HashMap<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-rounting-key","YD");
        //设置ttl 单位ms
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    //死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //绑定
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //绑定
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

注意:使用死信队列做延迟有一个缺陷,就是死信队列是需要排队的,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个不会优点得到执行。

RabbitMQ插件实现延迟队列
在官网上下载 https://www.rabbitmq.com/community-plugins.html,下载
rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

发布确认springboot版本
在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?
在这里插入图片描述
当我们消息生产者发现消息给交换机失败,应该要吧消息放到缓存中使用定时任务再次发送。
在这里插入图片描述
配置文件


##交换机确认回调模式
spring.rabbitmq.publisher-confirm-type=correlated
#消息回退 如果消息发布不出去 就会把消息回退给生产者
spring.rabbitmq.publisher-returns=true

配置类

/**
 * 配置类 发布确认 (高级)
 */
@Configuration
public class ConfirmConfig {
    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";

    //声明交换机
    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,@Qualifier("confirmQueue") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);
    }
}

callBack类

/**
 * 回调接口
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换机确认回调方法
     * 1.发消息 交换机收到了回调、
     *  1.1 correlationData 保存回调消息的ID以及相关信息、
     *  1.2 ack 交换机收到信息 true
     *  1.3 cause 失败原因 null
     *
     * 2.发消息 交换机失败了 回调
     *  2.1 correlationData 保存回调消息的ID以及相关信息、
     *  2.2 ack 交换机收到信息 false
     *  2.3 cause 失败原因 null
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id =correlationData != null ? correlationData.getId() : "";
        if (ack){
            log.info("交换机已经收到ID为:{}的消息",id);
        }else {
            log.info("交换机还未收到ID为:{}的消息,原因:{}",id,cause);
        }
    }

    //可以在当消息传递过程中不可达到目的时将消息返回给生产者
    //只有不可达目的的时候 才进行回退
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息{}被交换机{}退回,退回原因:{},路由Key:{}",
                new String(returnedMessage.getMessage().getBody()),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
    }
}

消息生产者

/**
 * 开始发消息 测试发布确认
 */
@RestController
@Slf4j
@RequestMapping("/confirm")
public class ProducerController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
        log.info("发送消息内容:{}",message);

        //下面试试routingkey出错,交换机还是显示成功收到消息,但是没能去到队列中
         correlationData = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

消息消费者

/**
 * 接收消息
 */
@Component
@Slf4j
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message){
        log.info("接收到的队列confirm.queue消息:"+new String(message.getBody()));
    }

}

备份交换机
在这里插入图片描述

/**
 * 配置类 发布确认 (高级)
 */
@Configuration
public class ConfirmConfig {
    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";

    //备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    //报警队列
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    //声明交换机
    @Bean
    public DirectExchange confirmExchange() {
        //备份交换机
      return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();

//        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    //声明队列
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);
    }

    //声明备份交换机
    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    //声明备份队列
    @Bean
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    //声明报警队列
    @Bean
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    //绑定备份队列和备份交换机
    @Bean
    public Binding backupQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange fanoutExchange, @Qualifier("backupQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    //绑定报警队列到备份交换机
    @Bean
    public Binding warrningQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange fanoutExchange, @Qualifier("warningQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

}

幂等性
用户对于同一操作发现的一次请求或多次请求的结果是一致的,不会因为多次点击而产生副作用。举个最简单例子,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成两条,在以前的单应用系统中,我们只需要把数据放入事务中即可,但是响应客户端的时候也有可能出现网络中断或者异常等等。
优先级队列
在我们系统中有一个订单催付的场景,我们客户在天猫下订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内没有完成支付,那么就会给用户推送一条短信提醒。看似很简单一个功能,但是天猫商家对我们来说,肯定要区分大客户和小客户,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理所应当他们的订单优先得到处理。

在这里插入图片描述

如何添加
在这里插入图片描述
在这里插入图片描述
惰性队列
正常情况下:消息是保存到内存中的
惰性队列:消息是保存在磁盘中
在这里插入图片描述
搭建集群
在这里插入图片描述
镜像队列
当有多个队列进行集群的时候,其中一个服务队列存储着消息如果挂了,别的服务是不知道有这个消息。其他节点是不知道有这个队列消息。
在这里插入图片描述

高可用负载均衡
我们连接rabbitMQ是根据IP地址连接,如果在集群模式下,我们正在连接的ip地址的节点如果关闭,我们的服务是不知道的,依旧连接旧的IP地址节点。
在这里插入图片描述
这里解决方案可以使用Haproxy实现负载均衡,使用Keepalived实现双机

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-15 23:47:47  更:2021-07-15 23:48:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/8 7:28:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码