IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ 消息中间件 -> 正文阅读

[Java知识库]RabbitMQ 消息中间件

一、什么是消息中间件?

1、什么是消息中间件?

在分布式项目中,一个系统 A (消费者),调用另一个系统 B (提供者)去向用户发送一些成功提示消息(下单成功等)。如果我们直接让 A 去调用 B,那么会存在耦合性的问题,系统的性能也会收到局限

在这里插入图片描述

业务场景说明:
像这样的消息中间件(也叫消息队列)在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,为什么会产生消息队列?有几个原因:

  • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间在添加一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;这就是降低耦合度
  • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;这就是对系统进行消峰处理
  • 在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

2、什么是消息队列?

  • 消息队列(MQ Message Queue)就是这样的一款消息中间件的系统,它是一种应用程序对应用程序的通信方法
  • 消息队列就是数据结构中的 “先进先出” 队列,消息进入 MQ 后会排队等待订阅它的系统来处理它

在这里插入图片描述

  • 消息传递:指的是程序之间通过消息发送数据j进行通信(上图第二种),而不是通过彼此之间调用来通信(上图第一种)
  • 队列的主要作用是消除高并发访问高峰(消峰),加快网站的响应速度。

2.1、消息队列的三大作用

解耦服务、异步处理、流量消峰


1. 应用解耦

传统模式
在这里插入图片描述传统模式下,系统间的耦合性太强;


中间件模式
在这里插入图片描述
中间件模式的优点:

  • 将消息写入消息队列,需要消息的系统可以自己从消息中间件中订阅,无需消息发布者做任何修改 (发布订阅者模式)

2. 异步处理
场景说明:用户注册后,需要发注册消息和注册短信给用户,传统的方法有两种:

  1. 串行的方法
  2. 并行的方法

1)串行方式

将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。

这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

在这里插入图片描述
2)并行方式

将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。但是顾客还是多等了无关紧要的 50ms

在这里插入图片描述
3)消息队列

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

在这里插入图片描述
这样可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计);

引入消息队列后处理后,响应时间是串行的3分之1,是并行的2分之1。

总之: 中间件模式将一些非必要的业务,以异步的方式执行,加快响应速度


3. 流量消峰
流量消峰一般在秒杀活动中应用广泛。

场景: 秒杀活动,一般会应为流量过大,导致服务挂掉。为了解决这个问题,一般在应用前端加入消息队列。


传统模式

如订单系统,在下单的时候就会向数据库中写入数据,但是数据库只能支撑 1000/s 左右的并发操作,并发量再高就会容易宕机。在高峰期的时候,并发量可能会突然激增,达到数据库承载极限,这个时候数据库可能就会卡死。

在这里插入图片描述
中间件模式

前端发送过来的数据被 消息中间件保留下来,通过消息队列一点一点(1000/s)的被后端服务订阅处理,这样中间件就会起到缓冲(消峰)的作用了

在这里插入图片描述MQ:帮我们处理了流量洪峰,保护了系统 A

中间模式的优点

  • 系统按照数据库能够处理的并发量(1000/s)慢慢地从消息队列中拉取消息。在生产中,短暂的高峰期积压是被允许的
  • 流量消峰也叫做消峰填谷
  • 使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

2.2、AMQP 和 JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

  • AMQP 是一种高级消息队列协议(Advanced Message Queuing Protocol),更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。
  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

AMQP 和 JMS 的区别

  • JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式
  • JMS 限定了必须使用Java语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。
  • JMS 规定了两种消息模式;而 AMQP 的消息模式更加丰富

2.3、消息队列的产品

市场上常见的消息队列有如下:

  • ActiveMQ:基于JMS
  • ZeroMQ:基于C语言开发
  • Rabbitmq:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品
  • Kafka:类似MQ的产品;分布式消息系统,高吞吐量

在这里插入图片描述

2.4、RabbitMQ

  • RabbitMQ 是由 erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
  • RabbitMQ 官方地址:http://www.rabbitmq.com/
  • RabbitMQ 提供了6种模式:简单模式,work 模式,Publish/Subscribe 发布与订阅模式,Routing 路由模式,Topics 主题模式,RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍);
  • 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

在这里插入图片描述


RabbitMQ 简介

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,它只是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

所以 RabbitMQ 是跨平台的

RabbitMQ 基础架构如下图:
在这里插入图片描述

  • Broker:就相当于一个数据库服务
    接收和分发消息的应用,RabbitMQ Server就是 Message Broker。
  • Virtual host:就相当于是数据库服务中的一个库
    出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等;
  • Connection:相当于 jdbc链接
    publisher/consumer 和 broker 之间的 TCP 连接;
  • Channel:就是对一个链接进行多路复用,省去每次建立链接的开销
    如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP - Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了- channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销;
  • Exchange:交换机或者说是路由,负责按照指定模式分发消息到多个消息队列中
    message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:点对点 direct (point-to-point) ,发布订阅 topic (publish-subscribe) ,广播 fanout (multicast)
  • Queue:存储消息的容器,消息最终被送到这里,等待 consumer 取走
  • Binding:通过 key 将一个或一类消息绑定到某一个消息 Queue 中
    exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

二、RabbitMQ 的安装

1、安装

由于 RabbitMQ 是 erlang 语言开发的,所以我们要首先安装 erlang 语言的开发环境,此外 RabbitMQ 还依赖了 socat,所以要下载 socat
安装工具提取码:6rzc

