IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ 笔记 -> 正文阅读

[大数据]RabbitMQ 笔记

课程介绍

课程目的

  1. 了解消息中间件背景知识、使用场景、发展等
  2. 掌握 RabbitMQ、RocketMQ、Kafka 这三款主流的消息中间件的架构、模型和使用(开发、 安装、集群部署、运维、监控等)
  3. 掌握消息的可靠性、幂等性、顺序消息、延迟消息、事务消息等进阶的知识,以及大规模生产 环境中的使用经验,轻松应对各种复杂的业务场景
  4. 掌握顶级开源消息中间件核心源码,理解其背后的架构设计思想以及在高性能存储系统、网络 编程等方面的技巧(会涉及网络通信、操作系统等底层知识)
  5. 理解主流消息中间件的优缺点,具备技术选型能力
  6. 让你无论是在日后的工作还是面试求职中遇到消息中间件相关问题都能轻松应对

课程主要内容

消息中间件概述:

分布式系统中如何进行远程通信

为什么要使用消息中间件?市场上有哪些产品?有什么优缺点?该用哪个

JMS 规范和 AMQP 协议

RabbitMQ 部分:

RabbitMQ 架构、环境准备和整合

高级特性如消息的可靠性保障、死信队列、延迟队列等

RabbitMQ 的集群、运维

源码分析,解析 RabbitMQ 的启动过程、交换器的实现、队列的实现等

第一部分:消息中间件概述

第 1 节 分布式架构通信

1.1 分布式架构通信原理

SOA 架构:

image.png2

根据实际业务,把系统拆分成合适的、独立部署的模块,模块之间相互独立。 优点:分布式、松耦合、扩展灵活、可重用。

SOA 架构系统中,使用 Dubbo 和 Zookeeper 进行服务间的远程通信。

优点: Dubbo 使用自定义的 TCP 协议,可以让请求报文体积更小,或者使用 HTTP2 协议,也可以减少报文 的体积,提高传输效率。

微服务架构:

image.png

SpringCloud 中使用 Feign 解决服务之间远程通信的问题。

Feign:轻量级 RESTful 的 HTTP 服务客户端,广泛应用于 Spring Cloud 中。

符合面向接口化的编程 习惯。

本质:封装了 HTTP 调用流程,类似 Dubbo 的服务调用。

多用于同步远程调用。

RPC 主要基于 TCP/UDP 协议,HTTP 协议是应用层协议,是构建在传输层协议 TCP 之上的,RPC 效率 更高

RPC 长连接:不必每次通信都像 HTTP 一样三次握手,减少网络开销;

HTTP 服务开发迭代更快:在接口不多,系统与系统之间交互比较少的情况下,HTTP 就显得更加方 便;相反,在接口比较多,系统与系统之间交互比较多的情况下,HTTP 就没有 RPC 有优势。

1.2 分布式同步通信的问题

电商项目中,如果后台添加商品信息,该信息放到数据库。 我们同时,需要更新搜索引擎的倒排索引 同时,假如有商品页面的静态化处理,也需要更新该页面信息

image.png

方式一、可以在后台添加商品的方法中,如果数据插入数据库成功,就调用更新倒排索引的方法, 接着调用更新静态化页面的方法。

代码应该是:

Long goodsId = addGoods(goods); if (goodsId != null) {

refreshInvertedIndex(goods);

refreshStaticPage(goods); }

问题: 假如更新倒排索引失败,该怎么办? 假如更新静态页面失败怎么办?

解决方式:

如果更新倒排索引失败,重试

如果更新静态页面失败,重试

代码应该是这样:

public Long saveGoods() {

Long goodsId = addGoods(goods); 
if (goodsId != null) { 
// 调用递归的方法,实现重试 
boolean indexFlag = refreshInvertedIndex(goods);
 // 调用递归的方法,实现重试 
boolean pageFlag = refreshStaticPage(goods); }

}

private boolean refreshInvertedIndex(Goods goods) {

// 调用服务的方法 
boolean flag = indexService.refreshIndex(goods);
 if (!flag) { refreshInvertedIndex(goods); }

}

private boolean refreshStaticPage(Goods goods) {

// 调用服务的方法
 boolean flag = staticPageService.refreshStaticPage(goods); 
if (!flag) { refreshStaticPage(goods); }

}




以上代码在执行中的问题:

  1. 如果相应的更新一直失败,岂不是一直死循环直到调用栈崩溃?
  2. 如果相应的更新一直在重试,在重试期间,添加商品的方法调用是不是一直阻塞中?
  3. 如果添加商品的时候并发量很大,效率会很低?

或许可以加上迭代的等待时间,迭代的次数加以限制,减少 CPU 消耗。

或许还可以加上多线程,同时执行更新的操作,减少执行的时间。

但是都是基于该调用一定在可见的时间内调用成功。

还是老问题:如果更新失败怎么办?

归根到底,是同步调用处理不当。这个问题在分布式架构中尤为严重。

方式二:可以先执行添加商品的方法,商品添加成功,将更新索引和更新静态页面的任务缓存到一 个公共的位置,然后由相应的服务从该位置获取任务来执行。

Long goodsId = addGoods(goods); 
if (goodsId != null)
 { goodsTaskService.cache(goods); }

此时,由于添加商品仅仅是将数据插入数据库,然后将任务信息缓存,调用立刻返回。

对于添加商品方法的调用,不会存在线程阻塞,不会存在调用栈崩溃。

再考虑远一点。

