引言
本文是 RabbitMQ in Depth 的学习笔记。
AMQ
AMQ(Advanced Message Queuing) 模型定义了三个抽象组件:
- Exchange—将消息路由到队列的消息代理(broker)组件
- Queue—存储消息的数据结构
- Binding—告诉Exchange消息应该存储在哪个Queue的规则


除了可以绑定队列到Exchange,RabbitMQ实现了AMQP能允许Exchange绑定到其他Exchange。
RabbitMQ如何通信
作为RPC传输的AMQP
开启会话

转换到正确的Channel
AMQP的RPC Frame结构

AMQP的类定义了功能的范围,每个类包含执行不同任务的方法。
在连接建立过程中,RabbitMQ服务断发送了一个Connection.Start 命令,编入一个frame,发送到客户端。
AMQP中frame的组件
frame编码了待传输的信息。

AMQP的frame包括了5个组件:
- Frame类型
- 通道(Channel)编号
- Frame大小(以字节编码的)
- Frame的货物(payload)
- 结束标志
Frame的类型
AMQP规范定义了5中frame:协议头frame、方法frame、消息头frame、消息体(body) frame、心跳frame。
- 协议头frame只会发送一次,只有连接到RabbitMQ的时候才使用
- 方法frame包含要发送或接受到的RPC请求或响应
- 消息头frame包含了消息的大小和属性
- 消息体frame包含消息的内容
将消息编码到frame
当发送一条消息到RabbitMQ,方法、头和消息体frame会被使用。第一个发送的frame是方法frame,它包含需要执行的命令和参数,比如exchange和路由key。接下来是消息frame:包含消息头和消息体。消息头包含消息属性与消息体大小。AMQP有一个最大的frame大小,如果你的消息内容超过了该大小,那么会被分成多个消息体frame发送。
这些frame都以同样的顺序发送:方法frame、消息头和一个或多个消息体frame。

为了传输的效率,方法和消息头frame中的内容以二进制编码。
而消息体frame中的内容可以是纯文本也可以是二进制图像数据。
剖析方法frame
方法frame包含你的RPC的方法和传输所需的参数。在下图中,方法frame包含一个Basic.Publish 命令,包含描述命令的二进制数据和请求参数。
首先的两个字段是描述Basic 类和Publish 方法的数字表示,接着是exchange名称和路由key的字符串值。mandatory 标识描述是否必须路由此消息,或是否发送一个Basic.Return frame来告知此消息无法被路由。

消息头frame
消息头frame告诉RabbitMQ你的消息有多大。同时也包含描述你消息的属性。这些属性作为Basic.Properties 表中的值,但可能为空。

消息体frame
消息体frame是包含实际消息数据的数据结构。根据你的需求,可以使任何格式的数据。

使用协议
在你发送消息到队列之前,需要进行一些配置。
至少需要建立一个exchange和队列,然后绑定它们。
声明一个exchange
Exchange通过Exchange.Declare 命令创建,包含名称、类型和其他元数据。
一旦RabbitMQ接收到此消息,并创建exchange成功,会返回一个Exchange.DeclareOk 方法frame;否则创建失败的话,返回Channel.Close 命令。

声明一个队列
一旦exchange建立,下面需要通过Queue.Declare 命令来创建队列。

绑定队列到exchange
通过Queue.Bind 命令绑定,该命令一次指定指定一个队列。

发布消息
 当发布消息到RabbitMQ,至少需要发送3个frame:Basic.Public 方法frame,一个消息头frame和一个消息体frame
当RabbitMQ收到了一个消息的所有frame之后,它会首先查看方法frame来决定下一步。匹配Basic.Public 方法frame中包含的change名称和路由key。
当消息匹配到任何绑定的队列,RabbitMQ会以FIFO的顺序将此消息入队。但不是将实际的消息入队,而是消息的引用。当RabbitMQ准备传输此消息时,它将使用引用组合需要发送的消息并通过网络发送。这在需要发送到多个队列时提供了重大的优化,当需要发送到多个目的地时仅保留消息的一个实例大大降低了内存的占用。
并且某个队列对该消息的处理不会影响到其他队列。当所有消息的拷贝(引用)被传输或移除后,RabbitMQ会从内存中移除该消息实例。
消费消息
为了消费消息,一个消费者应用需要注册(subscribe)到该队列,通过Basic.Consume 命令。消费者将开始以 Basic.Deliver 命令接收消息。传递消息头frame和消息体frame。

