IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ入门 -> 正文阅读

[Java知识库]RabbitMQ入门

RabbitMQ

对于中间件的学习,我们主要学习它的设计思想和使用方式,并且要学会在不同情况下,灵活使用中间件解决业务问题

不要过分纠结于中间件的底层实现

一、MQ 的优势

为什么需要使用消息队列?

我们先来看一下传统的远程调用和使用了消息队列后的区别:

区别

可以看到,使用消息队列之后,可以再调用者服务器之间形成一个缓冲,减少网络问题带来的影响

上面只是一个粗略的消息队列的优势,其优势还有下面这些:

1、任务解耦

不使用消息队列

使用消息队列

2、任务异步处理

异步处理

可能有读者不明白,这所谓的异步处理,是什么个实现原理,这里我要稍微解释一下

原本的订单系统,是要等库存、支付、物流这三个子系统真正的处理完毕了,才会给用户以响应,但是这三个子系统每个耗时都不小,等全部处理完了,用户等的花都谢了

而是用消息队列的话,我们可以不等子系统处理完毕,而是先给用户一个处理结束的返回,让用户以为已经结束了,事实上,库存、支付、物流这三个子系统,还在服务器上自己运行

所以说,即使调用每个子服务的耗时没有变,但是用户的体验升级了

3、削峰填谷

这就是秒杀服务常有的场景了

可以保证大量请求情况下的高可用

避免大量请求

说他是削峰填谷,其实是一个十分形象的说法,削峰,就是削去瞬时超大并发量的,填谷,就是把之前削去的部分,让后面并发量比较小的时段来填,就像下面这张图:

削峰填谷

二、MQ 的劣势

万物有利也有弊, MQ 也不例外

使用 MQ 的劣势如下:

  • 系统可用性降低

如果 mq 挂了,会影响系统的运行

  • 系统复杂度身高
  • 一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理 失败。如何保证消息数据处理的一致性?

三、MQ 产品

市面上 MQ 产品很多,可选的有这么几类

MQ产品

我们以比较火热的 RabbitMQ 为学习重点。

四、MQ 实现方式

MQ 的实现方式有两个 AMQPJMS

1、AMQP

Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用 层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵 循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。

AMQP 的架构如下:

AMQP架构

2、JMS

JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间 件的API

JMS 是 JavaEE 规范中的一种,类比JDBC

很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

3、二者区别

AMQP 和 JMS 的区别,就和 http 和 rpc 差不多

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

五、RabbitMQ

1、RabbitMQ 简介

下面是 RabbitMQ 的架构图:

虚拟机

我们可以对比一下 AMQP 的架构图,发现是差不多的,无非就是多了 Connection 和 vh

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含 了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息
    被保存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ 有如下 6 种模式:

简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由
模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);

https://www.rabbitmq.com/getstarted.html

六种模式

2、RabbitMQ 安装

为了方便,我们使用 docker 进行安装

1、拉去MQ镜像,这里我们要选带 management 版本的,因为后面我们需要使用 MQ 管理控制台

docker pull rabbitmq:3.7.7-management

拉去的镜像

2、创建挂载镜像文件夹,后期我们要做配置的话,都要通过挂载的文件夹进行

创建挂载文件夹

3、启动容器

docker run -d --name rabbitmq3.7.7 \
-p 5672:5672 \
-p 15672:15672 \
-v `pwd`/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123 \
2888deb59dfc

4、使用 ECS 的用户,一定别忘了把 15672 端口添加到安全组中

安全组

5、登录管理界面

输入:http://<你的主机ip>:15672

登录后,就会出现如下界面

RabbitMQ管理界面

3、RabbitMQ 入门案例

我们快速搭建一个生产者,一个消费者,来演示一下 RabbitMQ 的使用

模式的话,我们使用简单模式即可

简单模式

1)生产者

Maven 依赖:

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

生产者:

public class Producer {
    private static final String QUEUE_NAME = "simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("<输入RabbitMQ所在的主机的ip>");
        factory.setPort(5672); // ECS 要开放该端口
        factory.setVirtualHost("/test");
        factory.setUsername("admin");
        factory.setPassword("123");

        // 创建连接
        Connection connection = factory.newConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        // 要发送的信息
        String message = "Hello ,this is faroz";

        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage 
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("已经发送消息:"+message);

        channel.clearConfirmListeners();
        connection.close();
    }
}

