IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ学习笔记 -> 正文阅读

[大数据]RabbitMQ学习笔记

引言

本文是 RabbitMQ in Depth 的学习笔记。

AMQ

AMQ(Advanced Message Queuing) 模型定义了三个抽象组件:

  • Exchange—将消息路由到队列的消息代理(broker)组件
  • Queue—存储消息的数据结构
  • Binding—告诉Exchange消息应该存储在哪个Queue的规则

img

img

除了可以绑定队列到Exchange,RabbitMQ实现了AMQP能允许Exchange绑定到其他Exchange。

RabbitMQ如何通信

作为RPC传输的AMQP

开启会话

img

转换到正确的Channel

AMQP的RPC Frame结构

img

AMQP的类定义了功能的范围,每个类包含执行不同任务的方法。

在连接建立过程中,RabbitMQ服务断发送了一个Connection.Start命令,编入一个frame,发送到客户端。

AMQP中frame的组件

frame编码了待传输的信息。

img

AMQP的frame包括了5个组件:

  • Frame类型
  • 通道(Channel)编号
  • Frame大小(以字节编码的)
  • Frame的货物(payload)
  • 结束标志

Frame的类型

AMQP规范定义了5中frame:协议头frame、方法frame、消息头frame、消息体(body) frame、心跳frame。

  • 协议头frame只会发送一次,只有连接到RabbitMQ的时候才使用
  • 方法frame包含要发送或接受到的RPC请求或响应
  • 消息头frame包含了消息的大小和属性
  • 消息体frame包含消息的内容

将消息编码到frame

当发送一条消息到RabbitMQ,方法、头和消息体frame会被使用。第一个发送的frame是方法frame,它包含需要执行的命令和参数,比如exchange和路由key。接下来是消息frame:包含消息头和消息体。消息头包含消息属性与消息体大小。AMQP有一个最大的frame大小,如果你的消息内容超过了该大小,那么会被分成多个消息体frame发送。

这些frame都以同样的顺序发送:方法frame、消息头和一个或多个消息体frame。

img

为了传输的效率,方法和消息头frame中的内容以二进制编码。

而消息体frame中的内容可以是纯文本也可以是二进制图像数据。

剖析方法frame

方法frame包含你的RPC的方法和传输所需的参数。在下图中,方法frame包含一个Basic.Publish命令,包含描述命令的二进制数据和请求参数。

首先的两个字段是描述Basic类和Publish方法的数字表示,接着是exchange名称和路由key的字符串值。mandatory标识描述是否必须路由此消息,或是否发送一个Basic.Returnframe来告知此消息无法被路由。

img

消息头frame

消息头frame告诉RabbitMQ你的消息有多大。同时也包含描述你消息的属性。这些属性作为Basic.Properties表中的值,但可能为空。

img

消息体frame

消息体frame是包含实际消息数据的数据结构。根据你的需求,可以使任何格式的数据。

img

使用协议

在你发送消息到队列之前,需要进行一些配置。

至少需要建立一个exchange和队列,然后绑定它们。

声明一个exchange

Exchange通过Exchange.Declare命令创建,包含名称、类型和其他元数据。

一旦RabbitMQ接收到此消息,并创建exchange成功,会返回一个Exchange.DeclareOk方法frame;否则创建失败的话,返回Channel.Close命令。

img

声明一个队列

一旦exchange建立,下面需要通过Queue.Declare命令来创建队列。

img

绑定队列到exchange

通过Queue.Bind命令绑定,该命令一次指定指定一个队列。

img

发布消息

img
当发布消息到RabbitMQ,至少需要发送3个frame:Basic.Public方法frame,一个消息头frame和一个消息体frame

当RabbitMQ收到了一个消息的所有frame之后,它会首先查看方法frame来决定下一步。匹配Basic.Public方法frame中包含的change名称和路由key。

当消息匹配到任何绑定的队列,RabbitMQ会以FIFO的顺序将此消息入队。但不是将实际的消息入队,而是消息的引用。当RabbitMQ准备传输此消息时,它将使用引用组合需要发送的消息并通过网络发送。这在需要发送到多个队列时提供了重大的优化,当需要发送到多个目的地时仅保留消息的一个实例大大降低了内存的占用。

