最近问了几个小伙伴,发现大家都仅仅是会调用操作RabbitMQ的方法,或者仅停留在使用RabbitMQ的层次上。稍微深入一点的知识,就不太清楚了。所以在此记录一下个人对它的理解及PHP怎么使用。
注:才疏学浅,若有不对之处,敬请指正!
首先谈一下RabbitMQ的整体架构(图示):
docker安装RabbitMQ的教程可以参考:使用docker安装RabbitMQ
windows环境PHP安装amqp扩展可以参考:windows环境PHP使用RabbitMq安装amqp扩展
关于rabbitMq的四种交换机类型(Direct exchange、Fanout exchange、Topic exchange、Headers exchange)在此就不做一一说明了,网上资料有很多。文中示例使用较常用的Direct类型。 可以参考这篇博客:RabbitMQ四种交换机类型介绍
在日常开发中需要注意关于生产者(producter )和消费者(consumer )下面几点注意点:
- 虚拟地址(vhost) 用于逻辑的隔离,是最上层的消息路由,用来表明某个虚拟机。一个vhost里面可以有很多exchange和queue,但不能有相同名称的exchange或queue。
- 网络信道(channel ),进行消息读写的通道。个人理解:它类似于MySQL里面的会话(Session)。
- 如果虚拟机中不存在路由键(RouteKey)绑定的队列(queue)名,则生产者发出的消息会被抛弃。
- 无论生产者与消费者两者谁先启动,都需要先保证vhost中routeKey存在。否则生产者发出的消息会被抛弃。
- 一般看到视频讲解中或者别人的代码示例,都是先运行consumer,这是因为consumer中指定了队列和路由键(路由键不指定也可以,可以看第一个示例)。
- 两者都不指定交换机,消费者只指定队列名、 生产者只指定路由键,交换机会默认为“Exchange: (AMQP default) ” ,消息会发送到 名称 和生产者中指定的路由键的名称相同的队列 中去。(这句话有点绕,可以结合下面第一个示例来看)
- 对于同一个交换机,生产者与消费者的路由键一样,生产者的消息会分发到各队列中。假如两个消费者,一个监听队列A,一个监听队列B,绑定的路由键都是key1,生产者的路由键也是key1,那么生产者发送10条消息,队列AB 各收到10条,两个消费者分别消费10条来自生产者的消息。
第一个示例(不指定交换机、生产者只指定路由键、消费者只指定队列名):
consumer 代码(consumer.php ):
<?php
//配置信息
$configParams = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$queueName = 'queueTest'; //队列名
//创建连接
$connection = new AMQPConnection($configParams);
if (!$connection->connect()) {
die("Cannot connection rabbitMq!\n");
}
//创建channel ,进行消息读写的通道 类似于MySQL里面的会话(Session)
$channel = new AMQPChannel($connection);
//创建队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE); //队列持久化
echo "Message Total:".$queue->declareQueue()."\n";
//阻塞模式接收消息
echo "Waiting for message...:\n";
while(True){
$queue->consume('processMessage');
//$queue->consume('processMessage', AMQP_AUTOACK); //加上参数AMQP_AUTOACK,自动ACK应答
}
//断开连接
$conn->disconnect();
/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
// $envelope->getHeaders(); // 获取消息头
echo "Received:".$msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 $envelope->getDeliveryTag())获取消息传递标签
}
?>
producter 代码(producter.php):
<?php
date_default_timezone_set("Asia/Shanghai");
//配置信息
$configParams = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$routeKey = 'queueTest'; //路由键
//创建连接
$connection = new AMQPConnection($configParams);
if (!$connection->connect()) {
die("Cannot connection rabbitMq!\n");
}
//创建channel
$channel = new AMQPChannel($connection);
//创建交换机
$exchange = new AMQPExchange($channel);
//发送消息
for($i=0; $i<10; ++$i){
sleep(2);
//消息内容
$message = "Hello RabbitMq! send time:".date("h:i:sa");
echo "Send Message:".$exchange->publish($message, $routeKey)."\n";
}
$connection->disconnect();
?>
然后在命令行中运行消费者代码 : php ./consumer.php
?此时,看一下rabbitMq后台,可以看到:
?
?然后再起一个命令行窗口运行生产者代码 : php ./producter.php
?再看运行消费者代码的命令行:
?点击队列名,queueTest:可以发现
?现在,再回头看上面的第6点,两者都不指定交换机,消费者指定的队列名和生产者指定的路由键名称相同都是“queueTest”,交换机会默认为“AMQP default” ,生产者的消息会发送到队列名称 为queueTest的队列 中去。可以点击(AMQP default)交换机,可以看到bindings下面有句话,其实就是第6点的描述。
第二个示例(指定各个参数,使用一个direct类型的交换机):
注:代码中的类中的方法可以在此查询:http://docs.php.net/manual/da/book.amqp.php
consumer 代码(consumer.php ):
<?php
//配置信息
$configParams = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$exchangeName = 'test'; //交换机名 交换机会根据路由键转发消息到绑定的队列
$queueName = 'queue1'; //队列名
$routeKey = 'key1'; //路由键
//创建连接 类中的方法可以从http://docs.php.net/manual/da/book.amqp.php 查询
$connection = new AMQPConnection($configParams);
if (!$connection->connect()) {
die("Cannot connection rabbitMq!\n");
}
//创建channel ,进行消息读写的通道,类似于MySQL里面的会话(Session)
$channel = new AMQPChannel($connection);
//创建交换机
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$exchange->declareExchange()."\n";
//创建队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE); //队列持久化
echo "Message Total:".$queue->declareQueue()."\n";
//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$queue->bind($exchangeName, $routeKey)."\n";
//阻塞模式接收消息
echo "Waiting for message...:\n";
while(True){
$queue->consume('processMessage');
//$queue->consume('processMessage', AMQP_AUTOACK); //加上参数AMQP_AUTOACK,自动ACK应答
}
//断开连接
$conn->disconnect();
/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
// $envelope->getHeaders(); // 获取消息头
echo "Received:".$msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 $envelope->getDeliveryTag())获取消息传递标签
}
?>
producter 代码(producter.php):
<?php
date_default_timezone_set("Asia/Shanghai");
//配置信息
$configParams = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$exchangeName = 'test'; //交换机名 可以不选择
$routeKey = 'key1'; //路由键 如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃
//创建连接
$connection = new AMQPConnection($configParams);
if (!$connection->connect()) {
die("Cannot connection rabbitMq!\n");
}
//创建channel
$channel = new AMQPChannel($connection);
//创建交换机对象
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->setFlags(AMQP_DURABLE); //持久化
//发送消息
for($i=0; $i<10; ++$i){
sleep(2);
//消息内容
$message = "Hello RabbitMq! send time:".date("h:i:sa");
echo "Send Message:".$exchange->publish($message, $routeKey)."\n";
}
$connection->disconnect();
?>
这个运行结果就不一一截图了,有兴趣的可以自己尝试一下(还有其他类型的交换机)。
yii2中怎么使用rabbitMq呢?先要用composer 加载扩展包
composer require php-amqplib/php-amqplib:*?
具体版本可以在此查询下面
https://packagist.org/packages/php-amqplib/php-amqplib
引入包后,可以参照这篇博客->YII2 框架如何使用rabbitMq,要注意不要把消费者写成函数放到postman中请求,因为肯定会报“Error: Response timed out”,最好写成脚本运行。
时间有限,我会抽空慢慢修改或增加关于rabbitMq的内容。
|