如果消费者想停止接收消息,它需要发送Basic.Cancel 命令。当消费消息时,需要指定一些设置。其中一个是no_ack 参数,当为true 时,RabbitMQ会持续发送消息直到消费者发送了Basic.Cancel 命令或掉线;当为false 时,消费者必须发送Basic.Ack RPC请求来确定每条收到的消息。

用Python编写一个消息生产者
import rabbitpy
url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
connection = rabbitpy.Connection(url)
channel = connection.channel()
exchange = rabbitpy.Exchange(channel, 'chapter2-example')
exchange.declare()
queue = rabbitpy.Queue(channel, 'example')
queue.declare()
queue.bind(exchange, 'example-routing-key')
此时在管理控制台可以看到10条消息。
channel = connection.channel()
queue = rabbitpy.Queue(channel, 'example')
while len(queue) > 0:
message = queue.get()
print('Message')
print(f' ID: {message.properties["message_id"]}')
print(f' Time: {message.properties["timestamp"].isoformat()}')
print(f' Body: {message.body}')
message.ack()
Message
ID: 6c873463-4b2d-42bb-a824-6771a8551af9
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 0')
Message
ID: 2e9c7ef1-d2fa-4725-adc6-447572444939
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 1')
Message
ID: 4152aa9e-9a82-487d-bf4c-f062061632a2
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 2')
Message
ID: 524572b7-1982-4e63-994b-2163d6464519
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 3')
Message
ID: 31e282ab-4150-46a4-b56e-e645e8abcb5f
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 4')
Message
ID: 1a97d253-cb19-4cbb-b778-f6a562478059
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 5')
Message
ID: a78b3fd5-abd0-4f53-87f4-edcb6fa92652
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 6')
Message
ID: ef2b9114-8ce5-449c-8283-2ee0f4d893c4
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 7')
Message
ID: 7c4e951e-c50e-4806-a710-43435fa2dc5d
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 8')
Message
ID: 348109cc-4776-43cc-9ee2-0b756fbefe25
Time: 2021-06-19T04:01:23
Body: bytearray(b'Test message 9')
消息属性
我们会探讨AMQP的Basic.Properties ,是一种每条消息都会携带的数据结构。
使用Properties的属性

在消息头frame中携带的消息属性是一个通过Basic.Properties 预定义的数据结构。

RabbitMQ会使用定义好的的属性来实现消息的特定行为。比如delivery-mode 属性,该属性的值告诉RabbitMQ当消息入队时是否将先消息保存到内存中还是先保存到磁盘中。
AMQP消息属性提供了定义和携带消息元数据的一个好的开端。我们会探讨上图中展示的每个基本的属性:
-
使用 content-type 属性让使用者知道如何解释消息体 -
使用content-encoding 来指示消息体可能以某种特殊的方式被压缩或编码 -
填充消息message-id 和相关 correlation-id 以惟一地标识消息和消息响应,并通过你的工作流跟踪消息 -
利用 timestamp 属性减小消息大小并建立消息创建时间的规范定义 -
使用expiration 属性指定消息过期时间 -
告诉 RabbitMQ 使用delivery-mode 将消息写入磁盘或内存中的队列 -
使用 app-id 和 user-id 来帮助跟踪有问题的消息发布者 -
使用 type 属性定义发布者和使用者的契约 -
使用 reply-to 属性路由应答消息 -
使用headers 表属性来进行自由格式的属性定义和RabbitMQ路由
上面消息的契约指的是消息格式和内容的定义。
使用Content-Type创建显示的消息契约
Basic.Properties 数据结构的content-type 属性指定了每个消息体的数据格式。

通常是类似HTTP协议使用的MIME type,比如JSON格式指定为application/json 。
通过GZIP压缩和Content-Encoding减小消息大小
AMQP的消息默认是没有压缩的。通过指定content-encoding 为gzip ,你的消息发布者可以压缩消息,并且在收到消息时自动解压。

通过Message-id和Correlation-id引用消息

