IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ入门笔记 -> 正文阅读

[大数据]RabbitMQ入门笔记

Messaging that just works — RabbitMQ

RabbitMQ中文文档 · RabbitMQ in Chinese (mr-ping.com)

一、消息中间件的核心组成部分

1:消息的协议
2:消息的持久化机制
3:消息的分发策略
4:消息的高可用,高可靠
5:消息的容错机制

1、消息的协议

消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,即协议。

比如我MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的响应结果和反馈是什么,然后按照对应的执行顺序进行处理。如果你还是不理解:大家每天都在接触的http请求协议。

而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。

AMQP协议

AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
1:分布式事务支持。
2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。

AMQP 0-9-1 Model Explained — RabbitMQ这是rabbitmq官网上关于该协议的介绍,其中就定了该协议通信的基本模式,已经其核心概念有哪些,例如交换机类型、queue、channel、connection、broker、publisher、consumer等,属于重点。

MQTT协议

MQTT协议:(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。
特点:
1:轻量
2:结构简单
3:传输快,不支持事务
4:没有持久化设计。
应用场景:
1:适用于计算能力有限
2:低带宽
3:网络不稳定的场景。
支持者:

OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:
1:结构简单
2:解析速度快
3:支持事务和持久化设计。

Kafka协议

Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。
特点是:
1:结构简单
2:解析速度快
3:无事务支持
4:有持久化设计

2、消息的持久化机制

简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

常见的持久化方式

ActiveMQRabbitMQKafkaRocketMQ
文件存储支持支持支持支持
数据库支持///

3、消息的分发策略

MQ消息队列有如下几个角色
1:生产者
2:存储消息
3:消费者
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。

消息分发策略的机制和对比

ActiveMQRabbitMQKafkaRocketMQ
发布订阅支持支持支持支持
轮询分发支持支持支持/
公平分发/支持支持/
重发支持支持/支持
消息拉取/支持支持支持

4、消息的高可用,高可靠

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。

集权有很多种模式,无论什么集群模式,他们的最终目的都是为保证:消息服务器不会挂掉,出现了故障依然可以抱着消息服务继续使用。

反正终归三句话:
1:要么消息共享,
2:要么消息同步
3:要么元数据共享

如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。

二、MQ分类

1.ActiveMQ

优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较 低的概率丢失数据。

缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。

2.Kafka

为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥 着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。

优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非 常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采 用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持: 功能 较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消 息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序, 但是一台代理宕机后,就会产生消息乱序,社区更新较慢;

3.RocketMQ

RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一 些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场 景。

优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分 布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的MQ

缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。

4.RabbitMQ

2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最 主流的消息中间件之一。

优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高 https://www.rabbitmq.com/news.html

缺点:商业版需要收费,学习成本较高

MQ 的选择

1.Kafka Kafka 主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集 和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能, 肯定是首选 kafka 了。

2.RocketMQ 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削 峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务 场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

3.RabbitMQ 结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分 方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

三、RabbitMQ简介

RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。通过将消息的发送和接收分离来实现应用程序的异步和解偶

RabbitMQ核心概念

在这里插入图片描述

  • 生产者(Producer):发送消息的应用。
  • 消费者(Consumer):接收消息的应用。
  • 队列(Queue):存储消息的缓存。
  • 消息(Message):又生产者通过RabbitMQ发送给消费者的信息。
  • 连接(Connection):连接RabbitMQ和应用服务器的TCP连接。
  • 通道(Channel):Connection里的一个虚拟通道,避免频繁创建connection带来资源消耗。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。
  • 交换机(Exchange):从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须绑定一个交换机。
  • 绑定(Binding):绑定是队列和交换机的一个链接。
  • 路由键(Routing Key):路由键是供交换机查看并根据键的值来决定如何分发消息到列队的一个键。路由键可以说是消息的目的地址。
  • AMQP:AMQP(高级消息队列协议Advanced Message Queuing Protocol)是RabbitMQ使用的消息协议。
  • 用户(Users):在RabbitMQ里,是可以通过指定的用户名和密码来进行连接的。每个用户可以分配不同的权限,例如读权限,写权限以及在实例里进行配置的权限。
  • Virtual Host:提供资源的逻辑分组和分离.RabbitMQ中的资源权限的作用域是每个virtual host.

关于Virtual Host。vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、绑定、交换器和权限控制;vhost通过在各个实例间提供逻辑上分离,允许你为不同应用程序安全保密地运行数据;vhost是AMQP概念的基础,必须在连接时进行指定,RabbitMQ包含了默认vhost:“/”;当在RabbitMQ中创建一个用户时,用户通常会被指派给至少一个vhost,并且只能访问被指派vhost内的队列、交换器和绑定,vhost之间是绝对隔离的。

vhost可以理解为虚拟broker,即mini-RabbitMQ server,其内部均含有独立的queue、bind、exchange等,最重要的是拥有独立的权限系统,可以做到vhost范围内的用户控制。当然,从RabbitMQ全局角度,vhost可以作为不同权限隔离的手段(一个典型的例子,不同的应用可以跑在不同的vhost中)。

RabbitMq如何运行呢,真实业务中通常是将业务数据序列化后作为消息来传递来完成我们相应的业务。

在这里插入图片描述

四、RabbitMQ安装:

根据官网安装,因为是基于erlang的,所以首先要先安装erlang。这里要注意看它官网给的erlang和rabbitMQ的推荐版本。

安装erlang:

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.21-1.el7.x86_64.rpm/download.rpm

可以在windows上下载之后通过ssh工具传到虚拟机的linux上。

这里我放在了/usr/local/environments下,cd到该路径,然后执行如下命令:

#安装 erlang
rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm 
#下面这个是官网说的要安装的依赖,避免等会安装rabbitMQ报错
yum install socat -y

安装RabbitMQ

下载

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.8-1.el7.noarch.rpm/download.rpm

安装

rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

到此就安装完成了。

#启动(启动过程比较慢,等等就行)
service rabbitmq-server start
#查看状态
 service rabbitmq-server status
#停止
 service rabbitmq-server stop
#开机自启动
chkconfig rabbitmq-server on

激活管理界面

激活 web 管理插件 (开启前先关闭rabbitmq-server)

#先关闭rabbitmq-server
service rabbitmq-server stop
#激活 web 管理插件
rabbitmq-plugins enable rabbitmq_management

报错:

[root@muyi environments]# rabbitmq-plugins enable rabbitmq_management
{:query, :rabbit@muyi, {:badrpc, :timeout}}

关键是这里的rabbit@muyi ,@后面的标识的是hostname.hostdomain里的 name,比如我的hostname文件中为muyi.com,所以它报错的是rabbit@muyi。

原因:这里hostname没有追加到/etc/hosts文件中,所以通过默认localhost hostname访问服务发生timeout。

在/etc/hosts中 的127.0.0.1 一行的末尾添加 hostname ,就是@后面的值,我这里是muyi。

然后就解决了。

然后重新启动rabbitmq-server

service rabbitmq-server start

然后通过http://{node-hostname}:15672/访问管理界面,比如我这里的hostname是muyi.com,那我应该http://muyi.com:15672/ or http://localhost:15672/都能访问到.

如果是非该主机,则用ip地址加端口号即可

如果没有响应,那么可能是没有开放响应端口,可以选择关闭防火墙或开放该端口15672.

关闭防火墙:

#centos7  默认防火墙是firewalld    
#所以查看防火墙状态命令:     
systemctl status firewalld.service   
#关闭运行的防火墙使用命令:    
systemctl stop firewalld.service    
#前面的方法,一旦重启操作系统,防火墙就自动开启了,该怎么设置才能永久关闭防火墙呢?
#输入命令开机禁止防火墙服务器:
systemctl disable firewalld.service
#输入命令开机启动防火墙服务器:
systemctl enable firewalld.service

然后就能成功访问管理界面了。

管理界面访问与权限

管理界面需要身份验证和授权,就像 RabbitMQ 需要连接客户端一样。 除了成功的身份验证之外,管理 UI 访问还由用户tag控制。 标签使用rabbitmqctl 进行管理。 默认情况下,新创建的用户没有设置任何tag。

tags有四种:management、policymaker、monitoring、administrator,权限依次递增,每个用户最多一个tag。tag的管理使用rabbitmqctl命令

关于这四种tag具体的权限列表,具体见官网https://www.rabbitmq.com/management.html#permissions

  • 1、administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
  • 2、monitoring 监控者 登录控制台,查看所有信息
  • 3、policymaker 策略制定者 登录控制台,指定策略
  • 4、managment 普通管理员 登录控制台

创建用户

这里创建一个有administrator tag的用户

# create a user
rabbitmqctl add_user admin 123456
# tag the user with "administrator" for full management UI and HTTP API access
rabbitmqctl set_user_tags admin administrator

设置用户权限:

set_permissions [-p ]
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”

让用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 当前用户和角色

查看用户和角色

rabbitmqctl list_users

可以看见除了我们创建的admin外,还有个guest账户,这是rabbitmq自带的,密码也是guest,但是只能在localhost上登录。

重置命令

关闭应用的命令为 rabbitmqctl stop_app

清除的命令为 rabbitmqctl reset

重新启动命令为 rabbitmqctl start_app

五、RabbitMQ的配置

RabbitMQ默认情况下有一个配置文件,定义了RabbitMQ的相关配置信息,默认情况下能够满足日常的开发需求。如果需要修改需要,需要自己创建一个配置文件进行覆盖。

https://www.rabbitmq.com/configure.html

相关端口

5672:RabbitMQ的通讯端口
25672:RabbitMQ的节点间的CLI通讯端口是
15672:RabbitMQ HTTP_API的端口,管理员用户才能访问,用于管理RabbitMQ,需要启动Management插件。
1883,8883:MQTT插件启动时的端口。
61613、61614:STOMP客户端插件启用的时候的端口。
15674、15675:基于webscoket的STOMP端口和MOTT端口

六、官网tutorial入门案例学习

这部分按照官网的RabbitMQ Tutorials中的7个demo做个实践。 这些只是简单的学习,可以看看 documentation ,以及这些其它的guides: Publisher Confirms and Consumer Acknowledgements, Production Checklist and Monitoring

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ewfwxqNS-1628862897188)(D:\cs-book\笔记\RabbitMQ\img\image-20210808185157879.png)]