并且某个队列对该消息的处理不会影响到其他队列。当所有消息的拷贝(引用)被传输或移除后,RabbitMQ会从内存中移除该消息实例。

消费消息

为了消费消息,一个消费者应用需要注册(subscribe)到该队列,通过Basic.Consume命令。消费者将开始以 Basic.Deliver命令接收消息。传递消息头frame和消息体frame。

img

如果消费者想停止接收消息,它需要发送Basic.Cancel命令。当消费消息时,需要指定一些设置。其中一个是no_ack参数,当为true时,RabbitMQ会持续发送消息直到消费者发送了Basic.Cancel命令或掉线;当为false时,消费者必须发送Basic.AckRPC请求来确定每条收到的消息。

img

用Python编写一个消息生产者

import rabbitpy 

# rabbitmq             NodePort    10.43.114.17    <none>        5672:30595/TCP,15672:31847/TCP,15692:32074/TCP   15h
url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
connection = rabbitpy.Connection(url)
channel = connection.channel()
# 声明一个exchange,指定channel名称
exchange = rabbitpy.Exchange(channel, 'chapter2-example')
# 使用declare方法来发送Exchange.Declare命令
exchange.declare()
# 建立队列,指定队列名称
queue = rabbitpy.Queue(channel, 'example')
# 发送Queue.Declare命令,会返回(此队列中的消息数,此队列的消费者数)
queue.declare() 
# 绑定队列到exchange
# 会发送Queue.Bind命令,传入exchange和路由key
queue.bind(exchange, 'example-routing-key')

# 下面就可以发送消息了for message_number in range(10):    message = rabbitpy.Message(channel, # 指定channel                               f'Test message {message_number}', # 消息体                              {'content_type':'text/plain'}, # 消息属性(字典)                              opinionated=True)    # 创建Basic.Public方法帧(frame),消息头帧,和一个消息体帧,然后传输到RabbitMQ    message.publish(exchange, 'example-routing-key')

此时在管理控制台可以看到10条消息。

# 下面来消费这些消息

channel = connection.channel()
queue = rabbitpy.Queue(channel, 'example')

while len(queue) > 0:
    message = queue.get()
    print('Message')
    print(f' ID: {message.properties["message_id"]}')
    print(f' Time: {message.properties["timestamp"].isoformat()}')
    print(f' Body: {message.body}')
    message.ack() # 确认消息
    Message
     ID: 6c873463-4b2d-42bb-a824-6771a8551af9
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 0')
    Message
     ID: 2e9c7ef1-d2fa-4725-adc6-447572444939
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 1')
    Message
     ID: 4152aa9e-9a82-487d-bf4c-f062061632a2
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 2')
    Message
     ID: 524572b7-1982-4e63-994b-2163d6464519
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 3')
    Message
     ID: 31e282ab-4150-46a4-b56e-e645e8abcb5f
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 4')
    Message
     ID: 1a97d253-cb19-4cbb-b778-f6a562478059
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 5')
    Message
     ID: a78b3fd5-abd0-4f53-87f4-edcb6fa92652
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 6')
    Message
     ID: ef2b9114-8ce5-449c-8283-2ee0f4d893c4
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 7')
    Message
     ID: 7c4e951e-c50e-4806-a710-43435fa2dc5d
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 8')
    Message
     ID: 348109cc-4776-43cc-9ee2-0b756fbefe25
     Time: 2021-06-19T04:01:23
     Body: bytearray(b'Test message 9')

消息属性

我们会探讨AMQP的Basic.Properties,是一种每条消息都会携带的数据结构。

使用Properties的属性

img

在消息头frame中携带的消息属性是一个通过Basic.Properties预定义的数据结构。

img

RabbitMQ会使用定义好的的属性来实现消息的特定行为。比如delivery-mode属性,该属性的值告诉RabbitMQ当消息入队时是否将先消息保存到内存中还是先保存到磁盘中。