Message-id
是消息的唯一标识。
有些消息,比如登录事件是不需要唯一的消息ID;但有些消息需要,如订单消息。
Correlation-id
保存相关消息的Message-id。
产生时间:Timestamp属性
和message-id 与correlation-id 一样,timestamp 也被指定为“为应用而设计”。

自动过期消息
expiration 属性告诉RabbitMQ未消费的消息什么时候可以被抛弃。有奇怪的是,它是字符串类型的,不过保存的是时间戳(单位秒)。

使用DELIVERY-MODE平衡速度与安全
delivery-mode 的大小为一个字节,2 表示你想在将消息传递个消费者之前优先存储它到磁盘。
存储消息意味着哪怕RabbitMQ服务器重启了,消息还会留在队列中,只要它没有被消费。
delivery-mode 只有两个值:1 表示不存储消息(只是留在内存);2 表示存储消息(存储到磁盘)。

如下图所示,如果你的消息是非存储消息,那么RabbitMQ会使用基于内存的队列。

因为内存的速度是比磁盘快的多的,指定delivery-mode 为1 会使你的消息尽可能的没有延迟。

当指定为2 ,消息会存储在基于磁盘的队列。
通过APP-ID和USER-ID验证消息源
app-id 和user-id 提供了消息可能需要的其他信息。

app-id
app-id 可以保存你应用相关新,比如用来指定你应用的API版本。还可以根据app-id 属性来抛弃未知来源的消息。
user-id
通常使用user-id 保存登录用户信息。
使用消息的TYPE属性获得特定信息
type 属性通常用于指定消息类型名。

REPLY-TO用于动态工作流
reply-to 可能用于指定响应消息的一个私有队列。

使用HEADER属性保存自定义属性
headers 属性是一个键值对表,能保存任意的、用户定义的键和值。键可以使字符串,而值可以是任意有效的AMQP值类型。

PRIORITY属性
priority 定义为从0到9的整数,可以指定队列中消息的优先级。数字越小,优先级越高。

你不能使用的属性:CLUSTER-ID/RESERVED

cluster-id 属性已经被移除,并且改名为reserved ,同时它应该为空。你不应该使用它。
总结
Property | Type | For use by | Suggested or specified use |
---|
app-id | short-string | Application | Useful for defining the application publishing the messages. | content-encoding | short-string | Application | Specify whether your message body is encoded in some special way, such as zlib, deflate, or Base64. | content-type | short-string | Application | Specify the type of the message body using mime-types. | correlation-id | short-string | Application | If the message is in reference to some other message or uniquely identifiable item, the correlation-id is a good way to indicate what the message is referencing. | delivery-mode | octet | RabbitMQ | A value of 1 tells RabbitMQ it can keep the message in memory; 2 indicates it should also write it to disk. | expiration | short-string | RabbitMQ | An epoch or Unix timestamp value as a text string that indicates when the message should expire. | headers | table | Both | A free-form key/value table that you can use to add additional metadata about your message; RabbitMQ can route based upon this if desired. | message-id | short-string | Application | A unique identifier such as a UUID that your application can use to identify the message. | priority | octet | RabbitMQ | A property for priority ordering in queues. | timestamp | timestamp | Application | An epoch or Unix timestamp value that can be used to indicate when the message was created. | type | short-string | Application | A text string your application can use to describe the message type or payload. | user-id | short-string | Both | A free-form string that, if used, RabbitMQ will validate against the connected user and drop messages if they don’t match. |
性能权衡
平衡传送速度与可靠传输

使用传输保证机制可以保障传输的可靠性,但同时也会降低传输的速度。下面的这些问题能帮助你找到高性能与可靠性的一个平衡。
-
保证消息在发布后加入队列有多重要? -
如果消息不能路由,是否应该将其返回给发布者? -
如果消息不能被路由,是否应该将其发送到其他地方以便日后能够调和? -
当 RabbitMQ 服务器崩溃时,消息丢失可以吗? -
RabbitMQ 在处理新消息时是否应该确认它已经向发布者执行了所有请求的路由和持久性任务? -
发布者是否应该能够批量消息传递,然后接收来自 RabbitMQ 的确认,即所有请求的路由和持久性任务已应用于批量中的所有消息? -
如果要批量发布需要确认路由和持久性的消息,是否需要对消息的目标队列进行真正的原子提交? -
在可靠的交付方面是否存在可接受的权衡,发布者可以使用这些权衡来实现更高的性能和消息吞吐量? -
消息发布的其他方面会影响消息吞吐量和性能吗?
设置mandatory让RabbitMQ不接收不可路由的消息
如果你需要服务器监控数据在收集之前总是路由到 RabbitMQ,那么所有 collectd 需要做的就是告诉 RabbitMQ 发布的消息是强制性的(mandatory=True )。mandatory 标志随着Basic.Publish RPC命令发布,告诉RabbitMQ如果某个消息不能路由,它应该通过一个Basic.Return RPC返回该消息给发布者。