概述:

  • hello world对应简单模式,使用的是默认交换机,一个producer一个consumer。

  • work queue 主要是内容是将i西澳西分发给不同的consumer。

    • 调度模式分:轮询调度和公平调度。
    • 还涉及消息确认。
    • 消息持久化:队列持久化和消息持久化。
  • Pubulish/Subscribe开始介绍交换机和队列的工作机制,如何将一个消息同时发送给多个消费者。主要介绍了fanout交换机。

  • Routing主要内容是如何选择性的接收消息。设计直接交换机,以及直接交换机的多重绑定。

  • Topics主要是讲基于一直匹配模式来灵活的选择要接受的消息,然后介绍了topic交换机。

  • RPC介绍了请求响应模式,不怎么用。

  • Pubisher Confirms主要围绕可靠发布,如何确保消息送达,介绍了三种实现方式。

1.Hello World

在这里插入图片描述

RabbitMQ 是一个消息代理(broker):它接受和转发消息。

在本教程的这一部分中,我们将用 Java 编写两个程序; 发送单个消息的生产者和接收消息并将其打印出来的消费者。 我们将忽略 Java API 中的一些细节,专注于这个非常简单的事情只是为了开始。 相当于message学习的的“Hello World”。

2.Work queues

Work queues(又名:Taskqueues )背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。 相反,我们安排任务稍后完成。 我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将弹出任务并最终执行作业。之后讲这些消息分发给许多的workers。