AMQP消息属性提供了定义和携带消息元数据的一个好的开端。我们会探讨上图中展示的每个基本的属性:

  • 使用 content-type 属性让使用者知道如何解释消息体

  • 使用content-encoding来指示消息体可能以某种特殊的方式被压缩或编码

  • 填充消息message-id和相关 correlation-id以惟一地标识消息和消息响应,并通过你的工作流跟踪消息

  • 利用 timestamp 属性减小消息大小并建立消息创建时间的规范定义

  • 使用expiration属性指定消息过期时间

  • 告诉 RabbitMQ 使用delivery-mode将消息写入磁盘或内存中的队列

  • 使用 app-iduser-id 来帮助跟踪有问题的消息发布者

  • 使用 type 属性定义发布者和使用者的契约

  • 使用 reply-to 属性路由应答消息

  • 使用headers表属性来进行自由格式的属性定义和RabbitMQ路由

上面消息的契约指的是消息格式和内容的定义。

使用Content-Type创建显示的消息契约

Basic.Properties数据结构的content-type属性指定了每个消息体的数据格式。

img

通常是类似HTTP协议使用的MIME type,比如JSON格式指定为application/json

通过GZIP压缩和Content-Encoding减小消息大小

AMQP的消息默认是没有压缩的。通过指定content-encodinggzip,你的消息发布者可以压缩消息,并且在收到消息时自动解压。

img

通过Message-id和Correlation-id引用消息

img

Message-id

是消息的唯一标识。

有些消息,比如登录事件是不需要唯一的消息ID;但有些消息需要,如订单消息。

Correlation-id

保存相关消息的Message-id。

产生时间:Timestamp属性

message-idcorrelation-id一样,timestamp也被指定为“为应用而设计”。

img

自动过期消息

expiration属性告诉RabbitMQ未消费的消息什么时候可以被抛弃。有奇怪的是,它是字符串类型的,不过保存的是时间戳(单位秒)。

img

使用DELIVERY-MODE平衡速度与安全

delivery-mode的大小为一个字节,2表示你想在将消息传递个消费者之前优先存储它到磁盘。

存储消息意味着哪怕RabbitMQ服务器重启了,消息还会留在队列中,只要它没有被消费。

delivery-mode只有两个值:1表示不存储消息(只是留在内存);2表示存储消息(存储到磁盘)。

img

如下图所示,如果你的消息是非存储消息,那么RabbitMQ会使用基于内存的队列。

在这里插入图片描述

因为内存的速度是比磁盘快的多的,指定delivery-mode1会使你的消息尽可能的没有延迟。

img

当指定为2,消息会存储在基于磁盘的队列。

通过APP-ID和USER-ID验证消息源

app-iduser-id提供了消息可能需要的其他信息。

img

app-id

app-id可以保存你应用相关新,比如用来指定你应用的API版本。还可以根据app-id属性来抛弃未知来源的消息。

user-id

通常使用user-id保存登录用户信息。

使用消息的TYPE属性获得特定信息

type属性通常用于指定消息类型名。

img

REPLY-TO用于动态工作流

reply-to可能用于指定响应消息的一个私有队列。

img

使用HEADER属性保存自定义属性

headers属性是一个键值对表,能保存任意的、用户定义的键和值。键可以使字符串,而值可以是任意有效的AMQP值类型。

img

PRIORITY属性

priority定义为从0到9的整数,可以指定队列中消息的优先级。数字越小,优先级越高。

img

你不能使用的属性:CLUSTER-ID/RESERVED

img

cluster-id属性已经被移除,并且改名为reserved,同时它应该为空。你不应该使用它。

总结