在执行结束之后,我们来看看 rabbitmq 的管理控制台:

可以看到,queue 多了一条可以被消费的信息

queue 里的信息

我们点开来看,确实是我们发送的 message

我们发送的信息

2)消费者

我们先将创建连接的操作封装起来

public class ConnectionUtil {

    private static ConnectionFactory factory = new ConnectionFactory();
    static {
        factory.setHost("47.117.129.89");
        factory.setPort(5672); // ECS 要开放该端口
        factory.setVirtualHost("/test");
        factory.setUsername("admin");
        factory.setPassword("123");
    }

    public static Connection getConn() {
        // 创建连接
        try {
            return factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void close(Connection conn, Channel channel) {
        try {
            if (channel!=null) {
                channel.close();
            }
            if (conn!=null) {
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public static void close(Connection conn) {
        close(conn,null);
    }

    public static void close(Channel channel) {
        close(null,channel);
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws IOException {
        Connection conn = ConnectionUtil.getConn();

        Channel channel = conn.createChannel();
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息为:" + new String(body, "utf-8"));
            }
        };



        //监听消息
        /**
        * 参数1:队列名称
        * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复 会删除消息,设置为false则需要手动确认
        * 参数3:消息接收到后回调
        */
        channel.basicConsume(Producer.QUEUE_NAME,true,consumer);

        // 消费者按照道理,不应该关闭连接,要一直持续监听消息队列中的信息
        //ConnectionUtil.close(conn,channel);
    }
}

执行结果如下:

执行结果

我们在多执行几次 producer,可以发现,消费者打印出了多条信息:

消费者打印信息

4、RabbitMQ 工作模式

之前提到,RabbitMQ 中,有六种工作模式

1)简单模式

简单模式

快速入门案例中,使用的就是简单模式,这里不再赘述

2)工作队列模式(working queue)

工作队列

这种模式相比于简单模式,只是多了几个消费者,所以这种模式,适用于任务较重的情况

因为代码都差不多,这里就不再赘述了

这里要注意,这种情况是多个消费者监听一个队列,对队列中的资源获取,是竞争性的

3)订阅模式(publish/subscribe)

订阅模式

相较于前两个模式,这个模式多了一个对象,交换机(exchange)

就是上图的 X

交换机

**交换机(图中的 X)**一方面,接收生产者发送的消息。另一方面,知道如何处理消 息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

没有绑定队列的话,消息丢失

1-发布订阅模式 (Publisher/Subscribe)

发布订阅,其实就是广播,生产者会把消息广播到所有绑定的队列中,所有相关队列的消费者都可以进行消费

发布订阅模式

Publisher:

这里我们使用广播模式发送消息,即所有被绑定的队列,都会获得发布者发布的消息

public class PSPublisher {

    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    public static final String FANOUT_QUEUE1 = "fanout_queue1";

    public static final String FANOUT_QUEUE2 = "fanout_queue2";


    public static void main(String[] args) throws IOException {
        // 创建连接
        Connection conn = ConnectionUtil.getConn();

        // 创建频道
        Channel channel = conn.createChannel();

        // 声明交换机
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        // 申明队列
        channel.queueDeclare(FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(FANOUT_QUEUE2,true,false,false,null);

        // 队列绑定交换机
        channel.queueBind(FANOUT_QUEUE1,FANOUT_EXCHANGE,"");
        channel.queueBind(FANOUT_QUEUE2,FANOUT_EXCHANGE,"");

        for (int i = 0; i < 10; i++) {
            String message = "这是FANOUT模式发送的信息:" + i;
            channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
            System.out.println("发送的消息为:"+message);
        }
        ConnectionUtil.close(conn,channel);
    }
}

执行完成后,我们查看一下控制台:

控制台

Subscriber1:

public class PSConsumer1 {

    public static void main(String[] args) throws IOException {
        Connection conn = ConnectionUtil.getConn();

        Channel channel = conn.createChannel();

        // 声明交换机
        channel.exchangeDeclare(PSPublisher.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        // 绑定队列
        channel.queueDeclare(PSPublisher.FANOUT_QUEUE1,true,false,false,null);

        // 创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由key为:"+envelope.getRoutingKey());
                System.out.println("交换机为:"+envelope.getExchange());
                System.out.println("消息 id 为:"+envelope.getDeliveryTag());
                System.out.println("消息为:"+new String(body,"utf-8"));
            }
        };