import rabbitpy,datetime
url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
body = 'server.cpu.utilization 25.5 1350884514'
message = rabbitpy.Message(channel,
body,
{'content_type': 'text/plain',
'timestamp': datetime.datetime.now(),
'message_type': 'graphite metric'})
message.publish('chapter2-example',
'server-metrics',
mandatory=True)
当执行这段代码后,会收到类似下面的异常。Basic.Return 是异步的,在消息发送后随时可能被调用。
Message was returned by RabbitMQ: (312) for exchange NO_ROUTE
下面我们为它加上异常捕获代码。
import rabbitpy,datetime
url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
import datetime
import rabbitpy
connection = rabbitpy.Connection(url)
try:
with connection.channel() as channel:
properties = {'content_type': 'text/plain',
'timestamp': datetime.datetime.now(),
'message_type': 'graphite metric'}
body = 'server.cpu.utilization 25.5 1350884514'
message = rabbitpy.Message(channel, body, properties)
message.publish('chapter2-example',
'server-metrics',
mandatory=True)
except rabbitpy.exceptions.MessageReturnedException as error:
print('Publish failure: %s' % error)
输出:
Publish failure: Message was returned by RabbitMQ: (312) for exchange NO_ROUTE
发布确认作为事务的轻量级替代品
发布确认(Publisher Confirms)是RabbitMQ对AMQP规约的增强。在发布任何消息之前,消息发布者必须发出Confirm.Select RPC 请求到RabbitMQ 并等待 Confirm.SelectOk 响应以获取传输确认是否被启用。此时,对于发布者发送给 RabbitMQ 的每条消息,服务器将返回一个ack响应(Basic.Ack )或否定ack响应(Basic.Nack ) ,或者包含一个整数值,该值指定它正在确认的消息的偏移量。确认值按照在 Confirm.select RPC 请求之后接收消息的顺序引用该消息。

当发布者发布的消息被消费后,或需要持久化的消息入队并且持久化,它就会收到一个Basic.Ack 请求。如果某条消息不能被路由,broker 会发送一个Basic.Nack RPC表示这种错误。此时由发布者决定接下来该怎么做。
import rabbitpy,datetime
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
exchange = rabbitpy.Exchange(channel, 'chapter4-example')
exchange.declare()
channel.enable_publisher_confirms()
body = 'This is an important message'
message = rabbitpy.Message(channel,
body,
{'content_type': 'text/plain',
'message_type': 'very important'})
if message.publish('chapter4-example','important.message'):
print('The message was confirmed')
为不能路由的消息使用备用的exchange

为了使用备用的exchange,首先需要建立一个exchange做为备用。当主要的exchange被建立后,通过增加alternate-exchange 参数到Exchange.Declare 指定该备用的exchange。下面用代码描述这个过程。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
my_ae = rabbitpy.Exchange(channel, 'my_ae')
my_ae.declare()
args = {'alternate-exchange': my_ae.name}
exchange = rabbitpy.Exchange(channel,
'graphite',
exchange_type='topic',
arguments=args)
exchange.declare()
queue = rabbitpy.Queue(channel, 'unroutable-messages')
queue.declare()
if queue.bind(my_ae, '#'):
print('Queue bound to alternate-exchange')
带事务的批处理
AMQP事务,或TX,提供了一个机制,消息能批量的发布到RabbitMQ,然后提交(commit)到队列或回滚。下面的例子,展示了编写事务的代码是相当简单。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
tx = rabbitpy.Tx(channel)
tx.select()
message = rabbitpy.Message(channel,
'This is an important message',
{'content_type': 'text/plain',
'delivery_mode': 2,
'message_type': 'important'})
message.publish('chapter4-example', 'important.message')
try:
if tx.commit():
print('Transaction committed')
except rabbitpy.exceptions.NoActiveTransactionError:
print('Tried to commit without active transaction')
为了开始一个事务,发布者发送一个TX.Select RPC请求到RabbitMQ,然后RabbitMQ会返回一个TX.SelectOk 响应。一旦事务被开启,发布者就可以发送一条或多条消息到RabbitMQ。