PropertyTypeFor use bySuggested or specified use
app-idshort-stringApplicationUseful for defining the application publishing the messages.
content-encodingshort-stringApplicationSpecify whether your message body is encoded in some special way, such as zlib, deflate, or Base64.
content-typeshort-stringApplicationSpecify the type of the message body using mime-types.
correlation-idshort-stringApplicationIf the message is in reference to some other message or uniquely identifiable item, the correlation-id is a good way to indicate what the message is referencing.
delivery-modeoctetRabbitMQA value of 1 tells RabbitMQ it can keep the message in memory; 2 indicates it should also write it to disk.
expirationshort-stringRabbitMQAn epoch or Unix timestamp value as a text string that indicates when the message should expire.
headerstableBothA free-form key/value table that you can use to add additional metadata about your message; RabbitMQ can route based upon this if desired.
message-idshort-stringApplicationA unique identifier such as a UUID that your application can use to identify the message.
priorityoctetRabbitMQA property for priority ordering in queues.
timestamptimestampApplicationAn epoch or Unix timestamp value that can be used to indicate when the message was created.
typeshort-stringApplicationA text string your application can use to describe the message type or payload.
user-idshort-stringBothA free-form string that, if used, RabbitMQ will validate against the connected user and drop messages if they don’t match.

性能权衡

平衡传送速度与可靠传输

img

使用传输保证机制可以保障传输的可靠性,但同时也会降低传输的速度。下面的这些问题能帮助你找到高性能与可靠性的一个平衡。

  • 保证消息在发布后加入队列有多重要?

  • 如果消息不能路由,是否应该将其返回给发布者?

  • 如果消息不能被路由,是否应该将其发送到其他地方以便日后能够调和?

  • 当 RabbitMQ 服务器崩溃时,消息丢失可以吗?

  • RabbitMQ 在处理新消息时是否应该确认它已经向发布者执行了所有请求的路由和持久性任务?

  • 发布者是否应该能够批量消息传递,然后接收来自 RabbitMQ 的确认,即所有请求的路由和持久性任务已应用于批量中的所有消息?

  • 如果要批量发布需要确认路由和持久性的消息,是否需要对消息的目标队列进行真正的原子提交?

  • 在可靠的交付方面是否存在可接受的权衡,发布者可以使用这些权衡来实现更高的性能和消息吞吐量?

  • 消息发布的其他方面会影响消息吞吐量和性能吗?

设置mandatory让RabbitMQ不接收不可路由的消息

如果你需要服务器监控数据在收集之前总是路由到 RabbitMQ,那么所有 collectd 需要做的就是告诉 RabbitMQ 发布的消息是强制性的(mandatory=True)。mandatory标志随着Basic.PublishRPC命令发布,告诉RabbitMQ如果某个消息不能路由,它应该通过一个Basic.ReturnRPC返回该消息给发布者。

img

import rabbitpy,datetime

url = 'amqp://guest:guest@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        body = 'server.cpu.utilization 25.5 1350884514'
        message = rabbitpy.Message(channel,                           
                                   body,                          
                                   {'content_type': 'text/plain',
                                   'timestamp': datetime.datetime.now(),
                                    'message_type': 'graphite metric'})
        message.publish('chapter2-example',                               
                        'server-metrics',             
                        mandatory=True)

当执行这段代码后,会收到类似下面的异常。Basic.Return是异步的,在消息发送后随时可能被调用。

Message was returned by RabbitMQ: (312) for exchange NO_ROUTE

下面我们为它加上异常捕获代码。

import rabbitpy,datetime

url = 'amqp://guest:guest@192.168.89.38:30595/%2F'

import datetime
import rabbitpy

connection = rabbitpy.Connection(url)                                
try:
    with connection.channel() as channel:                         
        properties = {'content_type': 'text/plain',              
                      'timestamp': datetime.datetime.now(), 
                      'message_type': 'graphite metric'}
        body = 'server.cpu.utilization 25.5 1350884514'          
        message = rabbitpy.Message(channel, body, properties)     
        message.publish('chapter2-example',                       
                        'server-metrics',
                        mandatory=True)
except rabbitpy.exceptions.MessageReturnedException as error:     
    print('Publish failure: %s' % error)            

输出:

Publish failure: Message was returned by RabbitMQ: (312) for exchange NO_ROUTE

发布确认作为事务的轻量级替代品