        // 监听消息
        channel.basicConsume(PSPublisher.FANOUT_QUEUE1,true,consumer);
    }

}

Subscriber2:

消费者2 和消费者1差不多,只要修改绑定的队列即可

public class PSConsumer2 {
    public static void main(String[] args) throws IOException {
        Connection conn = ConnectionUtil.getConn();

        Channel channel = conn.createChannel();

        // 声明交换机
        channel.exchangeDeclare(PSPublisher.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        // 绑定队列
        channel.queueDeclare(PSPublisher.FANOUT_QUEUE2,true,false,false,null);

        // 创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由key为:"+envelope.getRoutingKey());
                System.out.println("交换机为:"+envelope.getExchange());
                System.out.println("消息 id 为:"+envelope.getDeliveryTag());
                System.out.println("消息为:"+new String(body,"utf-8"));
            }
        };

        // 监听消息
        channel.basicConsume(PSPublisher.FANOUT_QUEUE2,true,consumer);
    }
}

我们两个消费者都执行以下,可以看到,队列中已经没有消息了:

image-20210902142307475

2- 路由模式(routing)
  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列 的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

image-20210902143802934

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

这里为了编写方便,我们将路由简化为只有两个,一个是插入,一个是更新

新的路由模型

生产者:

public class RPublisher {

    public static final String ROUTER_EXCHANGE = "router_exchange";

    public static final String QUEUE_INSERT = "queue_insert";

    public static final String QUEUE_UPDATE = "queue_update";

    public static void main(String[] args) throws IOException {
        Connection conn = ConnectionUtil.getConn();
        Channel channel = conn.createChannel();

        // 声明交换机,这里交换机一定要声明为 BuiltinExchangeType.DIRECT
        channel.exchangeDeclare(ROUTER_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明队列
        channel.queueDeclare(QUEUE_INSERT,true,false,false,null);
        channel.queueDeclare(QUEUE_UPDATE,true,false,false,null);

        // 队列绑定交换机,同时别忘了声明每个队列的路由 key
        channel.queueBind(QUEUE_INSERT,ROUTER_EXCHANGE,"insert");
        channel.queueBind(QUEUE_UPDATE,ROUTER_EXCHANGE,"update");

        // insert 队列中添加消息=
        String msg1 = "新增商品;路由模式为:insert  router-key 为:insert";
        channel.basicPublish(ROUTER_EXCHANGE,"insert",null,msg1.getBytes());


        String msg2 = "更新商品;路由模式为:update  router-key 为:update";
        channel.basicPublish(ROUTER_EXCHANGE,"update",null,msg2.getBytes());
        
        ConnectionUtil.close(conn,channel);
    }
}

队列信息

消费者1:

public class RConsumer1 {
    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection conn = ConnectionUtil.getConn();

        // 创建频道
        Channel channel = conn.createChannel();

        // 申明交换机
        channel.exchangeDeclare(RPublisher.ROUTER_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明队列
        channel.queueDeclare(RPublisher.QUEUE_INSERT,true,false,false,null);

        // 绑定队列
        channel.queueBind(RPublisher.QUEUE_INSERT,RPublisher.ROUTER_EXCHANGE,"insert");

        // 创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重
             传标志(收到消息失败后是否需要重新发送) * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由key为:"+envelope.getRoutingKey());
                System.out.println("交换机为:"+envelope.getExchange());
                System.out.println("消息 id为:"+envelope.getDeliveryTag());
                System.out.println("消费者1收到的消息为:"+new String(body,"utf-8"));
            }
        };

        /**
         * 监听消息
         *
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复 会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(RPublisher.QUEUE_INSERT,true,consumer);
    }
}

消费者2:

消费者2同理,只要将消费者1 中的队列信息进行更换即可

3 - 通配符模式(topic)

通配符模式和路由模式一样,都是可以通过路由 key 指定消息发送的位置

区别在于,通配符模式可以使用通配符,一次匹配多个路由 key

通配符规则:

#:匹配多个单词

*:匹配一个单词

item.# :能够匹配 item.insert.abc 或者 item.insert

item.* :只能匹配 item.insert

通配符匹配图示

示例其实和上面的路由模式差不多,要改的地方在于把之前生产者中声明的交换机,交给消费者声明,然后将路由 key 换为通配符

image-20210910221604704

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-10-20 12:21:41  更:2021-10-20 12:21:49 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 21:32:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码