使用高可用队列避免节点故障
高可用队列(Highly avilable,HA)是RabbitMQ的增强,并不是AMQP的规约,它允许队列有跨RaibbtMQ集群的冗余的副本。
下面,我们建立一个新的队列,该队列跨越 RabbitMQ 集群中的每个节点。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
try:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel,
'my-ha-queue',
arguments={'x-ha-policy': 'all'})
if queue.declare():
print('Queue declared')
except rabbitpy.exceptions.RemoteClosedChannelException as error:
print('Queue declare failed: %s' % error)
当一条消息被发送到HA队列,该消息会发送到集群中的负责HA队列的每个服务器。一旦该消息被集群中任意节点确认,集群中其他节点的消息副本会立即从各自队列中移除。

HA队列的事务
如果你正在使用事务或传输确认机制,RabbitMQ不会发送一个成功响应,直到某条消息被HA队列中所有活动节点确认。这会产生一定延迟。
设置delivery-mode来存储消息到磁盘
delivery-mode 是AMQP 规约指定的一个Basic.Properties 属性。delivery-mode 默认为1,意味着不需要存储消息到磁盘,此时,如果RabbitMQ重启,这些未消费的消息就丢失了。
如果设成2,RabbitMQ会确保消息会保存到磁盘。这会确保如果重启,消息不会丢失。
除了将delivery-mode 设成2 ,还有一种方法能保证重启后消息存在,就是将队列声明为durable=True 。
RabbitMQ将消息存储到磁盘,然后通过引用追踪它们,直到它们不在任何队列中。一旦消息的所有引用都从队列中移除,RabbitMQ才会将消息从磁盘移除。

但是要注意的是,这可能会大大降低处理速度。

什么时候RabbitMQ返回消息
在RabbitMQ 3.2中,RabbitMQ扩展了AMQP规约,增加了通知机制,当某个连接的阈值触发后,会通知客户端连接已经被阻塞了。Connection.Blocked 和Connection.Unblocked 是异步方法,可以随时通知客户端。
用rabbitpy检查连接状态
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
print('Connection is Blocked? %s' % connection.blocked)
Connection is Blocked? False
不要Get,而是Consume
消息消费者应用通常用语接收并处理消息。比如,你通过RabbitMQ实现RPC模式,发布RPC请求的应用同时也可以消费RPC响应。

BASIC.GET VS BASIC.CONSUME
RabbitMQ实现了两种不同的AMQP RPC命令来从队列中检索消息:Basic.Get 和Basic.Consume 。Basic.Get 并不是检索消息理想方式。Basic.Get 是一种轮询模型,而Basic.Consume 是一种推送模型。
Basic.Get
当应用程序使用 Basic.Get 请求检索消息时,即使队列中有多个消息,它每次想要接收消息时也必须发送一个新请求。如果你正在检索消息的队列有一条消息,RabbitMQ返回Basic.GetOk RPC响应。

如果当你发送一个Basic.Get RPC请求时存在一条可用的消息,RabbitMQ会响应Basic.GetOk 和该条消息。
如果队列中没有消息,那么会响应Basic.GetEmpty 。

如果没有消息可用,RabbitMQ会返回Basic.Emtpy
当使用 Basic.Get 时。你的应用程序需要评估来自 RabbitMQ 的 RPC 响应,以确定是否已经接收到消息。对于大多数从 RabbitMQ 接收消息的长时间运行的程序来说,这不是接收和处理消息的有效方法。
首先运行以下代码产生消息:
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
for iteration in range(10):
rabbitpy.publish(url,'', 'test-messages', 'go')
rabbitpy.publish(url,'', 'test-messages', 'stop')
下面看一下使用Basic.Get 消费消息的例子:
import rabbitpy
url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'test-messages')
queue.declare()
while True:
message = queue.get()
if message:
message.pprint()
message.ack()
if message.body == 'stop':
break
Basic.Consume
通过Basic.Consume RPC命令消费消息,你会注册你的应用到RabbitMQ,然后告诉它异步地发送消息到你的消费者上。