发布确认(Publisher Confirms)是RabbitMQ对AMQP规约的增强。在发布任何消息之前,消息发布者必须发出Confirm.Select RPC 请求到RabbitMQ 并等待 Confirm.SelectOk 响应以获取传输确认是否被启用。此时,对于发布者发送给 RabbitMQ 的每条消息,服务器将返回一个ack响应(Basic.Ack)或否定ack响应(Basic.Nack) ,或者包含一个整数值,该值指定它正在确认的消息的偏移量。确认值按照在 Confirm.select RPC 请求之后接收消息的顺序引用该消息。

img

当发布者发布的消息被消费后,或需要持久化的消息入队并且持久化,它就会收到一个Basic.Ack请求。如果某条消息不能被路由,broker会发送一个Basic.NackRPC表示这种错误。此时由发布者决定接下来该怎么做。

import rabbitpy,datetime

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        exchange = rabbitpy.Exchange(channel, 'chapter4-example')
        exchange.declare()
        channel.enable_publisher_confirms() # 开启发布确认
        body = 'This is an important message'
        message = rabbitpy.Message(channel,                           
                                   body,                          
                                   {'content_type': 'text/plain',
                                    'message_type': 'very important'})
        if message.publish('chapter4-example','important.message'):
            print('The message was confirmed')

为不能路由的消息使用备用的exchange

img

为了使用备用的exchange,首先需要建立一个exchange做为备用。当主要的exchange被建立后,通过增加alternate-exchange参数到Exchange.Declare指定该备用的exchange。下面用代码描述这个过程。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        my_ae = rabbitpy.Exchange(channel, 'my_ae') # 首先创建备份exchange
        my_ae.declare()
        
        args = {'alternate-exchange': my_ae.name}   # 指定参数为备份exchange的名称               

        exchange = rabbitpy.Exchange(channel,                     
                                     'graphite', 
                                     exchange_type='topic', 
                                     arguments=args)  # 通过arguments指定
        exchange.declare()                                        

        queue = rabbitpy.Queue(channel, 'unroutable-messages')    # 为不可路由消息创建一个队列
        queue.declare()                                           
        if queue.bind(my_ae, '#'):                                # '#' 接收所有消息
            print('Queue bound to alternate-exchange')

带事务的批处理

AMQP事务,或TX,提供了一个机制,消息能批量的发布到RabbitMQ,然后提交(commit)到队列或回滚。下面的例子,展示了编写事务的代码是相当简单。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        tx = rabbitpy.Tx(channel)
        tx.select() # 开启事务
         
        message = rabbitpy.Message(channel,                           
                                   'This is an important message',    
                                   {'content_type': 'text/plain',
                                    'delivery_mode': 2,               
                                    'message_type': 'important'}) 
        message.publish('chapter4-example', 'important.message')      
        try:
            if tx.commit():    # 提交事务                                   
                print('Transaction committed')             
        except rabbitpy.exceptions.NoActiveTransactionError:          
            print('Tried to commit without active transaction')
           

为了开始一个事务,发布者发送一个TX.SelectRPC请求到RabbitMQ,然后RabbitMQ会返回一个TX.SelectOk响应。一旦事务被开启,发布者就可以发送一条或多条消息到RabbitMQ。

img

使用高可用队列避免节点故障

高可用队列(Highly avilable,HA)是RabbitMQ的增强,并不是AMQP的规约,它允许队列有跨RaibbtMQ集群的冗余的副本。

下面,我们建立一个新的队列,该队列跨越 RabbitMQ 集群中的每个节点。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    try:
        with connection.channel() as channel:
            queue = rabbitpy.Queue(channel,
                                  'my-ha-queue',
                                    arguments={'x-ha-policy': 'all'}) # 传入x-ha-policy

            if queue.declare():
                print('Queue declared')
    except rabbitpy.exceptions.RemoteClosedChannelException as error:      
        print('Queue declare failed: %s' % error)

           

当一条消息被发送到HA队列,该消息会发送到集群中的负责HA队列的每个服务器。一旦该消息被集群中任意节点确认,集群中其他节点的消息副本会立即从各自队列中移除。

img

HA队列的事务