RabbitMQ 版本对应的 erlang 版本:
https://www.rabbitmq.com/which-erlang.html

  1. 顺序安装
    rpm -ivh erlang-21.3.8.9-1.el7.x86_64.rpm
    rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
    rpm -ivh rabbitmq-server-3.8.1-1.el7.noarch.rpm

  2. 启动管理插件(/usr/lib/rabbitmq/bin/)
    rabbitmq-plugins enable rabbitmq_management

  3. 启动 RabbitMQ
    systemctl start rabbitmq-server.service
    systemctl status rabbitmq-server.service
    systemctl restart rabbitmq-server.service
    systemctl stop rabbitmq-server.service

  4. 测试
    在虚拟机浏览器中输入:localhost:15672/
    默认账户:guest,密码:guest
    注:guest 账户不支持远程访问

  5. 添加自定义账户(支持远程访问)
    添加管理员账号密码:rabbitmqctl add_user admin admin
    分配账号角色:rabbitmqctl set_user_tags admin administrator
    修改密码:rabbitmqctl change_password admin 123456
    查看用户列表:rabbitmqctl list_users

在这里插入图片描述

管理界面标签页介绍

  • overview:概览
  • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
  • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
  • Exchanges:交换机,用来实现消息的路由
  • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

端口

  • 5672:rabbitMq 的编程语言客户端连接端口
  • 15672:rabbitMq 管理界面端口
  • 25672:rabbitMq 集群的端口

2、卸载

  • rpm -qa | grep rabbitmq
  • rpm -e rabbitmq-server

3、管理界面

如果不使用 guest,我们也可以自己创建一个用户
在这里插入图片描述

3.1、添加用户及用户类别


添加用户

添加管理员账号密码:rabbitmqctl add_user admin admin
分配账号角色:rabbitmqctl set_user_tags admin administrator
修改密码:rabbitmqctl change_password admin 123456
查看用户列表:rabbitmqctl list_users


用户类别

  1. 超级管理员(administrator)
    可登录管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  2. 监控者(monitoring)
    可登录管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  3. 策略制定者(policymaker)
    可登录管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。
  4. 普通管理者(management)
    仅可登录管理控制台,无法看到节点信息,也无法对策略进行管理。
  5. 其他
    无法登录管理控制台,通常就是普通的生产者和消费者。

3.2、创建 Virtual Hosts

虚拟主机:类似于 mysql 中的 database。他们都是以 “/” 开头
在这里插入图片描述

设置虚拟主机的权限
在这里插入图片描述
在这里插入图片描述

三、RabbitMQ 入门案例

官方文档:https://www.rabbitmq.com/getstarted.html

1、简单模式

需求:使用简单模式完成消息传递
在这里插入图片描述

步骤:

  1. 创建工程(生成者、消费者)
  2. 分别添加依赖
  3. 编写生产者发送消息
  4. 编写消费者接收消息

创建生产者

public class Producer {

    public static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.17.132");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 2.创建链接
        Connection connection = connectionFactory.newConnection();

        // 3.创建信道
        Channel channel = connection.createChannel();

        /**
         * @param queue 队列名称
         * @param durable 在 RabbitMQ 服务器重启之后,队列是否能存活(是否支持持久化)
         * @param exclusive 是否独占本次链接(只能本消费者可以监听这个队列,)
         * @param autoDelete 当没有消费者使用这个队列的时候,是否自动删除这个队列
         * @param arguments 队列的一些其他参数(如:超时时间,过期时间,长度等等)
         */
        // 4.创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 5.发送消息
        String message = "Hello, Rabbit2!!!";

        /**
         * @param exchange 交换机的名称,没有设置为空
         * @param routingKey 路由 key,简单模式可以设置为队列名称
         * @param props 消息的属性。例如:消息的长度、类型等
         * @param body 需要传递的消息
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));


        // 6. 关闭资源
        channel.close();
        connection.close();
    }
}

创建消费者

public class Consumer {

    public static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.17.132");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");


        // 2.创建链接
        Connection connection = connectionFactory.newConnection();

        // 3.创建信道
        Channel channel = connection.createChannel();

        // 4.创建消息消费者,接受并确认消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("envelope = " + envelope);
                System.out.println("properties = " + properties);
                System.out.println("message = " + new String(body, "UTF-8"));
            }
        };

        /**
         * @param queue 声明队列的名称
         * @param autoAck 是否自动确认消息(确认之后就会删除消息,一个消息只能确认一次)
         * @param callback 消费者
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 5.关闭资源
        //channel.close();
        //connection.close();
    }
}

消费者
在这里插入图片描述

2、Work 模式

在这里插入图片描述

先启动消费者,在启动生产者

生产者

public class Producer {
    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 30; i++) {
            String message = "Hello, Rabbit" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        }

        channel.close();
        connection.close();
    }
}

消费者

多个消费者同时消费一个队列,默认是平均分配,每个消费者拿到的消息数量是相等的

如果想实现性能高的消费者处理的消息更多,性能差的消费者处理的消息较少,则需要:

  1. 指定Qos为 1;
  2. 手动确认消息

不确认消息会怎样?

  1. 如果消息没有被确认,一旦消费者退出,停止消费,未被确认的消息不会被删除,原位返回队列;

什么时候该确认消息?

  1. 不重要的消息自动确认即可,重要的消息手动确认
  2. 假如:当我们下订单的时候,订单成功(数据库写入)就确认消息;如果数据库写入失败(报异常)就不确认消息
public class Consumer1 {
    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        // 设置 Qos
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("message = " + new String(body, "UTF-8"));

                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                /**
                 * 手动确认消息
                 * @param deliveryTag 消息递送的标签
                 * @param multiple 是否一次确认多条消息
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 手动确认消息
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
public class Consumer2 {

    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("message = " + new String(body, "UTF-8"));

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

3、发布与订阅模式

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-25 07:59:26  更:2021-11-25 07:59:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 3:48:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码