根据分发策略,又分为轮询调度(Round-Robin)公平调度(fair dispatch)

轮询调度:任务顺序分发给worker,不考虑任务复杂度。

公平调度:同一时间每个worker只做一个任务,如果任务完成了则再分发任务。这样能够缓解任务分布不均问题,具体解释之后会讲到。

这个概念在 Web 应用程序中特别有用,因为在短的 HTTP 请求窗口期间不可能处理复杂的任务。

当队列中发送了多个message,并且有多个消费者连接在该队列上时,RabbitMQ 会将每条消息依次发送给下一个消费者,这些消费者会通过 round-robin的分发策略使得每个消费者获得的消息数量相同。

但是其中一个消费者开始了一项长期任务并且只完成了部分任务而死亡,按照如果没有消息确认机制,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。 在这种情况下,如果你杀死一个worker(消费者程序),我们将丢失它刚刚处理的消息。 我们还将丢失所有已分派给该特定worker但尚未处理的消息。

Message acknowledgment(消息确认)

消息确认是告诉 RabbitMQ 特定消息已被接收、处理,然后RabbitMQ 就可以删除它。

详细的内容看官网:Consumer Acknowledgements and Publisher Confirms — RabbitMQ

消息自动重新入队:如果消费者在没有发送 ack (也就是在手动确认模式下未收到确认)的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将重新排队。 如果有其他消费者同时在线,它会迅速将其重新交付给另一个消费者。 这样您就可以确保不会丢失任何消息,即使worker偶尔会死亡。而且消息不会超时; RabbitMQ 会在消费者死亡时重新传递消息。