由于更新倒排索引的的服务和更新静态页面的服务要从公共的缓存或者叫任务池中取出任务并执 行,它们也会有执行失败的问题,也需要重试。如果一直更新失败,也需要一个方式来处理。 比如如果更新失败,则每隔 3 秒钟重试一次,重试三次都失败则放弃执行。 然后将错误结果放到另一个公共的地方,等待后续的补偿,无论是手工还是自动的。

还有问题:

  1. 这个公共的任务池,会不会宕机?会不会服务不可用?如何解决?
  2. 你一定确信消息发送到任务池了吗?
  3. 如果在向任务池发送任务失败该如何处理?
  4. 如果重试的时候发送成功了,但是实际上发送了多次,更新倒排索引服务和更新静态页面服务 会不会重复执行?
  5. 如果重复执行,最终结果会不会不一样?

看来真是解决了一个问题,引进来三个问题。 如果上述的问题都由我们从 0 开始解决,开发难度可想而知。

分布式服务中,由于业务拆分,应用也需要拆分,甚至数据库分库分表。 但是完成一个业务处理,往往要设计到多个模块之间的协调处理。此时模块之间,服务与服务之间 以及客户端与服务端之间的通信将变得非常复杂。

1.3 分布式异步通信模式

image.png

比较典型的“生产者消费者模式”,可以跨平台、支持异构系统,通常借助消息中间件来完成。

优点:系统间解耦,并具有一定的可恢复性,支持异构系统,下游通常可并发执行,系统具备弹 性。

服务解耦、流量削峰填谷等

缺点:消息中间件存在一些瓶颈和一致性问题,对于开发来讲不直观且不易调试,有额外成本。

使用异步消息模式需要注意的问题:

  1. 哪些业务需要同步处理,哪些业务可以异步处理?
  2. 如何保证消息的安全?消息是否会丢失,是否会重复?
  3. 请求的延迟如何能够减少?
  4. 消息接收的顺序是否会影响到业务流程的正常执行?
  5. 消息处理失败后是否需要重发?如果重发如何保证幂等性?

第 2 节 消息中间件简介

2.1 消息中间件概念

维基百科对消息中间件的解释:面向消息的系统(消息中间件)是在分布式系统中完成消息的发送 和接收的基础软件。

消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基 于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进 程的通信。

消息中间件就是在通信的上下游之间截断:break it,Broker

然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统。

体会一下:“必有歹人从中作梗”,”定有贵人从中相助“

异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中

间件。

image.png

2.2 自定义消息中间件

并发编程领域经典面试题:请使用 java 代码来实现“生产者消费者模式”。

BlockingQueue(阻塞队列)是 java 中常见的容器,在多线程编程中被广泛使用。

当队列容器已满时生产者线程被阻塞,直到队列未满后才可以继续 put;

当队列容器为空时,消费者线程被阻塞,直至队列非空时才可以继续 take。

image.png

provider

package com.galaxy;

import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

/**
 * @author lane
 * @date 2021年08月15日 下午3:00
 */
public class Provider implements Runnable {

    private BlockingQueue<Mask> queue;

    public Provider(BlockingQueue<Mask> queue){
        this.queue = queue;
    }

    private int index;