如果你正在使用事务或传输确认机制,RabbitMQ不会发送一个成功响应,直到某条消息被HA队列中所有活动节点确认。这会产生一定延迟。

设置delivery-mode来存储消息到磁盘

delivery-mode是AMQP 规约指定的一个Basic.Properties属性。delivery-mode默认为1,意味着不需要存储消息到磁盘,此时,如果RabbitMQ重启,这些未消费的消息就丢失了。

如果设成2,RabbitMQ会确保消息会保存到磁盘。这会确保如果重启,消息不会丢失。

除了将delivery-mode设成2,还有一种方法能保证重启后消息存在,就是将队列声明为durable=True

RabbitMQ将消息存储到磁盘,然后通过引用追踪它们,直到它们不在任何队列中。一旦消息的所有引用都从队列中移除,RabbitMQ才会将消息从磁盘移除。

img

但是要注意的是,这可能会大大降低处理速度。

img

什么时候RabbitMQ返回消息

在RabbitMQ 3.2中,RabbitMQ扩展了AMQP规约,增加了通知机制,当某个连接的阈值触发后,会通知客户端连接已经被阻塞了。Connection.BlockedConnection.Unblocked是异步方法,可以随时通知客户端。

用rabbitpy检查连接状态

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    print('Connection is Blocked? %s' % connection.blocked)   
Connection is Blocked? False

不要Get,而是Consume

消息消费者应用通常用语接收并处理消息。比如,你通过RabbitMQ实现RPC模式,发布RPC请求的应用同时也可以消费RPC响应。

img

BASIC.GET VS BASIC.CONSUME

RabbitMQ实现了两种不同的AMQP RPC命令来从队列中检索消息:Basic.GetBasic.ConsumeBasic.Get并不是检索消息理想方式。Basic.Get是一种轮询模型,而Basic.Consume是一种推送模型。

Basic.Get

当应用程序使用 Basic.Get请求检索消息时,即使队列中有多个消息,它每次想要接收消息时也必须发送一个新请求。如果你正在检索消息的队列有一条消息,RabbitMQ返回Basic.GetOkRPC响应。

img

如果当你发送一个Basic.GetRPC请求时存在一条可用的消息,RabbitMQ会响应Basic.GetOk和该条消息。

如果队列中没有消息,那么会响应Basic.GetEmpty

img

如果没有消息可用,RabbitMQ会返回Basic.Emtpy

当使用 Basic.Get时。你的应用程序需要评估来自 RabbitMQ 的 RPC 响应,以确定是否已经接收到消息。对于大多数从 RabbitMQ 接收消息的长时间运行的程序来说,这不是接收和处理消息的有效方法。

首先运行以下代码产生消息:

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

for iteration in range(10):                                       
 rabbitpy.publish(url,'', 'test-messages', 'go')

rabbitpy.publish(url,'', 'test-messages', 'stop')

下面看一下使用Basic.Get消费消息的例子:

import rabbitpy

url = 'amqp://guest:guest@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'test-messages')
        queue.declare()
        
        while True: # 循环轮训
            message = queue.get()
            if message:
                message.pprint()
                message.ack()
                if message.body == 'stop':
                    break
        
       

Basic.Consume

通过Basic.ConsumeRPC命令消费消息,你会注册你的应用到RabbitMQ,然后告诉它异步地发送消息到你的消费者上。

img

当客户端发送Basic.Consume,RabbitMQ发送消息给客户端只要有消息可用,直到客户端发送Basic.Cancel命令

而且当你的应用收到消息时,也不需要评估此条消息是否为空响应。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'


for message in rabbitpy.consume(url,'test-messages'):
    message.pprint() 
    message.ack()  

当你的应用发起Basic.Consume,一个唯一的字符串被创建来标志RabbitMQ上开启的通道上的应用。该字符串被称为消费者标签,会随着每条消息发送给你的应用。

此消费者标签可用于取消从RabbitMQ中接收消息,只要发送一个Basic.Cancel命令即可。然后,一般客户端library都能处理此标签的发送。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        for message in rabbitpy.Queue(channel, 'test-messages'):     
            message.pprint()                                         
            message.ack()                                            
            if message.body == b'stop':                          
                break
        