手动消息确认默认是打开的,它会在明确受到确认时才将消息视为发送成功了。设置autoAck=true 会将其关闭,而采用自动消息确认。

自动消息确认

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失 了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当 然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使 得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用

消息确认要确保在同一个channel,否则会引发channel级别的协议异常。

手动消息确认的几种方式:

手动发送确认信息可以是肯定的(positive)也可以是否定的(negative),可以使用以下协议方法之一:

  • Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃

肯定确认让 RabbitMQ 记录一个消息已经发送成功,可以丢弃。带有 basic.reject 的否定确认具有相同的效果。两者的区别主要在于语义上: 肯定确认假设消息已经被成功处理,而否定确认则表明传递没有被处理,但仍然应该被删除。

 // positively acknowledge a single delivery, the message will
             // be discarded
             channel.basicAck(deliveryTag, false);

可以批处理手动确认以减少网络流量。true 代表批量应答 channel 上未应答的消息。

// positively acknowledge all deliveries up to
             // this delivery tag
             channel.basicAck(deliveryTag, true);

手动消息确认的代码如下:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    try {
        doWork(message);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println(" [worker1] Done");
        //手动消息确认
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});

如果忘记了消息确认,那么消息会不断重复,而rabbitmq不会释放没有ack的消息,这就会导致内存占用越来越大!可以使用 rabbitmqctl 打印messages_unacknowledged来debug这个问题:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Message durability(消息持久化)

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

需要做两件事来确保消息不会丢失:将队列和消息都持久化

队列持久化

之前声明的队列不是持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化,需要在声明队列的时候把 durable 参数设置为true.

但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新 创建一个持久化的队列,不然就会出现错误

durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

消息持久化

只需设置MessageProperties 的值为MessageProperties.PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

消息持久化注意事项

将消息标记为持久并不能完全保证消息不会丢失。 虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 已经接受一条消息并且还没有保存它时,仍然有很短的时间窗口。 此外,RabbitMQ 不会对每条消息都执行 fsync(2) —— 它可能只是保存到缓存中,而不是真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果您需要更强的保证,那么您可以使用publisher confirms.。

Fair dispatch(公平调度)

前面的dispatch模式——轮询模式(Round-Robin)不能完全按照我们想要的方式工作。 例如,在有两个 worker 的情况下,当所有奇数消息都很重,偶数消息很轻时,一个 worker 会一直很忙,而另一个 worker 几乎不做任何工作。 而RabbitMQ 对此一无所知,仍然会均匀地发送消息,因为 RabbitMQ 只是在消息进入队列时分派消息, 它不考虑消费者未确认消息的数量,只是盲目地将每条第 n 条消息分派给第 n 条消费者。

在这里插入图片描述

为了解决这个问题,我们可以使用带有 prefetchCount = 1 设置的 basicQos 方法。 这告诉 RabbitMQ 一次不要给一个worker多个消息。 或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。 相反,它会将它分派给下一个不忙的worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