    @Override
    public void run() {
        while (true){

            try {
                Thread.sleep(500);
                if (queue.remainingCapacity()<=0){

                    System.out.println("口罩生产仓库已满!");
                }
                else {
                    Mask mask = new Mask();
                    mask.setId(index++);
                    mask.setType("N95");
                    System.out.println("正在生产口罩id: " + (index - 1));
                    queue.put(mask);
                    System.out.println("仓库口罩个数" + queue.size());
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

consumer

package com.galaxy;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

/**
 * @author lane
 * @date 2021年08月15日 下午3:00
 */
public class Consumer implements Runnable {

    private BlockingQueue<Mask> queue;

    public Consumer(BlockingQueue<Mask> queue){
        this.queue = queue;
    }


    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(200);
                Mask mask = queue.take();
                System.out.println("正在出售口罩的ID"+mask.getId()+"口罩的类型"+mask.getType());


            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

mask

package com.galaxy;

/**
 * @author lane
 * @date 2021年08月15日 下午3:05
 */
public class Mask {

    private Integer id;
    private String type;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return "Mask{" +
                "id=" + id +
                ", type='" + type + '\'' +
                '}';
    }
}

sale

package com.galaxy;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

/**
 * @author lane
 * @date 2021年08月15日 下午4:00
 */
public class Sale {
    public static void main(String[] args) {
        BlockingQueue<Mask> queue = new ArrayBlockingQueue<>(20);

        new Thread(new Provider(queue)).start();

        new Thread(new Consumer(queue)).start();

    }


}

result

正在生产口罩id: 0
仓库口罩个数1
正在出售口罩的ID0口罩的类型N95
正在生产口罩id: 1
仓库口罩个数1
正在出售口罩的ID1口罩的类型N95
正在生产口罩id: 2
仓库口罩个数1
正在出售口罩的ID2口罩的类型N95

上述代码放到生产环境显然是不行的

比如:没有集群,没有分布式,玩儿法太单一,不能满足企业 级应用的要求。。。

比如:消息有没有持久化? 怎么确定消息一定能发送成功? 怎么确定消息一定能被消费成功? 高并发下的性能怎么样? 系统可靠吗? 有没有 Pub/Sub 模式? 有没有考虑过限流?

2.3 主流消息中间件及选型

在传统金融机构、银行、政府机构等有一些老系统还在使用 IBM 等厂商提供的商用 MQ 产品。

当前业界比较流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ、Kafka、 ZeroMQ 等,其中应用最为广泛的要数 RabbitMQ、RocketMQ、Kafka 这三款。

Redis 在某种程度上也可以是实现类似“Queue”和“Pub/Sub”的机制,严格意义上不算消息中间件。

image.png

image.png

image.png

选取原则

首先,产品应该是开源的。开源意味着如果队列使用中遇到 bug,可以很快修改,而不用等待开发 者的更新。

其次,产品必须是近几年比较流行的,要有一个活跃的社区。这样遇到问题很快就可以找到解决方 法。同时流行也意味着 bug 较少。流行的产品一般跟周边系统兼容性比较好。

最后,作为消息队列,要具备以下几个特性:

1、消息传输的可靠性:保证消息不会丢失。

2、支持集群,包括横向扩展,单点故障都可以解决。

3、性能要好,要能够满足业务的性能需求。

RabbitMQ

RabbitMQ 开始是用在电信业务的可靠通信的,也是少有的几款支持 AMQP 协议的产品之一。

优点:

  1. 轻量级,快速,部署使用方便
  2. 支持灵活的路由配置。RabbitMQ 中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。
  3. RabbitMQ 的客户端支持大多数的编程语言。

缺点

  1. 如果有大量消息堆积在队列中,性能会急剧下降
  2. RabbitMQ 的性能在 Kafka 和 RocketMQ 中是最差的,每秒处理几万到几十万的消息。如果应 用要求高的性能,不要选择 RabbitMQ。
  3. RabbitMQ 是 Erlang 开发的,功能扩展和二次开发代价很高。

RocketMQ

RocketMQ 是一个开源的消息队列,使用 java 实现。借鉴了 Kafka 的设计并做了很多改进。

RocketMQ 主要用于有序,事务,流计算,消息推送,日志流处理,binlog 分发等场景。

经过了历次的 双 11 考验,性能,稳定性可可靠性没的说。

RocketMQ 几乎具备了消息队列应该具备的所有特性和功能。 java 开发,阅读源代码、扩展、二次开发很方便。

对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时 间,可以使用 RocketMQ。

性能比 RabbitMQ 高一个数量级,每秒处理几十万的消息。

缺点: 跟周边系统的整合和兼容不是很好。

Kafka

Kafka 的可靠性,稳定性和功能特性基本满足大多数的应用场景。 跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持 Kafka。

Kafka 高效,可伸缩,消息持久化。支持分区、副本和容错。

Kafka 是 Scala 和 Java 开发的,对批处理和异步处理做了大量的设计,因此 Kafka 可以得到非常高的 性能。它的异步消息的发送和接收是三个中最好的,但是跟 RocketMQ 拉不开数量级,每秒处理几十万 的消息。

如果是异步消息,并且开启了压缩,Kafka 最终可以达到每秒处理 2000w 消息的级别。

但是由于是异步的和批处理的,延迟也会高,不适合电商场景。

特点RabbitMQRocketMQKafka
单机吞吐量1w 量级10w 量级10w 量级
开发语言ErlangJavaJava 和 Scala
消息延迟微秒毫秒毫秒
消息丢失可能性很低参数优化后可以 0 丢失参数优化后可以 0 丢失
消费模式推拉推拉拉取
主题数量对吞吐量 的影响\几百上千个主题会对吞吐量有 一个小的影响几十上百个主题会极大 影响吞吐量
可用性高(主从)很高(主从)很高(分布式)

如果对于吞吐量要求比较高 10 万 +

RocketMQ、Kafka

如果对于消息延时要求比较高

RabbitMQ、RocketMQ

如果对于开发语言要求自己扩展

RocketMQ、Kafka

如果消息丢失要求高

RocketMQ、Kafka

如果对于整合兼容要求比较高

RabbitMQ、Kafka

如果偏向于电商

RocketMQ

如果偏向于 Spring Cloud 系列整合

RabbitMQ

如果偏向与大数据

Kafka

2.4 消息中间件应用场景

消息中间件的使用场景非常广泛,比如,12306 购票的排队锁座,电商秒杀,大数据实时计算等。

电商秒杀案例:

比如 6.18,活动从 0:00 开始,仅限前 200 名,秒杀即将开始时,用户会疯狂刷新 APP 或者浏览器来 保证自己能够尽早的看到商品。

当秒杀开始前,用户在不断的刷新页面,系统应该如何应对高并发的读请求呢?

在秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,系统应该如何应对高并发 的写请求呢?

系统应该如何应对高并发的读请求

使用缓存策略将请求挡在上层中的缓存中

能静态化的数据尽量做到静态化

加入限流(比如对短时间之内来自某一个用户,某一个 IP、某个设备的重复请求做丢弃处理)

系统应该如何应对高并发的写请求

生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在 1s 内,有 1 万个数据连接同 时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用 消息队列。

消息队列的作用:

削去秒杀场景下的峰值写流量——流量削峰

通过异步处理简化秒杀请求中的业务流程——异步处理

解耦,实现秒杀系统模块之间松耦合——解耦

削去秒杀场景下的峰值写流量

将秒杀请求暂存于消息队列

业务服务器响应用户“秒杀结果正在处理中。。。”,释放系统资源去 处理其它用户的请求。

削峰填谷

削平短暂的流量高峰,消息堆积会造成请求延迟处理,但秒杀用户对于短暂延迟有一定 容忍度。

秒杀商品有 1000 件,处理一次购买请求的时间是 500ms,那么总共就需要 500s 的时间。这时你 部署 10 个队列处理程序,那么秒杀请求的处理时间就是 50s,也就是说用户需要等待 50s 才可以看到 秒杀的结果,这是可以接受的。这时会并发 10 个请求到达数据库,并不会对数据库造成很大的压力。

通过异步处理简化秒杀请求中的业务流程

先处理主要的业务,异步处理次要的业务。 如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积 分。 此时秒杀只要处理生成订单,扣减库存的耗时,发放优惠券、增加用户积分异步去处理了。

解耦,实现秒杀系统模块之间松耦合

将秒杀数据同步给数据团队,有两种思路:

  1. 使用 HTTP 或者 RPC 同步调用,即提供一个接口,实时将数据推送给数据服务。 系统的耦合度高,如果其中一个服务有问题,可能会导致另一个服务不可用。
  2. 使用消息队列 将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理。

拉勾 B 端 C 端数据同步案例:

拉勾网站分 B 端和 C 端,B 端面向企业用户,C 端面向求职者。 这两个模块业务处理逻辑不同,数据库表结构不同,实际上是处于解耦的状态。

但是各自又需要对方的数据,需要共享:如

  1. 当 C 端求职者在更新简历之后,B 端企业用户如何尽早看到该简历更新?
  2. 当 B 端企业用户发布新的职位需求后,C 端用户如何尽早看到该职位信息?

无论是 B 端还是 C 端,都有各自的搜索引擎和缓存,B 端需要获取 C 端的更新以更新搜索引擎和缓 存;C 端需要获取 B 端的更新以更新 C 端的搜索引擎与缓存。 如何解决 B 端 C 端数据共享的问题?

解决方式:

  1. 同步方式:B 端和 C 端通过 RPC 或 WebService 的方式发布服务,让对方来调用,以获取对方的 信息。求职者每更新一次简历,就调用一次 B 端的服务,进行数据的同步;B 端企业用户每更 新职位需求,就调用 C 端的服务,进行数据的同步。
  2. 异步方式:使用消息队列,B 端将更新的数据发布到消息队列,C 端将更新的数据发布到消息 队列,B 端订阅 C 端的消息队列,C 端订阅 B 端的消息队列。

使用同步方式,B 端和 C 端耦合比较紧密,如果其中一个服务有问题,可能会导致另一个服务不可 用。比如 C 端的 RPC 挂掉,企业用户有可能无法发布新的职位信息,因为发布了对方也看不到;B 端的 RPC 挂掉,求职者可能无法更新简历,因为即使简历更新了,对方也看不到。

可以让 B 端或 C 端在对方 RPC 挂掉的时候,先将该通知消息缓存起来,等对方服务恢复 之后再进行同步。

这正是引入异步方式,使用消息队列的目的。

使用消息队列的异步方式,对 B 端 C 端进行解耦,只要消息队列可用,双方都可以将需要同步的信息 发送到消息队列,对方在收到消息队列推送来的消息的时候,各自更新自己的搜索引擎,更新自己的缓 存数据。

支付宝购买电影票

image.png

如上图,用户在支付宝购买了一张电影票后很快就收到消息推送和短信(电影院地址、几号厅、座 位号、场次时间等),同时用户会积累一定的会员积分。

这里,交易系统并不需要一直等待消息送达等动作都完成后才返回成功,允许一定延迟和瞬时不一 致(最终一致性),而且后面两个动作通常可以并发执行。

如果后期监控大盘想要获取实时交易数据,只需要新增个消费者程序并订阅该消息即可,交易系统 对此并不感知,松耦合。

第 3 节 JMS 规范和 AMQP 协议

3.1 JMS 经典模式详解

JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间 件(MOM,Message oriented Middleware)的 API,用于在两个应用程序之间,或分布式系统中发送 消息,进行异步通信。与具体平台无关的 API,绝大多数 MOM 提供商都支持。 它类似于 JDBC(Java Database Connectivity)。

3.1.1 JMS 消息

消息是 JMS 中的一种类型对象,由两部分组成:报文头和消息主体。

报文头包括消息头字段和消息头属性。

字段是 JMS 协议规定的字段,属性可以由用户按需添加。 JMS 报文头全部字段:

image.png

消息主体则携带着应用程序的数据或有效负载。

根据有效负载的类型来划分,可以将消息分为几种类型:

  1. 简单文本(TextMessage)
  2. 可序列化的对象(ObjectMessage)
  3. 属性集合(MapMessage)
  4. 字节流(BytesMessage)
  5. 原始值流(StreamMessage)
  6. 无有效负载的消息(Message)。
3.1.2 体系架构

JMS 由以下元素组成:

  1. JMS 供应商产品

JMS 接口的一个实现。该产品可以是 Java 的 JMS 实现,也可以是非 Java 的面向消息中间件的适 配器。

  1. JMS Client

生产或消费基于消息的 Java 的应用程序或对象。

  1. JMS Producer

创建并发送消息的 JMS 客户。

  1. JMS Consumer

接收消息的 JMS 客户。

  1. JMS Message

包括可以在 JMS 客户之间传递的数据的对象

  1. JMS Queue

缓存消息的容器。消息的接受顺序并不一定要与消息的发送顺序相同。消息被消费后将从队列 中移除。

  1. JMS Topic

Pub/Sub 模式。

3.1.3 对象模型
  1. ConnectionFactory 接口(连接工厂)

用户用来创建到 JMS 提供者的连接的被管对象。JMS 客户通过可移植的接口访问连接,这样当 下层的实现改变时,代码不需要进行修改。管理员在 JNDI 名字空间中配置连接工厂,这样, JMS 客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接 工厂。

  1. Connection 接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与 JMS 提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题 到目标。

  1. Destination 接口(目标)

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是 队列,或者是主题。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一 样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

  1. Session 接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的, 就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事 务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用 户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者 来接收消息。

  1. MessageConsumer 接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻 塞)接收队列和主题类型的消息。

  1. MessageProducer 接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个 通用的发送者,在发送消息时指定目标。

  1. Message 接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个 消息有三个主要部分:

消息头(必须):包含用于识别和为消息寻找路由的操作设置。

一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创 建定制的字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节 消息,流消息和对象消息)。

image.png

3.1.4 模式

Java 消息服务应用程序结构支持两种模式:

  1. 点对点也叫队列模式
  2. 发布/订阅模式

在点对点或队列模型

一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费 者的队列,并直接将消息发送到消费者的队列,概括为:

一条消息只有一个消费者获得 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行 状态。 每一个成功处理的消息要么自动确认,要么由接收者手动确认。

image.png

发布/订阅模式

支持向一个特定的主题发布消息。 0 或多个订阅者可能对接收特定消息主题的消息感兴趣。 发布者和订阅者彼此不知道对方。 多个消费者可以获得消息

在发布者和订阅者之间存在时间依赖性。

发布者需要建立一个主题,以便客户能够订阅。 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。

image.png

3.1.5 传递方式

JMS 有两种传递消息的方式。

标记为 NON_PERSISTENT 的消息最多投递一次,而标记为 PERSISTENT 的消息将使用暂存后再转送 的机理投递。

如果一个 JMS 服务下线,持久性消息不会丢失,等该服务恢复时再传递。默认的消息传递方式是非 持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。

3.1.6 供应商

开源软件:

  1. Apache ActiveMQ
  2. RabbitMQ
  3. RocketMQ
  4. JBoss 社区所研发的 HornetQ
  5. Joram
  6. Coridan 的 MantaRay
  7. The OpenJMS Group 的 OpenJMS

专有的供应商包括:

  1. BEA 的 BEA WebLogic Server JMS
  2. TIBCO Software 的 EMS
  3. GigaSpaces Technologies 的 GigaSpaces
  4. Softwired 2006 的 iBus
  5. IONA Technologies 的 IONA JMS
  6. SeeBeyond 的 IQManager(2005 年 8 月被 Sun Microsystems 并购)
  7. webMethods 的 JMS±
  8. my-channels 的 Nirvana
  9. Sonic Software 的 SonicMQ
  10. SwiftMQ 的 SwiftMQ
  11. IBM 的 WebSphere MQ

3.2 JMS 在应用集群中的问题

生产中应用基本上都是以集群部署的。在 Queue 模式下,消息的消费没有什么问题,因为不同节点 的相同应用会抢占式地消费消息,这样还能分摊负载。

如果使用 Topic 广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操 作,这样就重复消费了。。。

image.png

image.png

方案一:选择 Queue 模式,创建多个一样的 Queue,每个应用消费自己的 Queue。

弊端:浪费空间,生产者还需要关注下游到底有几个消费者,违反了“解耦”的初衷。

方案二:选择 Topic 模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争。

弊端:对业务侵入较大,不是优雅的解决方法。

ActiveMQ 通过“虚拟主题”解决了这个问题。 生产中似乎需要结合这两种模式:即不同节点的相同应用间存在竞争,会部分消费(P2P),而不 同的应用都需要消费到全量的消息(Topic)模式。这样就可以避免重复消费。

JMS 规范文档 3.3 AMQP 协议剖析(jms-1_1-fr-spec.pdf)下载地址:

https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/

JMS 是 JEE 平台的标准消息传递 API。它可以在商业和开源实现中使用。每个实现都包括一个 JMS 服 务器,一个 JMS 客户端库,以及用于管理消息传递系统的其他特定于实现的组件。 JMS 提供程序可以是 消息传递服务的独立实现,也可以是非 JMS 消息传递系统的桥梁。

JMS 客户端 API 是标准化的,因此 JMS 应用程序可在供应商的实现之间移植。但是:

  1. 底层消息传递实现未指定,因此 JMS 实现之间没有互操作性。除非存在桥接技术,否则想要共 享消息传递的 Java 应用程序必须全部使用相同的 JMS 实现。
  2. 如果没有供应商特定的 JMS 客户端库来启用互操作性,则非 Java 应用程序将无法访问 JMS。
  3. AMQP 0-9-1 是一种消息传递协议,而不是像 JMS 这样的 API。任何实现该协议的客户端都可以 访问支持 AMQP 0-9-1 的代理。
  4. 协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的 AMQP 0-9-1 客户端都 可以参与消息传递系统,而无需桥接不兼容的服务器实现。

3.3 AMQP 协议剖析

3.3.1 协议架构

AMQP 全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于 JMS,兼容 JMS 协议。目前 RabbitMQ 主流支持 AMQP 0-9-1,3.8.4 版本支持 AMQP 1.0。

image.png

image.png

3.3.2 AMQP 中的概念

Publisher:消息发送者,将消息发送到 Exchange 并指定 RoutingKey,以便 queue 可以接收到指 定的消息。

Consumer:消息消费者,从 queue 获取消息,一个 Consumer 可以订阅多个 queue 以从多个 queue 中接收消息。

Server:一个具体的 MQ 服务实例,也称为 Broker。

Virtual host:虚拟主机,一个 Server 下可以有多个虚拟主机,用于隔离不同项目,一个 Virtual host 通常包含多个 Exchange、Message Queue。

Exchange:交换器,接收 Producer 发送来的消息,把消息转发到对应的 Message Queue 中。

Routing key:路由键,用于指定消息路由规则(Exchange 将消息路由到具体的 queue 中),通 常需要和具体的 Exchange 类型、Binding 的 Routing key 结合起来使用。

Bindings:指定了 Exchange 和 Queue 之间的绑定关系。Exchange 根据消息的 Routing key 和 Binding 配置(绑定关系、Binding、Routing key 等)来决定把消息分派到哪些具体的 queue 中。这依 赖于 Exchange 类型。

Message Queue:实际存储消息的容器,并把消息传递给最终的 Consumer。

3.3.3.AMQP 传输层架构

简要概述

AMQP 是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信 息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。

我们假定有一个可靠的面向流的网络传输层(TCP/IP 或等价的协议)。

在一个单一的 socket 连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用 通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序 列传输。

我们使用小的数据类型来构造数据帧,如 bit,integer,string 以及字段表。数据帧的字段做了轻微 的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。

线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是 AMQP)。我们假定 AMQP 会扩展,改进以及随时间的其他变化,并要求 wire-level 格式支持这些变化。

数据类型

AMQP 使用的数据类型如下:

Integers(数值范围 1-8 的十进制数字):用于表示大小,数量,限制等,整数类型无符号 的,可以在帧内不对齐。 Bits(统一为 8 个字节):用于表示开/关值。

Short strings:用于保存简短的文本属性,字符串个数限制为 255,8 个字节

Long strings:用于保存二进制数据块。

Field tables:包含键值对,字段值一般为字符串,整数等。

协议协商

AMQP 客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选 项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一 个很有用的技术手段,因为它可以让我们断言假设和前置条件。

在 AMQP 中,我们需要协商协议的一些特殊方面:

1、 真实的协议和版本。服务器可能在同一个端口支持多个协议。

2、 双方的加密参数和认证方式。这是功能层的一部分。

3、 数据帧最大大小,通道数量以及其他操作限制。

对限制条件的认同可能会导致双方重新分配 key 的缓存,避免死锁。每个发来的数据帧要么遵守认 同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了“要么一 切工作正常,要么完全不工作”的 RabbitMQ 哲学。

协商双方认同限制到一个小的值,如下:

  1. 服务端必须告诉客户端它加上了什么限制。
  2. 客户端响应服务器,或许会要求对客户端的连接降低限制。

数据帧界定

TCP/IP 是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:

  1. 每个连接发送单一数据帧。简单但是慢。
  2. 在流中添加帧的边界。简单,但是解析很慢。
  3. 计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP 的选择。
3.3.4 AMQP 客户端实现 JMS 客户端

RabbitMQ 的 JMS 客户端用 Java 实现,既与 JMS API 兼容,也与 AMQP 0-9-1 协议兼 容。

局限性

RabbitMQ JMS 客户端不支持某些 JMS 1.1 功能:

JMS 客户端不支持服务器会话。

XA 事务支持接口未实现。

RabbitMQ JMS 主题选择器插件支持主题选择器。

队列选择器尚未实现。

支持 RabbitMQ 连接的 SSL 和套接字选项,但仅使用 RabbitMQ 客户端提供的(默认)SSL 连接 协议。

RabbitMQ 不支持 JMS NoLocal 订阅功能,该功能禁止消费者接收通过消费者自己的连接发布 的消息。可以调用包含 NoLocal 参数的方法,但该方法将被忽略。

RabbitMQ 使用 amqp 协议,JMS 规范仅对于 Java 的使用作出的规定,跟其他语言无关,协议是语言 无关的,只要语言实现了该协议,就可以做客户端。如此,则不同语言之间互操作性得以保证。

AMQP 协议文档下载地址:

https://www.amqp.org/sites/amqp.org/files/amqp0-9-1.zip

第二部分:RabbitMQ

第 1 节 RabbitMQ 架构与实战

1.1 RabbitMQ 介绍、概念、基本架构

1.1.1 RabbitMQ 介绍

RabbitMQ,俗称“兔子 MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管 是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。

  1. 高可靠性、易扩展、高可用、功能丰富等
  2. 支持大多数(甚至冷门)的编程语言客户端。
  3. RabbitMQ 遵循 AMQP 协议,自身采用 Erlang(一种由爱立信开发的通用面向并发编程的语 言)编写。
  4. RabbitMQ 也支持 MQTT 等其他协议。

RabbitMQ 具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:

https://www.rabbitmq.com/community-plugins.html

1.1.2 RabbitMQ 整体逻辑架构

image.png

1.1.3 RabbitMQ Exchange 类型

RabbitMQ 常用的交换器类型有:fanout、direct、topic、headers 四种

Fanout

会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,如图:

image.png

Direct

direct 类型的交换器路由规则很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的 队列中,如下图:

image.png

Topic

topic 类型的交换器在 direct 匹配规则上进行了扩展,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,这里的匹配规则稍微不同,它约定:

BindingKey 和 RoutingKey 一样都是由"."分隔的字符串;BindingKey 中可以存在两种特殊字符“*”和 “#”,用于模糊匹配,其中"*“用于匹配一个单词,”#"用于匹配多个单词(可以是 0 个)。

image.png

Headers

headers 类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时, RabbitMQ 会获取到该消息的 headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键 值对,如果匹配,消息就会路由到该队列。headers 类型的交换器性能很差,不实用。

1.1.4 RabbitMQ 数据存储

存储机制

RabbitMQ 消息有两种类型:

  1. 持久化消息和非持久化消息。
  2. 这两种消息都会被写入磁盘。

持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清 除。这会提高一定的性能。

非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。

RabbitMQ 存储层包含两个部分:队列索引和消息存储。

image.png

队列索引:rabbit_queue_index

索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者 ack 等。

每个队列都有相对应的索引。

image.png

索引使用顺序的段文件来存储,后缀为.idx,文件名从 0 开始累加,每个段文件中包含固定的 segment_entry_count 条记录,默认值是 16384。每个 index 从磁盘中读取消息的时候,至少要在内存 中维护一个段文件,所以设置 queue_index_embed_msgs_below 值得时候要格外谨慎,一点点增大也 可能会引起内存爆炸式增长。

image.png

image.png

消息存储:rabbit_msg_store

消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一 个。

存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。

持久化存 储的内容在 broker 重启后不会丢失,短暂存储的内容在 broker 重启后丢失。

store 使用文件来存储,后缀为.rdq,经过 store 处理的所有消息都会以追加的方式写入到该文件 中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新 的消息写入。文件名从 0 开始进行累加。在进行消息的存储时,RabbitMQ 会在 ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。

image.png

image.png

消息(包括消息头、消息体、属性)可以直接存储在 index 中,也可以存储在 store 中。最佳的方式 是较小的消息存在 index 中,而较大的消息存在 store 中。这个消息大小的界定可以通过 queue_index_embed_msgs_below 来配置,默认值为 4096B。当一个消息小于设定的大小阈值时,就 可以存储在 index 中,这样性能上可以得到优化。一个完整的消息大小小于这个值,就放到索引中,否 则放到持久化消息文件中。

rabbitmq.conf 中的配置信息:

# queue_index_embed_msgs_below = 4096
# queue_index_embed_msgs_below = 4kb

如果消息小于这个值,就在索引中存储,如果消息大于这个值就在 store 中存储:

大于这个值的消息存储于 msg_store_persistent 目录中的 .rdq 文件中:

image.png

小于这个值的消息存储于 .idx 索引文件中:

image.png

读取消息时,先根据消息的 ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直 接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由 store 进行处 理。45

删除消息时,只是从 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。 在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记 为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效 数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有 3 个文件存在的情况下)的 数据大小的比值超过设置的阈值 garbage_fraction(默认值 0.5)时,才会触发垃圾回收,将这两个文件 合并,执行合并的两个文件一定是逻辑上相邻的两个文件。合并逻辑:

  1. 锁定这两个文件
  2. 先整理前面的文件的有效数据,再整理后面的文件的有效数据
  3. 将后面文件的有效数据写入到前面的文件中
  4. 更新消息在 ETS 表中的记录
  5. 删除后面文件

image.png

队列结构

通常队列由 rabbit_amqqueue_process 和 backing_queue 这两部分组成

rabbit_amqqueue_process 负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消 息、处理消息的确认(包括生产端的 confirm 和消费端的 ack)等。

backing_queue 是消息存储的具体形 式和引擎,并向 rabbit_amqqueue_process 提供相关的接口以供调用。

image.png

如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费 者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投 递。

rabbit_variable_queue.erl 源码中定义了 RabbitMQ 队列的 4 种状态:

  1. alpha:消息索引和消息内容都存内存,最耗内存,很少消耗 CPU
  2. beta:消息索引存内存,消息内存存磁盘
  3. gama:消息索引内存和磁盘都有,消息内容存磁盘
  4. delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多 CPU 和 I/O 操作

消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发 送变化。

持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种

gama 状态只有持久化消息才会有的状态。

在运行时,RabbitMQ 会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量 (target_ram_count),如果 alpha 状态的消息数量大于此值,则会引起消息的状态转换,多余的消息 可能会转换到 beta、gama 或者 delta 状态。区分这 4 种状态的主要作用是满足不同的内存和 CPU 需求。

对于普通没有设置优先级和镜像的队列来说,backing_queue 的默认实现是 rabbit_variable_queue,其内部通过 5 个子队列 Q1、Q2、delta、Q3、Q4 来体现消息的各个状态。

image.png

image.png

消费者获取消息也会引起消息的状态转换。

当消费者获取消息时