消费者性能调优

当发布消息后,在消费消息时需要对吞吐量和可靠传输做一个均衡考量。如下图所示,有一些可选项可用于提升消息传输的速度。但消息速度上来后,消息传输的可靠性就会降低。

img

使用no-ack模式来提升吞吐量

当要消费消息时,你的应用注册自己到RabbitMQ。会发送一个Basic.Consume请求,同时包含no-ack标志。当此标志为True,这会告诉RabbitMQ你的消费者不会确认收到的消息,因此RabbitMQ应该尽可能快的发送消息。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'test-messages')
        for message in queue.consume_messages(no_ack=True):
            message.pprint()

虽然这种方式发送消息很卡,但是在可靠传输上面就会有所欠缺。要知道为什么,在消费者应用程序接收到消息之前,考虑消息必须经过的每个步骤是很重要的。

img

当RabbitMQ在开启的连接上发送消息,它通过TCP socket连接与客户端通信。如果该连接被开启且可写,RabbitMQ假设一切都在正常工作状态,并且消息已经传递。如果存在网络错误,RabbitMQ会收到socker 错误信息并知道有网络问题存在。如果没有收到错误信息,RabbitMQ就会假设消息都已经被传输了。客户单通过发送Basic.AckRPC到RabbitMQ,RabbitMQ就知道客户端成功收到了消息,同时很可能已经处理了该消息。

如果关闭消息确认,RabbitMQ就会发送另一条消息而不会等待确认。这样,只要有足够的消息,RabbitMQ就会持续地发送消息到消费者,直到socket缓冲区被填满。

正是因为不需要等待消费消息的确认信息,才能达到最高的吞吐量。对于一次性的消息来说,这是最理想的方式来提高消息速度,但是并不意味着没有重大风险。考虑当一个消费者应用崩溃会发生什么,假设此时仍然还有1KB的消息在操作系统的socket接收缓冲区。

RabbitMQ认为已经发送了这些消息,并且不会得知当应用奔溃然后socket关闭后已经消费了多少消息。

如果这种消费消息的方法不适合你的应用架构,但是你需要更快的消息吞吐量而不是某个消息被发送然后被确认。

通过服务质量设置控制消费者预取

AMQP 规范要求channel有服务质量(QoS)设置,其中消费者可以在确认这些消息之前,设置待接收的特定消息数量。Qos设置可以让RabbitMQ更有效的发送消息,通过指定为消费者预分配多少消息。

与消费者禁用ack不同,如果你的消费者应用在确认消息之前崩溃了,所有预拉取的消息会在socket关闭时返回到队列之中。

在协议级别,在channel上发送一个Basic.QoSRPC请求来指定QoS。你可以指定该Qos设置是应用于此channel上还是连接打开的所有channel上。Basic.QoSRPC请求能在任何时候方法,但通过在消费者发送Basic.Consume请求之前。

在下图中以单个消费者为基准,显示此时,预设的prefetch数量为2500是对于高峰期消息速度而言是最佳的设置。

image-20210624095006643

使用QoS设置的一个好处之一是你不必确认收到的每条消息。Basic.AckRPC响应有一个叫作multiple的属性,当为True时,会让RabbitMQ知道你的应用想确认所有之前未确认的消息。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        channel.prefetch_count(10)
        unacknowledged =0
        for message in rabbitpy.Queue(channel, 'test-messages'):
            message.pprint()
            unacknowledged += 1
            if unacknowledged == 10:
                message.ack(all_previous=True)
                unacknowledged = 10

使用事务

事务能允许你的应用提交和回滚批量的操作。但事务对于消息的吞吐量有一个不好的影响,除了一个例外。如果你没有使用QoS设置,你可能在使用事务后,会发现有一点性能的提升。

img

事务在禁用ack后失效

拒绝消息

确认消息时一种很好的方式去确保RabbitMQ知道消费者已经收到并处理了某个消息,但是如果遇到了某个问题呢?可能是消息本身或处理消息时。此时,RabbitMQ提供了两种机制让消息回到broker:Basic.RejectBasic.Nack