这种方式可能会导致队列被未处理的消息堆满,所以要保持观察或增加更多worker。

现在我们可以继续教程 3 来学习如何向许多消费者传递相同的消息。

3.Publish/Subscribe

之前的两部分中,一个任务只能传递给一个worker,而这里将学习将一个消息传递给多个消费者。这种模式就叫做:Publish/Subscribe 发布订阅模式。

为了说明该模式,我们将构建一个简单的日志系统。 它将由两个程序组成——第一个将发出日志消息,第二个将接收并打印它们。

在我们的日志系统中,接收器程序的每个运行副本都会收到消息。 这样我们就可以运行一个接收器并将日志定向到磁盘; 同时我们将能够运行另一个接收器并在屏幕上查看日志。

本质上,发布的日志消息将被广播给所有接收者。

Exchanges交换机

之前学习了向队列发送消息和从队列接收消息,接下来学习Rabbit中的完整消息传递模型。

RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。 实际上,生产者经常甚至根本不知道消息是否会被传送到任何队列。

在这里插入图片描述

相反,生产者只能向交换机发送消息。 交换机一方面接收来自生产者的消息,另一方面将它们推送到队列中。 交换机必须确切地知道如何处理它收到的消息。 它应该附加到特定队列吗? 它应该附加到许多队列中吗? 或者它应该被丢弃。 其规则由交换机类型定义。

有几种可用的交换类型:direct, topic, headers 和fanout(扇出)。 我们将关注最后一个——fanout。 让我们创建一个这种类型的交换机,并将其称为日志:

channel.exchangeDeclare("logs", "fanout");

fanout交换机很简单,正如其名字一样,它直接将受到的消息发送给所有他知道的队列。这刚好是我们的日志系统需要的。

查看交换机列表

使用如下rabbitmqctl 命令可以查看当前server上的交换机列表,其中有一些是amq.*命名的交换机,还有一个default交换机,这些都是系统自带的,现阶段用不上,不管。

sudo rabbitmqctl list_exchanges

在这里插入图片描述

NameLess交换机

回顾前面的阶段,我们好像并没有使用交换机,那我们是如何发送的消息呢?

实际上我们之前的用法是这样的:

channel.basicPublish("", "hello", null, message.getBytes());

我们用空字符串""作为交换机名,这其实是在使用default交换机。

默认交换机隐式绑定到每个队列,routing key等于队列名称。 无法明确绑定到默认交换或从默认交换中解除绑定。 它也无法删除。

现在我们创建了新的类型为fanout的交换机logs之后,就可以使用它了:

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues(临时队列)

下面这个代码能够创建一个随机命名的,且在我们和消费者队列断开之后立马删除的队列,并返回该队列名字。这样做的目的是获得一个新的、空的队列,这样每次获得消息都是最新的。返回的队列名形式如amq.gen-JzTY20BRgKO-HjmUJj0wLg,是一个随机的名字。

String queueName = channel.queueDeclare().getQueue();

Bindings

在这里插入图片描述

我们已经创建了一个扇出交换和一个队列。 现在我们需要告诉交换器向我们的队列发送消息。 交换和队列之间的这种关系称为绑定。

channel.queueBind(queueName, "logs", "");

这样,logs交换机就会将消息附加到我们的队列中

查看bingding列表:

rabbitmqctl list_bindings

总结:

在这里插入图片描述

生成日志消息的生产者程序与之前的教程看起来没有太大区别。 最重要的变化是我们现在想要将消息发布到我们的日志交换而不是无名的交换。 我们需要在发送时提供一个 routingKey,但fanout exchanges忽略这个值。

下面是provider的方,命名为Emitlog