  1. 首先会从 Q4 中获取消息,如果获取成功则返回。
  2. 如果 Q4 为空,则尝试从 Q3 中获取消息,系统首先会判断 Q3 是否为空,如果为空则返回队列 为空,即此时队列中无消息。
  3. 如果 Q3 不为空,则取出 Q3 中的消息;进而再判断此时 Q3 和 Delta 中的长度,如果都为空,则 可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将 Q1 中的消息直接转移至 Q4,下次直接从 Q4 中获取消息。
  4. 如果 Q3 为空,Delta 不为空,则将 Delta 的消息转移至 Q3 中,下次可以直接从 Q3 中获取消息。 在将消息从 Delta 转移到 Q3 的过程中,是按照索引分段读取的,首先读取某一段,然后判断读 取的消息的个数与 Delta 中消息的个数是否相等,如果相等,则可以判定此时 Delta 中己无消 息,则直接将 Q2 和刚读取到的消息一并放入到 Q3 中,如果不相等,仅将此次读取到的消息转 移到 Q3。

这里就有两处疑问,第一个疑问是:为什么 Q3 为空则可以认定整个队列为空?

  1. 试想一下,如果 Q3 为空,Delta 不为空,那么在 Q3 取出最后一条消息的时候,Delta 上的消息 就会被转移到 Q3 这样与 Q3 为空矛盾;
  2. 如果 Delta 为空且 Q2 不为空,则在 Q3 取出最后一条消息时会将 Q2 的消息并入到 Q3 中,这样 也与 Q3 为空矛盾;
  3. 在 Q3 取出最后一条消息之后,如果 Q2、Delta、Q3 都为空,且 Q1 不为空时,则 Q1 的消息会 被转移到 Q4,这与 Q4 为空矛盾。

其实这一番论述也解释了另一个问题:为什么 Q3 和 Delta 都为空时,则可以认为 Q2、Delta、Q3、 Q4 全部为空?

通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可 能只会处于 alpha 状态。

对于持久化消息,它一定会进入 gamma 状态,在开启 publisher confirm 机制时,只有到了 gamma 状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续 走到下一个状态。

为什么消息的堆积导致性能下降?

在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加 处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息 的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继 而情况变得越来越恶化,使得系统的处理能力大大降低。

应对这一问题一般有 3 种措施:

  1. 增加 prefetch_count 的值,即一次发送多条消息给消费者,加快消息被消费的速度。
  2. 采用 multiple ack,降低处理 ack 带来的开销
  3. 流量控制

1.2 安装和配置 RabbitMQ

安装环境:

  1. 虚拟机软件:VMWare 15.1.0
  2. 操作系统:CentOS Linux release 7.7.1908
  3. Erlang:erlang-23.0.2-1.el7.x86_64
  4. RabbitMQ:rabbitmq-server-3.8.4-1.el7.noarch

RabbitMQ 的安装需要首先安装 Erlang,因为它是基于 Erlang 的 VM 运行的。

RabbitMQ 需要的依赖:socat 和 logrotate,logrotate 操作系统中已经存在了,只需要安装 socat 就 可以了。

RabbitMQ 与 Erlang 的兼容关系详见:https://www.rabbitmq.com/which-erlang.html

image.png

安装配置启动

#关闭防火墙
#1、安装依赖:
yum install socat -y
#2、安装Erlang
#下载 
#https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_ 64.rpm
#安装
rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
#3、安装RabbitMQ
#下载地址:
#https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3. 8.5-1.el7.noarch.rpm
#安装
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
#默认安装位置
/usr/lib/rabbitmq
#跳转到可执行文件位置
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/sbin
#开启UI插件
rabbitmq-plugins enable rabbitmq_management
#启动rabbitmq
systemctl start rabbitmq-server
#或
./rabbitmq-server
#或者后台启动
./rabbitmq-server -detached
#查看帮助
rabbitmqctl help
#查看用户
rabbitmqctl list_users
#添加用户
?  / rabbitmqctl add_user root 1234
Adding user "root" ...
#设置标签
?  / rabbitmqctl set_user_tags root administrator
Setting tags for user "root" to [administrator] ...
#设置权限 对于虚拟主机/赋予配置、读、写权限
?  / rabbitmqctl set_permissions --vhost / root" ".*" ".* ".*"
Setting permissions for user "root" in vhost "/" ...
#登陆rabbitmq
http://172.16.94.13:15672/

标签

TagCapabilities
(None)没有访问 management 插件的权限
management可以使用消息协议做任何操作的权限,加上:1. 可以使用 AMQP 协议登录的虚拟主机的权限 2. 查看它们能登录的所有虚拟主机中所有队列、交换器和绑定的权限 3. 查看和关闭它们自己的通道和连接的权限 4. 查看它们能访问的虚拟主机中的全局统计信息,包括其他用户的活动
policymaker所有 management 标签可以做的,加上: 1. 在它们能通过 AMQP 协议登录的虚拟主机上,查看、创建和删除策略以及虚 拟主机参数的权限
monitoring所有 management 能做的,加上:1. 列出所有的虚拟主机,包括列出不能使用消息协议访问的虚拟主机的权限 2. 查看其他用户连接和通道的权限 3. 查看节点级别的数据如内存使用和集群的权限 4. 查看真正的全局所有虚拟主机统计数据的权限
administrator所有 policymaker 和 monitoring 能做的,加上:1. 创建删除虚拟主机的权限 2. 查看、创建和删除用户的权限 3. 查看、创建和删除权限的权限 4. 关闭其他用户连接的权限

访问

http://172.16.94.13:15672/

image.png

1.3 RabbitMQ 常用操作命令

# 前台启动Erlang VM和RabbitMQ 
rabbitmq-server
# 后台启动 
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM 
rabbitmqctl stop
# 查看所有队列 
rabbitmqctl list_queues
# 查看所有虚拟主机 
rabbitmqctl list_vhosts
# 在Erlang VM运行的情况下启动RabbitMQ应用 
rabbitmqctl start_app rabbitmqctl stop_app
# 查看节点状态 
rabbitmqctl status
# 查看所有可用的插件 
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用户 
rabbitmqctl add_user username password
# 列出所有用户: 
rabbitmqctl list_users
# 删除用户:
rabbitmqctl delete_user username
# 清除用户权限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限: 
rabbitmqctl list_user_permissions username
# 修改密码: 
rabbitmqctl change_password username newpassword
# 设置用户权限:
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机:
rabbitmqctl add_vhost vhostpath
# 列出所以虚拟主机: 
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限:
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机:
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用: 
rabbitmqctl reset
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-18 12:46:21  更:2021-08-18 12:47:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:03:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码