目录
RabbitMQ的准备
QT的准备
AMQP
Exchange交换机
Queue队列
消息机制
消息
连接
虚拟主机
Publish/Subscribe发布与订阅模式
Routing路由模式
总结
RabbitMQ的准备
下载参考
安装完毕,是会自动启动RabbitMQ server服务的。
http://127.0.0.1:15672/
这是后台管理网页,默认登录名称与密码都为guest,注意登录时不要输入空格。在这里可已查看客户端与RabbitMQ server的连接情况。
如果无法打开管理界面,则使用管理员打开cmd执行此命令:net stop RabbitMQ && net start RabbitMQ,重新打开管理界面即可
添加使用者?
QT的准备
QAMQP库编译 下载地址:https://github.com/mbroadst/qamqp
解压之后得到:qamqp-master 打开qamqp-master/src/src.pro文件并编译,在编译后的Debug下得到libqamqpd.a和qamqpd.dll
打开 tutorials下的任意例子,在.pro下添加外部库libqamqpd.a,并在.pri注释一句,如图
编译成功后,在得到的.exe下放qamqpd.dll
AMQP
AMQP(高级消息队列协议)是一个进程间传递异步消息的网络协议,应用程序之间通讯
异步消息:producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取
Exchange交换机
交换机拿到消息之后将它路由给一个或零个队列。
它使用哪种路由算法是由交换机类型和绑定规则所决定
AMQP 0.9.1
交换机有两个状态:持久、暂存。
持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会
类型名称?? ?路由规则
Default?? ?自动命名的直交换机
Direct?? ?Routing Key==Binding Key,严格匹配
Fanout?? ?把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
Topic?? ?Routing Key==Binding Key,模糊匹配
Headers?? ?根据发送的消息内容中的 headers 属性进行匹配
?
默认交换机(default exchange)是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。
每个新建队列都会自动绑定到默认交换机上,绑定键名称与队列名称相同。
当声明了一个名为 “hello” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定键名称也是为 “hello”。因此,当携带着名为 “hello” 的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为 “hello” 的队列中。
直连型交换机(direct exchange)根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列
1)将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R; 2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。
直连交换机的队列通常是循环分发任务给多个消费者(轮询)。比如说有3个消费者,4个任务。分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键
如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列
主题交换机(Topic exchange)??不同于direct 严格意义上的匹配,Topic 的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送
binding key 与 routing key 是一个句点号 “.” 分隔的字符串,如“a.b.c”、“aaa.bbb.ccc”
binding key 可以存在两种特殊字符 “” 与“#”,用于做模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个) ?
头交换机headers Exchange根据发送的消息内容中的 headers 属性进行匹配。
1)绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。 2)传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。
Queue队列
存储消息
队列属性
队列跟交换机共享某些属性,但是队列也有一些另外的属性。
队列创建 队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。
队列持久化 持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
持久化的队列并不会使得路由到它的消息也具有持久性。只有经过持久化的消息在重启的过程中才能被重新恢复。
Consumer消费者
消息被应用消费掉有两种途径:
1)将消息投递给应用 (“push API”) 2)应用根据需要主动获取消息 (“pull API”)
使用 push API,应用需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。一个队列可以被多个消费者订阅,也可以被一个独享的消费者订阅(当独享消费者存在时,其他消费者即被排除在外)。
每个消费者都有一个叫做消费者标签的标识符,实际上是一个字符串,它可以被用来退订。
消息机制
消息确认
消费者在处理消息的时候偶尔会失败或者有时会直接崩溃掉,而且网络原因也有可能引起各种问题,AMQP 0-9-1 规范给两种方式:
1)自动确认模式:当消息代理将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok)) 2)显式确认模式:待应用发送一个确认回执后再删除消息。(使用 AMQP 方法:basic.ack)
如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
消息拒绝
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明拒绝某条消息,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。
预取消息
当多个消费者共享一个队列,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息
消息
由属性和有效载荷 - Payload组成
消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。
消息属性
AMQP 模型中的消息(Message)对象是带有属性(Attributes)的
Content type(内容类型) Content encoding(内容编码) Routing key(路由键) Delivery mode (persistent or not) 投递模式(持久化 或 非持久化) Message priority(消息优先权) Message publishing timestamp(消息发布的时间戳) Expiration period(消息有效期) Publisher application id(发布应用的 ID)
消息持久化
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式
连接
AMQP 连接通常是长连接。AMQP 是一个使用 TCP 提供可靠投递的应用层协议。AMQP 使用认证机制并且提供TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。
有些应用需要与 AMQP 代理建立多个连接,AMQP 0.9.1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。
在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。
一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
虚拟主机
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。
对官方例子的修改
Publish/Subscribe发布与订阅模式
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
生产者创建三个队列(q1,q2,q3)并绑定到“log”交换机上,向“log”交换机发的消息被转发到与之绑定的队列上
#include <QCoreApplication>
#include <QStringList>
#include <QDebug>
#include "qamqpclient.h"
#include "qamqpexchange.h"
#include "qamqpqueue.h"
class LogEmitter : public QObject
{
Q_OBJECT
public:
LogEmitter(QObject *parent = 0) : QObject(parent) {}
public Q_SLOTS:
void start() {
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit()));
m_client.connectToHost();
}
private Q_SLOTS:
void clientConnected() {
q1=m_client.createQueue("");
q2=m_client.createQueue("");
q3=m_client.createQueue("");
q1->declare();
q2->declare();
q3->declare();
QAmqpExchange *exchange = m_client.createExchange("logs");
connect(exchange, SIGNAL(declared()), this, SLOT(exchangeDeclared()));
q1->bind(exchange,"");
q2->bind(exchange,"");
q3->bind(exchange,"");
exchange->declare(QAmqpExchange::FanOut);//Fanout 把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
}
void exchangeDeclared() {
QAmqpExchange *exchange = qobject_cast<QAmqpExchange*>(sender());
if (!exchange)
return;
QString message;
if (qApp->arguments().size() < 2)
message = "info: Hello World!";
else
message = qApp->arguments().at(1);
exchange->publish(message, "");
qDebug() << " [x] Sent " << message;
m_client.disconnectFromHost();
}
private:
QAmqpClient m_client;
QAmqpQueue *q1;
QAmqpQueue *q2;
QAmqpQueue *q3;
};
int main(int argc, char **argv)
{
QCoreApplication app(argc, argv);
LogEmitter logEmitter;
logEmitter.start();
return app.exec();
}
#include "main.moc"
监听“222”队列的消费者
class Receiver : public QObject
{
Q_OBJECT
public:
Receiver(QObject *parent = 0) : QObject(parent) {
m_client.setAutoReconnect(true);
}
public Q_SLOTS:
void start() {
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
m_client.connectToHost();
}
private Q_SLOTS:
void clientConnected() {
QAmqpQueue *queue = m_client.createQueue("222");
connect(queue, SIGNAL(declared()), this, SLOT(queueDeclared()));
queue->declare();
}
void queueDeclared() {
QAmqpQueue *queue = qobject_cast<QAmqpQueue*>(sender());
if (!queue)
return;
connect(queue, SIGNAL(messageReceived()), this, SLOT(messageReceived()));
queue->consume(QAmqpQueue::coNoAck);
qDebug() << " [*] Waiting for messages. To exit press CTRL+C";
}
void messageReceived() {
QAmqpQueue *queue = qobject_cast<QAmqpQueue*>(sender());
if (!queue)
return;
QAmqpMessage message = queue->dequeue();
qDebug() << " [x] Received " << message.payload();
}
private:
QAmqpClient m_client;
};
int main(int argc, char **argv)
{
QCoreApplication app(argc, argv);
Receiver receiver;
receiver.start();
return app.exec();
}
#include "main.moc"
小结:
交换机需要与队列进行绑定,绑定之后,一个消息可以被多个消费者都收到。
发布/订阅模式的生产方是面向交换机发送消息 ,“”表示交换机向所有被绑定的队列发
发布/订阅模式要设置队列和交换机的绑定 ,但可以不设置绑定键
Routing路由模式
路由模式特点:
-
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。 -
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列 -
C1:消费者,其所在队列指定了需要routing key 为 orange的消息 -
C2:消费者,其所在队列指定了需要routing key 为 blank,green?的消息
在编码上与 发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
class LogEmitter : public QObject
{
Q_OBJECT
public:
LogEmitter(QObject *parent = 0) : QObject(parent) {}
public Q_SLOTS:
void start() {
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit()));
m_client.connectToHost();
}
private Q_SLOTS:
void clientConnected() {
q1=m_client.createQueue("111");
q2=m_client.createQueue("222");
q1->declare();
q2->declare();
QAmqpExchange *exchange = m_client.createExchange("direct_logs");
connect(exchange, SIGNAL(declared()), this, SLOT(exchangeDeclared()));
q1->bind(exchange,"orange");
q2->bind(exchange,"blank");
q2->bind(exchange,"green");
exchange->declare(QAmqpExchange::Direct);
}
void exchangeDeclared() {
QAmqpExchange *exchange = qobject_cast<QAmqpExchange*>(sender());
if (!exchange)
return;
exchange->publish("Hello world", "orange");
exchange->publish("Hello world", "blank");
exchange->publish("Hello world", "green");
qDebug() << " [x] Sent " ;
m_client.disconnectFromHost();
}
private:
QAmqpClient m_client;
QAmqpQueue *q1;
QAmqpQueue *q2;
};
总结
RabbitMQ工作模式:
1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
对生产者而言,由于交换机和队列设置的性属而出现的工作模式。
对消费者而言有两种途径消费消息
1)将消息投递给应用 (“push API”) 2)应用根据需要主动获取消息 (“pull API”)
使用 pull API,可以不用交换机,直接获取队列中的消息
参考:
安装教程
AMQP
|