IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> PHP知识库 -> RabbitMQ的简单理解及PHP使用RabbitMQ(附yii2使用方法) -> 正文阅读

[PHP知识库]RabbitMQ的简单理解及PHP使用RabbitMQ(附yii2使用方法)

最近问了几个小伙伴,发现大家都仅仅是会调用操作RabbitMQ的方法,或者仅停留在使用RabbitMQ的层次上。稍微深入一点的知识,就不太清楚了。所以在此记录一下个人对它的理解及PHP怎么使用。

注:才疏学浅,若有不对之处,敬请指正!

首先谈一下RabbitMQ的整体架构(图示):

docker安装RabbitMQ的教程可以参考:使用docker安装RabbitMQ

windows环境PHP安装amqp扩展可以参考:windows环境PHP使用RabbitMq安装amqp扩展

关于rabbitMq的四种交换机类型(Direct exchange、Fanout exchange、Topic exchange、Headers exchange)在此就不做一一说明了,网上资料有很多。文中示例使用较常用的Direct类型。 可以参考这篇博客:RabbitMQ四种交换机类型介绍

在日常开发中需要注意关于生产者(producter )和消费者(consumer )下面几点注意点:

  1. 虚拟地址(vhost) 用于逻辑的隔离,是最上层的消息路由,用来表明某个虚拟机。一个vhost里面可以有很多exchange和queue,但不能有相同名称的exchange或queue。
  2. 网络信道(channel ),进行消息读写的通道。个人理解:它类似于MySQL里面的会话(Session)。
  3. 如果虚拟机中不存在路由键(RouteKey)绑定的队列(queue)名,则生产者发出的消息会被抛弃。
  4. 无论生产者与消费者两者谁先启动,都需要先保证vhost中routeKey存在。否则生产者发出的消息会被抛弃。
  5. 一般看到视频讲解中或者别人的代码示例,都是先运行consumer,这是因为consumer中指定了队列和路由键(路由键不指定也可以,可以看第一个示例)。
  6. 两者都不指定交换机,消费者只指定队列名、 生产者只指定路由键,交换机会默认为“Exchange: (AMQP default) ” ,消息会发送到 名称 和生产者中指定的路由键的名称相同的队列 中去。(这句话有点绕,可以结合下面第一个示例来看)
  7. 对于同一个交换机,生产者与消费者的路由键一样,生产者的消息会分发到各队列中。假如两个消费者,一个监听队列A,一个监听队列B,绑定的路由键都是key1,生产者的路由键也是key1,那么生产者发送10条消息,队列AB 各收到10条,两个消费者分别消费10条来自生产者的消息。

第一个示例(不指定交换机、生产者只指定路由键、消费者只指定队列名)

consumer 代码(consumer.php ):

<?php 
//配置信息
$configParams = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/'
);   

$queueName = 'queueTest'; //队列名 

//创建连接
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!\n");   
}

//创建channel ,进行消息读写的通道 类似于MySQL里面的会话(Session)
$channel = new AMQPChannel($connection);    
   
//创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化  
echo "Message Total:".$queue->declareQueue()."\n";   
 
//阻塞模式接收消息 
echo "Waiting for message...:\n";   

while(True){ 
    $queue->consume('processMessage');   
    //$queue->consume('processMessage', AMQP_AUTOACK); //加上参数AMQP_AUTOACK,自动ACK应答  
} 

//断开连接
$conn->disconnect();   
 
/**
 * 消费回调函数
 * 处理消息
 */ 
function processMessage($envelope, $queue) { 
    $msg = $envelope->getBody();
    // $envelope->getHeaders(); // 获取消息头
    echo "Received:".$msg."\n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答  $envelope->getDeliveryTag())获取消息传递标签
}

?>

producter 代码(producter.php):

<?php

date_default_timezone_set("Asia/Shanghai");

//配置信息 
$configParams = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
);   

$routeKey = 'queueTest'; //路由键

//创建连接
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!\n");   
}

//创建channel
$channel = new AMQPChannel($connection);
   
//创建交换机
$exchange = new AMQPExchange($channel);

//发送消息 
for($i=0; $i<10; ++$i){
    sleep(2);
    //消息内容 
    $message = "Hello RabbitMq! send time:".date("h:i:sa");   
    echo "Send Message:".$exchange->publish($message, $routeKey)."\n";  
}

$connection->disconnect();
?>