当客户端发送Basic.Consume ,RabbitMQ发送消息给客户端只要有消息可用,直到客户端发送Basic.Cancel 命令
而且当你的应用收到消息时,也不需要评估此条消息是否为空响应。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
for message in rabbitpy.consume(url,'test-messages'):
message.pprint()
message.ack()
当你的应用发起Basic.Consume ,一个唯一的字符串被创建来标志RabbitMQ上开启的通道上的应用。该字符串被称为消费者标签,会随着每条消息发送给你的应用。
此消费者标签可用于取消从RabbitMQ中接收消息,只要发送一个Basic.Cancel 命令即可。然后,一般客户端library都能处理此标签的发送。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
message.ack()
if message.body == b'stop':
break
消费者性能调优
当发布消息后,在消费消息时需要对吞吐量和可靠传输做一个均衡考量。如下图所示,有一些可选项可用于提升消息传输的速度。但消息速度上来后,消息传输的可靠性就会降低。

使用no-ack模式来提升吞吐量
当要消费消息时,你的应用注册自己到RabbitMQ。会发送一个Basic.Consume 请求,同时包含no-ack 标志。当此标志为True ,这会告诉RabbitMQ你的消费者不会确认收到的消息,因此RabbitMQ应该尽可能快的发送消息。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'test-messages')
for message in queue.consume_messages(no_ack=True):
message.pprint()
虽然这种方式发送消息很卡,但是在可靠传输上面就会有所欠缺。要知道为什么,在消费者应用程序接收到消息之前,考虑消息必须经过的每个步骤是很重要的。

当RabbitMQ在开启的连接上发送消息,它通过TCP socket连接与客户端通信。如果该连接被开启且可写,RabbitMQ假设一切都在正常工作状态,并且消息已经传递。如果存在网络错误,RabbitMQ会收到socker 错误信息并知道有网络问题存在。如果没有收到错误信息,RabbitMQ就会假设消息都已经被传输了。客户单通过发送Basic.Ack RPC到RabbitMQ,RabbitMQ就知道客户端成功收到了消息,同时很可能已经处理了该消息。
如果关闭消息确认,RabbitMQ就会发送另一条消息而不会等待确认。这样,只要有足够的消息,RabbitMQ就会持续地发送消息到消费者,直到socket缓冲区被填满。
正是因为不需要等待消费消息的确认信息,才能达到最高的吞吐量。对于一次性的消息来说,这是最理想的方式来提高消息速度,但是并不意味着没有重大风险。考虑当一个消费者应用崩溃会发生什么,假设此时仍然还有1KB的消息在操作系统的socket接收缓冲区。
RabbitMQ认为已经发送了这些消息,并且不会得知当应用奔溃然后socket关闭后已经消费了多少消息。
如果这种消费消息的方法不适合你的应用架构,但是你需要更快的消息吞吐量而不是某个消息被发送然后被确认。
通过服务质量设置控制消费者预取
AMQP 规范要求channel有服务质量(QoS)设置,其中消费者可以在确认这些消息之前,设置待接收的特定消息数量。Qos设置可以让RabbitMQ更有效的发送消息,通过指定为消费者预分配多少消息。
与消费者禁用ack不同,如果你的消费者应用在确认消息之前崩溃了,所有预拉取的消息会在socket关闭时返回到队列之中。
在协议级别,在channel上发送一个Basic.QoS RPC请求来指定QoS。你可以指定该Qos设置是应用于此channel上还是连接打开的所有channel上。Basic.QoS RPC请求能在任何时候方法,但通过在消费者发送Basic.Consume 请求之前。
在下图中以单个消费者为基准,显示此时,预设的prefetch 数量为2500是对于高峰期消息速度而言是最佳的设置。

使用QoS设置的一个好处之一是你不必确认收到的每条消息。Basic.Ack RPC响应有一个叫作multiple 的属性,当为True 时,会让RabbitMQ知道你的应用想确认所有之前未确认的消息。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
channel.prefetch_count(10)
unacknowledged =0
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
unacknowledged += 1
if unacknowledged == 10:
message.ack(all_previous=True)
unacknowledged = 10
使用事务
事务能允许你的应用提交和回滚批量的操作。但事务对于消息的吞吐量有一个不好的影响,除了一个例外。如果你没有使用QoS设置,你可能在使用事务后,会发现有一点性能的提升。