public class Emitlog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.235.128");
        factory.setUsername("admin");
        factory.setPassword("123456");
        try(Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()){
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "hello logs";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

如果还没有队列绑定到交换器,消息将会丢失,但这对我们来说是可以的; 如果还没有消费者在听,我们可以安全地丢弃消息。

consumer方命名为ReceiveLogs,可以运行多个副本:

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.235.128");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

和前面提到的一样,如果ReceiveLogs还没有启动,Emitlog就发送了消息,则消息会丢失。

当ReceiveLogs停止后,其对应的queue会自动删除。

要了解如何侦听消息的子集,让我们继续学习教程 4

4.Routing

本届将学习只订阅一个消息的子集,即只接受需要的消息,而不是所有的消息都接受。

channel.queueBind(queueName, EXCHANGE_NAME, "");

我们上节用这种方式绑定了queue和交换机,其中""对应的参数是routingKey,这表明queue接收所有交换机的消息。

这里为了和为了和basic_publish 中的routingKey作区分,将这里的routingKey参数叫做binding key,下面展示了创建一个带有key的binding。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的含义取决于交换机类型,例如fanout类型的交换机,则会忽略bingding key的值。

Direct exchange

Direct exchange(直接交换机)背后的路由算法很简单 - 消息进入其binding key与消息的routingKey完全匹配的队列。

直接交换机能够选择将部分消息法给部分队列,更灵活些,不像fanout一样无脑广播。

在这里插入图片描述

在上图中,我们可以看到直接交换机X被两个队列绑定。第一个队列绑定的binding key是orange,第二个队列有两个binding key,一个为black,另一个为green。
在这样的设置中,使用routingKey为 orange发布到交换机的消息将被路由到队列 Q1。routingKey为black或green的消息将转到 Q2。所有其他消息将被丢弃。

Multiple bindings多重绑定

在这里插入图片描述

使用相同的binding key绑定多个队列是完全合法的。这样direct交换机的行为就类似与fanout交换机了。

然后我们使用direct交换机替代上面的fanout交换机来实现日志系统,使得日志系统能选择性接收消息。

provider为EmitLogDirect.java 类,代码如下:

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.235.128");
        factory.setUsername("admin");
        factory.setPassword("123456");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            
            String routingKey = "error";//
            String message = "error message";

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
}

consumer类如下:

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.235.128");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        //bind queue with binding key
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {
        });
    }
}

如果provider先启动,consumer后启动,消息会丢失。consumer启动才会创建queue。

下面一节学习如何根据pattern来监听消息。

5.Topics

前面的fanout交换机只能做无脑的广播,而直接交换机虽然能够选择性接收消息,但是其routing不能基于多个标准。但是就日志系统来说,我们有时只要接收来自cron的机器上的error的消息,要实现这个功能,就要用到Topic 交换机。

Topic exchange

发送到Topic 交换机的消息不能具有任意的 routing_key - 它必须是一个以点分隔的字符串列表。 一般就是一些与消息相关的特征组成的字符串。例如:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。 routing_key中可以有任意多个word,最多 255 个字节。

bindingKey也必须采用和routingkey相同的形式。 Topic exchange背后的逻辑类似于direct交换机 - 使用特定routingkey发送的消息将被传递到bindingKey匹配的所有队列。 但是这里有两个重要的特殊情况:

  • *(星号)可以正好代替一个词。
  • # 可以代替零个或多个单词

下图很好的解释了这一点:

在这里插入图片描述

这里routingkey由三个单词组成,三部分的含义分别为"..".。现然Q1只接受orange颜色的消息,Q2接收任何兔子或者lazy的任何东西。

如果queue使用“#”作为binding key绑定topic交换机,那么会接受所有消息,就退化成fanout交换机了。

如果queue不适用*或者#符号来绑定,就退化的和direct交换机一样了。

代码部分和上一部分基本相同,就修改交换机类型,然后交换机名字改一下,然后provider类routingkey设置成特定pattern模式(eg:“error.user1”),consumer类的bindingKey也设置成对应的pattern模式来接收(eg:“error.*”)。

6.Remote procedure call (RPC)

RPC:在一台远程电脑上运行一个函数并等待返回结果,这就叫远程程序调用。

在本教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有任何值得分配的耗时任务,我们将创建一个返回斐波那契数列的虚拟 RPC 服务。

Client interface

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。 它将暴露一个名为 call 的方法,该方法发送一个 RPC 请求并阻塞直到收到结果:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