然后在命令行中运行消费者代码 : php ./consumer.php

?此时,看一下rabbitMq后台,可以看到:

?

?然后再起一个命令行窗口运行生产者代码 : php ./producter.php

?再看运行消费者代码的命令行:

?点击队列名,queueTest:可以发现

?现在,再回头看上面的第6点,两者都不指定交换机,消费者指定的队列名和生产者指定的路由键名称相同都是“queueTest”,交换机会默认为“AMQP default” ,生产者的消息会发送到队列名称 为queueTest的队列 中去。可以点击(AMQP default)交换机,可以看到bindings下面有句话,其实就是第6点的描述。


第二个示例(指定各个参数,使用一个direct类型的交换机)

注:代码中的类中的方法可以在此查询:http://docs.php.net/manual/da/book.amqp.php

consumer 代码(consumer.php ):

<?php 
//配置信息
$configParams = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/'
); 
  
$exchangeName = 'test'; //交换机名  交换机会根据路由键转发消息到绑定的队列
$queueName = 'queue1'; //队列名 
$routeKey = 'key1'; //路由键 

//创建连接 类中的方法可以从http://docs.php.net/manual/da/book.amqp.php 查询
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!\n");   
}

//创建channel ,进行消息读写的通道,类似于MySQL里面的会话(Session)
$channel = new AMQPChannel($connection);   
 
//创建交换机    
$exchange = new AMQPExchange($channel);   
$exchange->setName($exchangeName); 
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型  
$exchange->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$exchange->declareExchange()."\n";   
   
//创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化  
echo "Message Total:".$queue->declareQueue()."\n";   
 
//绑定交换机与队列,并指定路由键 
echo 'Queue Bind: '.$queue->bind($exchangeName, $routeKey)."\n"; 
 
//阻塞模式接收消息 
echo "Waiting for message...:\n";   
while(True){ 
    $queue->consume('processMessage');   
    //$queue->consume('processMessage', AMQP_AUTOACK); //加上参数AMQP_AUTOACK,自动ACK应答  
} 

//断开连接
$conn->disconnect();   
 
/**
 * 消费回调函数
 * 处理消息
 */ 
function processMessage($envelope, $queue) { 
    $msg = $envelope->getBody();
    // $envelope->getHeaders(); // 获取消息头
    echo "Received:".$msg."\n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答  $envelope->getDeliveryTag())获取消息传递标签
}

?>

producter 代码(producter.php):

<?php
date_default_timezone_set("Asia/Shanghai");

//配置信息 
$configParams = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
);   

$exchangeName = 'test'; //交换机名  可以不选择
$routeKey = 'key1'; //路由键  如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃

//创建连接
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!\n");   
}

//创建channel
$channel = new AMQPChannel($connection);   
 
//创建交换机对象    
$exchange = new AMQPExchange($channel);   
$exchange->setName($exchangeName);    
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->setFlags(AMQP_DURABLE); //持久化

//发送消息 
for($i=0; $i<10; ++$i){
    sleep(2);
    //消息内容 
    $message = "Hello RabbitMq! send time:".date("h:i:sa");   
    echo "Send Message:".$exchange->publish($message, $routeKey)."\n";  
}

$connection->disconnect();
?>

这个运行结果就不一一截图了,有兴趣的可以自己尝试一下(还有其他类型的交换机)。


yii2中怎么使用rabbitMq呢?先要用composer 加载扩展包

composer require php-amqplib/php-amqplib:*?

具体版本可以在此查询下面

https://packagist.org/packages/php-amqplib/php-amqplib

引入包后,可以参照这篇博客->YII2 框架如何使用rabbitMq,要注意不要把消费者写成函数放到postman中请求,因为肯定会报“Error: Response timed out”,最好写成脚本运行。

时间有限,我会抽空慢慢修改或增加关于rabbitMq的内容。

  PHP知识库 最新文章
Laravel 下实现 Google 2fa 验证
UUCTF WP
DASCTF10月 web
XAMPP任意命令执行提升权限漏洞(CVE-2020-
[GYCTF2020]Easyphp
iwebsec靶场 代码执行关卡通关笔记
多个线程同步执行,多个线程依次执行,多个
php 没事记录下常用方法 (TP5.1)
php之jwt
2021-09-18
上一篇文章      下一篇文章      查看所有文章
加:2022-03-13 21:32:24  更:2022-03-13 21:32:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/26 15:58:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计