事务在禁用ack后失效
拒绝消息
确认消息时一种很好的方式去确保RabbitMQ知道消费者已经收到并处理了某个消息,但是如果遇到了某个问题呢?可能是消息本身或处理消息时。此时,RabbitMQ提供了两种机制让消息回到broker:Basic.Reject 和Basic.Nack 。

Basic.Reject
Basic.Reject 是一个AMQP规定的RPC响应来告知broker某消息不能被处理。当消费者拒绝消息时,可以让RabbitMQ丢弃该消息或通过requeue 标志让消息重新入队。
下面的代码,展示了一个消息时如何重新入队的,redelivered 标志告知下一个消费者该消息已经被传递过。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
for message in rabbitpy.consume(url,'test-messages'):
message.pprint()
print('Redelivered: %s' % message.redelivered)
message.reject(True)
和Basic.Ack 一样,如果某消息没有设置no-ack 并且传递后,使用Basic.Reject 会释放该消息的引用。
使用Basic.Rejct 一次只能拒绝一个消息。
Basic.Nack
RabbitMQ实现了一个AMQP规约没有的新响应,叫Basic.Nack 。它能一次拒绝多个消息。
死信队列
RabbitMQ的死信队列(dead-letter exchange,DLX)也是对AMQP的扩展,它是在拒绝一个传输过的消息是可设置的可选项。当想知道为什么消费此类消息时会遇到问题DLK就会很有用。
DLX也是一个普通的exchange。让某个exchange成为DLX只要做一件事,就是在创建队列时,让该exchange用于拒绝的消息。一旦拒绝某个消息,并且它没有重新入队,RabbitMQ会路由该消息到队列的 x-dead-letter-exchange 参数中指定的exchange。

在声明队列时指定死信队列相当简单。只需在创建队列对象时将 exchange 名称作为dead_letter_exchange 参数传入,或在发送Queue.Declare 请求时将其作为 x-dead-letter-exchange 参数传入。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
rabbitpy.Exchange(channel, 'rejected-messages').declare()
queue = rabbitpy.Queue(channel, 'dlx-example',
dead_letter_exchange='rejected-messages')
queue.declare()
队列控制
当定义一个队列时,可以很多设置来决定该队列的行为。比如队列可以做下面的事情:
- 删除自己
- 只允许一个消费者消费
- 让消息过期
- 限定消息数量
- 移除旧消息
要注意的是,队列的设置是不可修改的,一旦你创建了一个队列,你后面就无法修改它的设置。除非删除重建。
临时队列
就像《碟中谍》里汤姆克鲁斯的公文包一样,RabbitMQ提供了能删除自己的队列,只要该队列被使用过并且不再需要时。这些队列能在创建时能填充一些消息,一旦消费者连接上来,检索了这些消息,然后断开连接,这些队列就会自动删除。
通过在Queue.Declare RPC请求中设置auto-delete 标志为True 可以很容易的创建这些自删除队列。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'ad-example',auto_delete=True)
queue.declare()
注意的是,任意数量的消费者可以从自删除队列中消费,只有当没有消费者连接它时,该队列才会删除自己。
永久队列
如果你想让某个队列在RabbitMQ重启后仍然存在,那么可以设置durable 标志为True 。可能会和持久消息混淆。持久消息通过设置delivery-mode 为2 。而durable 标志,告诉RabbitMQ你想让队列一直存在,直到Queue.Delete 请求被调用。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'durable-queue',durable=True)
if queue.declare():
print('Queue declared')
对于那些不重要的消息,你可能想让它们经过一段时间后自动被抛弃。可以设置消息的TTL 值,指定消息的最大存活时间。每个消息的过期时间可以不同。
与设置消息的过期属性相反,x-message-ttl 队列设置可以队列中所有消息的过期时间。
import rabbitpy
url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'expiring-msg-queue',arguments={'x-message-ttl':10000})
if queue.declare():
print('Queue declared')
在RabbitMQ 3.1.0后,队列可以设置最大消息数量x-max-length ,一旦队列里面的元素达到最大值,RabbitMQ会丢掉这些消息。
|