IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMq的基本使用 -> 正文阅读

[Java知识库]RabbitMq的基本使用

什么是RabbitMq?

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.

通俗的说RabbitMq就是用来接收和转发消息的,本身不对消息进行任何处理,只是接收和转发。

为什么要用RabbitMq?

1、解耦
在这里插入图片描述
这里有两个模块,用户注册完后短信通知用户注册成功,这里用户注册就依赖于短信通知,存在耦合,当短信通知出现异常时,用户注册也会影响使用,下面引入消息队列。
在这里插入图片描述
引入消息队列后用户注册于短信通知已经解耦了,此后用户注册完后发送消息队列后就可以不管了,剩下的就交给消息队列了。

2、异步
??还是上面的案例,本来用户注册完后需要等待短信通知调用完后才能给用户回馈,引入消息队列后用户注册和短信通知就可以异步执行,大大提高了响应速度。

3、流量削峰
??举个例子,某个特殊的日子,有个商家搞整点优惠活动,力度还挺大,听到这个消息,大家都跃跃欲试。到了时间点,大家打开活动网站,准备下单,发现网站崩了,原来是到了整点,大量的请求发给了服务器,服务器一时间难以接受,就挂了。怎么解决,MQ的流量削峰。
在这里插入图片描述

引入MQ
在这里插入图片描述

RabbitMq的组成

在这里插入图片描述

  • Producer 发送消息的一方
  • Exchange(交换机)Producer 将消息发送给交换机,交换机来实现消息的分发
  • Queue(队列) 存储消息的地方
  • Consumer 信息的消费者

RabbitMq分发策略

简单队列

在这里插入图片描述

生产者的实现:

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            /**
             * @param queue 队列的名字
             * @param durable 队列持久化,下次重启队列依然存在
             * @param exclusive 排他性,队列是基于连接(Connection)可见的,是该Connection私有的,Connection.close()后队列删除
             * @param autoDelete 队列消息被消费后自动删除
             * @param arguments other properties (construction arguments) for the queue
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="Hello World";
             /**
             * @param exchange 交换机名字,未设定会存在一个默认交换机
             * @param routingKey 路由key,未指定交换机时是队列名字
             * @param props other properties for the message - routing headers etc
             * @param body 发送的消息
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

运行后:
在这里插入图片描述

消费者的实现:

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        try{
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            /**
             * @param queue  队列名称
             * @param autoAck 是否自动应答
             * @param deliverCallback 收到消息后的回调函数
             * @param cancelCallback 取消订阅后的回调函数
             */
            channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    String info=new String(message.getBody(),"utf-8");
                    System.out.println(" [x] Received '" + info + "'");
                }
            }, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

运行后:
在这里插入图片描述

工作队列

在这里插入图片描述
消息生产者:

public class WorkSend {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //限制每个消费者一次只能消费一条信息
            channel.basicQos(1);
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            String info="work task info--------";
            //生产十条待消费信息
            for(int i=0;i<10;i++){
                System.out.println(info.getBytes());
                channel.basicPublish("",TASK_QUEUE_NAME,null,(info+i).getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者(这里创建了两个消费者,代码都一样):

public class WorkRecv2 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection=null;
        try {
            connection=connectionFactory.newConnection();
            final Channel channel=connection.createChannel();
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            //这里关闭了自动应答,采用手动应答方式
            channel.basicConsume(TASK_QUEUE_NAME, false, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                  String info = new String(message.getBody(), "utf-8");
                    System.out.println(info);
                    //手动应答,第一个参数为消息序列号,第二个参数为是否批量应答
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {

                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

运行后(两个消费者每人每次消费一条信息,依次消费):
在这里插入图片描述
在这里插入图片描述

分发队列

生产者产生消息,只要订阅了该队列的都能收到消息通知
在这里插入图片描述

消息生产者:

public class FanoutExchange {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); 
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message =  "info: Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

运行后:
在这里插入图片描述
消息消费者:

public class FanoutRecive {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * @param exchange 交换机名字
         * @param type 交换机的类型
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //默认实现的queue,具有排他性和自动删除
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

运行后:
在这里插入图片描述
在这里插入图片描述

Routing

通过路由key绑定队列与交换机,交换机可以定点投放信息
在这里插入图片描述
消息生产者:

public class RouteSend {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String message="发给blue的";
            channel.basicPublish(EXCHANGE_NAME, "blue", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent "+ message + "'");
        }
    }

}

运行后:
在这里插入图片描述
消息消费者(这里创建两个消费者,只需将绑定的路由key改下就行):

public class BlueRecv {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 队列通过key绑定交换机
         * @param queue 队列名字
         * @param exchange 交换机名字
         * @param routingKey 绑定的 routingKey
         */
        channel.queueBind(queueName, EXCHANGE_NAME, "blue");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

运行后:
在这里插入图片描述
在这里插入图片描述

Topics

在这里插入图片描述

  • * 能匹配一个
  • # 能匹配零个或多个

例如 lazy.# 能匹配 lazy.orange.elephantlazy.brown.fox ,而 lazy.*.fox 只能匹配lazy.brown.fox

消息生产者:

public class TopicSend {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String message="info to lazy.#-----";
            channel.basicPublish(EXCHANGE_NAME, "lazy.red.bird", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent  " + message + "'");
        }
    }
}

在这里插入图片描述

消费者代码(创建了两个消费者,改一下routingKey就行,改为 lazy.*.bird):

public class TopicRecv {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        //这里的routingKey和之前的不同
            channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

        System.out.println(" [*] Waiting for messages.");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

运行后(两个都收到了消息):
在这里插入图片描述
在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-10-12 23:18:12  更:2021-10-12 23:19:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 21:18:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码