A note on RPC

滥用 RPC 不仅不会简化软件,还会导致难以维护。

建议:

- 区分哪个函数调用是本地的,哪个是远程的。

- 记录您的系统。明确组件之间的依赖关系。

- 处理错误情况。当RPC服务器长时间宕机时,客户端应该如何反应?

如有疑问,请避免使用 RPC。如果可以,您应该使用异步管道——而不是类似 RPC 的阻塞,结果被异步推送到下一个计算阶段

Callback queue

一般来说,通过 RabbitMQ 进行 RPC 很容易。客户端发送请求消息,服务器回复响应消息。为了接收响应,我们需要随请求一起发送“回调”队列地址。我们可以使用默认队列(在 Java 客户端中是exclusive )。

allbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue

Message properties

The AMQP 0-9-1 协议预定义了 14 个与消息一起使用的属性。大多数属性很少使用,但以下情况除外:

  • deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
  • contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
  • replyTo: Commonly used to name a callback queue.
  • correlationId: Useful to correlate RPC responses with requests.

Correlation Id

在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列。 这是非常低效的.有一个更好的方法 - 为每个客户端创建一个回调队列。

这引发了一个新问题,在该队列中收到响应后,不清楚该响应属于哪个请求。 而这就是使用correlationId 属性的时候。 我们将为每个请求将其设置为唯一值。 稍后,当我们在回调队列中收到一条消息时,我们将查看此属性,并基于此属性将响应与请求进行匹配。 如果我们看到未知的correlationId 值,我们可以安全地丢弃该消息——它不属于我们的请求。

总结
在这里插入图片描述

RPC 工作流程:

  • 对于 RPC 请求,客户端发送具有两个属性的消息:replyTo,它被设置为仅为该请求创建的匿名独占队列,以及correlationId,它被设置为每个请求的唯一值。
  • 请求被发送到 rpc_queue 队列。
  • RPC 工作者(又名:服务器)正在等待该队列上的请求。 当一个请求出现时,它会执行工作并将带有结果的消息发送回客户端,使用来自 replyTo 字段的队列。
  • 客户端等待回复队列中的数据。 当出现一条消息时,它会检查correlationId 属性。 如果它与请求中的值匹配,则它将响应返回给应用程序。

7.Publisher Confirms发布确认

**Publisher Confirms(发布确认)**是实现可靠Publishing的 RabbitMQ 扩展。 当在通道上启用发布者确认时,客户端发布的消息由代理异步确认,这意味着它们已在服务器端得到处理。

Overview

在本教程中,我们将使用发布者确认来确保发布的消息已安全到达代理。 我们将介绍几种使用发布者确认的策略并解释它们的优缺点。

Enabling Publisher Confirms on a Channel

Publisher confirms是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下不启用它们。 使用 confirmSelect 方法在频道级别启用发布确认

Channel channel = connection.createChannel();
channel.confirmSelect();

确认应该只启用一次,而不是为每个发布的消息启用。

具体而言,确认有三种方式:

  1. 单个确认发布

发布一个消息就确认一次

while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    // uses a 5 second timeout
    //超时还没有确认就报exception,处理该异常需要log错误,并重新发送消息
    channel.waitForConfirmsOrDie(5_000); 
    
}
  1. 批量确认发布

批量确认,一批消息发送成功后确认一次。

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize) {
        ch.waitForConfirmsOrDie(5_000);
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0) {
    ch.waitForConfirmsOrDie(5_000);
}
  1. 异步确认发布

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
    // code when message is confirmed
}, (sequenceNumber, multiple) -> {
    // code when message is nack-ed
});

三种对比:

  • 单独发布消息 :同步等待确认,简单,但吞吐量非常有限。
  • 批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条 消息出现了问题。
  • 异步处理: 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

七、官网给的具体知识点如下

Message Store and Resource Management

关于队列和消费者特性

Queue and Consumer Features

比如

死信队列Dead Lettering

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:

  • 消息 TTL 过期

  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)

  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

死信队列工作模式如下:

在这里插入图片描述