img

Basic.Reject

Basic.Reject是一个AMQP规定的RPC响应来告知broker某消息不能被处理。当消费者拒绝消息时,可以让RabbitMQ丢弃该消息或通过requeue标志让消息重新入队。

下面的代码,展示了一个消息时如何重新入队的,redelivered标志告知下一个消费者该消息已经被传递过。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

for message in rabbitpy.consume(url,'test-messages'):
    message.pprint()
    print('Redelivered: %s' % message.redelivered)                      
    message.reject(True)

Basic.Ack一样,如果某消息没有设置no-ack并且传递后,使用Basic.Reject会释放该消息的引用。

使用Basic.Rejct一次只能拒绝一个消息。

Basic.Nack

RabbitMQ实现了一个AMQP规约没有的新响应,叫Basic.Nack。它能一次拒绝多个消息。

死信队列

RabbitMQ的死信队列(dead-letter exchange,DLX)也是对AMQP的扩展,它是在拒绝一个传输过的消息是可设置的可选项。当想知道为什么消费此类消息时会遇到问题DLK就会很有用。

DLX也是一个普通的exchange。让某个exchange成为DLX只要做一件事,就是在创建队列时,让该exchange用于拒绝的消息。一旦拒绝某个消息,并且它没有重新入队,RabbitMQ会路由该消息到队列的 x-dead-letter-exchange 参数中指定的exchange。

img

在声明队列时指定死信队列相当简单。只需在创建队列对象时将 exchange 名称作为dead_letter_exchange 参数传入,或在发送Queue.Declare请求时将其作为 x-dead-letter-exchange 参数传入。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        rabbitpy.Exchange(channel, 'rejected-messages').declare()
        queue = rabbitpy.Queue(channel, 'dlx-example',
                              dead_letter_exchange='rejected-messages')
        queue.declare()

队列控制

当定义一个队列时,可以很多设置来决定该队列的行为。比如队列可以做下面的事情:

  • 删除自己
  • 只允许一个消费者消费
  • 让消息过期
  • 限定消息数量
  • 移除旧消息

要注意的是,队列的设置是不可修改的,一旦你创建了一个队列,你后面就无法修改它的设置。除非删除重建。

临时队列

就像《碟中谍》里汤姆克鲁斯的公文包一样,RabbitMQ提供了能删除自己的队列,只要该队列被使用过并且不再需要时。这些队列能在创建时能填充一些消息,一旦消费者连接上来,检索了这些消息,然后断开连接,这些队列就会自动删除。

通过在Queue.DeclareRPC请求中设置auto-delete标志为True可以很容易的创建这些自删除队列。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'ad-example',auto_delete=True)
        queue.declare()
       

注意的是,任意数量的消费者可以从自删除队列中消费,只有当没有消费者连接它时,该队列才会删除自己。

永久队列

如果你想让某个队列在RabbitMQ重启后仍然存在,那么可以设置durable标志为True。可能会和持久消息混淆。持久消息通过设置delivery-mode2。而durable标志,告诉RabbitMQ你想让队列一直存在,直到Queue.Delete请求被调用。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'durable-queue',durable=True)
        if queue.declare():
            print('Queue declared')
       

对于那些不重要的消息,你可能想让它们经过一段时间后自动被抛弃。可以设置消息的TTL值,指定消息的最大存活时间。每个消息的过期时间可以不同。

与设置消息的过期属性相反,x-message-ttl队列设置可以队列中所有消息的过期时间。

import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'

with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
    	# 设置里面的消息10秒过期
        queue = rabbitpy.Queue(channel, 'expiring-msg-queue',arguments={'x-message-ttl':10000})
        if queue.declare():
            print('Queue declared')
       

在RabbitMQ 3.1.0后,队列可以设置最大消息数量x-max-length,一旦队列里面的元素达到最大值,RabbitMQ会丢掉这些消息。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-16 11:22:15  更:2021-07-16 11:24:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/9 3:59:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码