死信队列是我们实现可靠消息机制的重要方式。

在这里插入图片描述

延时队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景

1.订单在十分钟之内未支付则自动取消

2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3.用户注册成功后,如果三天内没有登陆则进行短信提醒。

4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。

5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

RabbitMQ 中的 TTL

RabbitMQ 允许您为消息和队列设置 TTL(生存时间)。

设置队列中消息的TTL

在创建队列的时候设置队列的“x-message-ttl”属性

如果一条消息进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。 请注意,路由到多个队列的消息在其所在的每个队列中可能会在不同时间消亡,或者根本不会消亡。 一个队列中消息的死亡不会影响其他队列中同一消息的生命周期。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

在发布方设置单条消息的TTL

通过在发送 basic.publish 时在基本 AMQP 0-9-1 类中设置过期字段,可以在每条消息的基础上指定 TTL。

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                                   .expiration("60000")
                                   .build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

虽然这两者都是设置消息的ttl,但是如果是设置队列的ttl,如果队列中消息过期还能进如死信队列, 而单个消息的则不能。

设置队列TTL

不仅仅是队列的内容,TTL 也可以在队列上设置。 队列在一段时间后只有在未被使用时才会过期(例如,没有消费者)。 此功能可以与自动删除队列属性一起使用。

此 Java 示例创建了一个队列,该队列在 30 分钟未使用后过期。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

持久化

持久化分消息持久化和队列持久化,前面已经提到了。这里的持久化就是将消息或队列从内存中复制一份到磁盘中存储。但是实际上,当内存不足时,即使非持久化的消息和队列也可能会存储到磁盘中。

内存监控

当内存使用超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接,并且停止接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。
https://www.rabbitmq.com/configure.html

当出现警告的时候,可以通过配置去修改和调整

有命令方式和配置方式:

命令方式:

值的设定有两种,一种是相对比例,一种是绝对值。

#这两种二选一即可。
rabbitmqctl set_vm_memory_high_watermark 0.4
rabbitmqctl set_vm_memory_high_watermark absolute 2GB

fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当RabbitMQ的内存超过40%或者超过2GB时,就会产生警告并且阻塞所有生产者的连接。通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会生效,这种命令的方式不需要重启就能生效

配置方式:

#默认
#vm_memory_high_watermark.relative = 0.4
# 使用relative相对值进行设置fraction,建议取值在04~0.7之间,不建议超过0.7.
vm_memory_high_watermark.relative = 0.6
# 使用absolute的绝对值的方式,但是是KB,MB,GB对应的命令如下
vm_memory_high_watermark.absolute = 2GB

磁盘预警

当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。

可以通过设置来调整预警阈值。具体看官网配置,这里知道有这些东西即可。

集群

Clustering Guide — RabbitMQ

RabbitMQ broker往往需要设计成分布式的,而着有三种实现方式::使用集群、使用联合和使用 Shovel 插件。这里主要介绍集群方式。

集群将多台机器连接在一起形成一个集群。 节点间通信对客户端是透明的。 集群的设计假设网络连接相当可靠并提供类似于 LAN 的延迟。集群中的所有节点都必须运行兼容版本的 RabbitMQ 和 Erlang。节点使用通常由部署自动化工具安装的预共享密钥相互验证。虚拟主机、交换器、用户和权限会在集群中的所有节点之间自动复制。 队列可能位于单个节点上,或者复制它们的内容以获得更高的可用性。 仲裁队列是一种现代复制队列类型,专注于数据安全。 可以选择镜像经典队列。连接到集群中任何节点的客户端可以使用集群中的所有非独占队列,即使它们不在该节点上。集群节点有助于提高可用性、队列内容的数据安全性并维持更多并发客户端连接

RabbitMQ 集群是一个或多个节点的逻辑分组,每个节点共享users, virtual hosts, queues, exchanges, bindings、运行时参数和其他分布式状态。

待学习。

八 、RabbitMQ常见面试题

(15条消息) rabbitmq常见面试题_jeffry_ding的博客-CSDN博客_rabbitmq面试题

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-15 15:39:36  更:2021-08-15 15:42:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:02:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码