?一、生产消息
<?php
namespace app\controllers;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use yii\web\Controller;
class QueueController extends Controller
{
private $connect = '';
public function init()
{
parent::init();
$config = [
'host' => '192.168.56.102',
'port' => 5672,
'user' => 'admin',
'pwd' => 123456,
'vhost' => '/'
];
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pwd'], $config['vhost']);
return $this->connect;
}
/**
* 简单队列
* 无需申明交换机(默认交换机)
* 发送消息 routingKey需要与队列名称一致
*/
public function actionSimple()
{
$conn = $this->connect;
$queueName = 'queue_simple';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$msgBody = json_encode(['code' => 200, 'data' => ['name' => '里斯', 'age' => 18]]);
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]);
$channel->basic_publish($msg, '', 'queue_simple');
$channel->close();
}
/**
* fanout 广播消息
* 队列绑定交换机
* 消息发送到所有绑定的队列
* 无需申明routingKey 申明了也无效
*/
public function actionFanout()
{
$conn = $this->connect;
$conn->channel();
$exchangeName = 'exchange_fanout';
$queueName1 = 'queue_fanout1';
$queueName2 = 'queue_fanout2';
$channel = $conn->channel();
$channel->exchange_declare($exchangeName, 'fanout', false, true, false, false);
$channel->queue_declare($queueName1, false, true, false, false);
$channel->queue_declare($queueName2, false, true, false, false);
$channel->queue_bind($queueName1, $exchangeName, '');
$channel->queue_bind($queueName2, $exchangeName, '');
$msgBody = 'I am fanout message';
$msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, '');
}
/**
* 直连模式
* 队列需绑定交换机和路由
*/
public function actionDirect()
{
$conn = $this->connect;
$channel = $conn->channel();
$exchangeName = 'exchange_direct';
$rk = 'success';
$channel->exchange_declare($exchangeName, 'direct', false, true, false, false);
$msgBody = 'This is success message';
$msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, $rk);
}
/**
* 主题模式
* [*表示一个单词 #表示零个或多个]
*/
public function actionTopic()
{
$conn = $this->connect;
$channel = $conn->channel();
$exchangeName = 'exchange_topic';
$rk = 'mail.success';
$channel->exchange_declare($exchangeName, 'topic', false, true, false, false);
$msgBody = 'This is send.mail.success message';
$msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, $rk);
}
/**
* 发送消息确认
* confirm模式 交换机向队列发送消息
*/
public function actionConfirm()
{
$exchangeName = 'exchange_confirm';
$queueName = 'queue_confirm';
$routingKey = 'confirm';
$conn = $this->connect;
$channel = $conn->channel();
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
//发送成功回调
$channel->set_ack_handler(function (AMQPMessage $message) {
echo $message->body . ' send succeed';
});
//发送失败回调
$channel->set_nack_handler(function (AMQPMessage $message) {
echo $message->body . ' send failed';
});
//开启发送监听
$channel->confirm_select();
$msg = new AMQPMessage('测试消息', ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, $routingKey);
//阻塞等待消息确认
$channel->wait_for_pending_acks();
}
/**
* 死信队列 延迟队列
* 当队列和消息同时设置过期时间 以最短时间为准
* 消费死信队列消息
*/
public function actionDead()
{
$exchangeName = 'exchange_ttl';
$queueName = 'queue_ttl';
$routingKey = 'ttl';
$deadExchangeName = 'exchange_dead';
$deadQueueName = 'queue_dead';
$deadRoutingKey = 'dead_key';
$conn = $this->connect;
$channel = $conn->channel();
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
$channel->exchange_declare('exchange_dead', 'direct', false, true, false);
//死信队列参数
$arguments = new AMQPTable([
'x-dead-letter-exchange' => $deadExchangeName,
'x-dead-letter-routing-key' => $deadRoutingKey,
'x-message-ttl' => 10000
]);
$channel->queue_declare($queueName, false, true, false, false, false, $arguments);
$channel->queue_declare('queue_dead', false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
$channel->queue_bind($deadQueueName, $deadExchangeName, $deadRoutingKey);
$message = new AMQPMessage('5s后该消息将过期', ['delivery_mode' => 2, 'expiration' => 5000]);
$channel->basic_publish($message, $exchangeName, $routingKey);
}
public function __destruct()
{
$conn = $this->connect;
$conn->close();
}
}
单单设置队列的ttl,或者单单设置相同的消息过期时间,死信队列是能正常工作的。但是设置不同的消息过期时间,就可能无法正常使用死信队列了。
当MQ检查队列中的第一个消息时,发现其并未过期,则不会继续检查之后的消息了。即使之后的消息过期了,也会因为没在队列头部而无法流转到其他队列,这是MQ队列的特性决定的。你不能去消费队列中间的消息,队列必须先进先出。
对于设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而设置消息头部属性,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。
二 、消费消息
<?php
namespace app\commands;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use yii\console\Controller;
class ReadController extends Controller
{
protected $connect = '';
public function init()
{
parent::init();
$config = [
'host' => '192.168.56.102',
'port' => 5672,
'user' => 'admin',
'pwd' => 123456,
'vhost' => '/'
];
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pwd'], $config['vhost']);
return $this->connect;
}
public function process_message($message)
{
$message = $message->body;
var_dump($message);
}
public function actionSimple()
{
$conn = $this->connect;
$queueName = 'queue_simple';
$channel = $conn->channel();
$channel->basic_consume($queueName, '', false, true, false, false,
function ($msg) {
$this->process_message($msg);
});
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionFan1()
{
$conn = $this->connect;
$queueName = 'queue_fanout1';
$channel = $conn->channel();
$channel->basic_consume($queueName, '', false, true, false, false, function ($msg) {
$this->process_message($msg);
});
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionFan2()
{
$conn = $this->connect;
$queueName = 'queue_fanout2';
$channel = $conn->channel();
$callback = function ($message) {
echo 'I am fanout2;' . PHP_EOL;
print_r($message->body);
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionDirect()
{
$conn = $this->connect;
$exchangeName = 'exchange_direct';
$queueName = 'queue_direct';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, 'info');
$callback = function ($message) {
echo 'INFO:' . PHP_EOL;
print_r($message->body);
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionDirect1()
{
$conn = $this->connect;
$exchangeName = 'exchange_direct';
$queueName = 'queue_direct1';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, 'error');
$channel->queue_bind($queueName, $exchangeName, 'warning');
$callback = function ($message) {
echo 'ERROR or WARNING:' . PHP_EOL;
print_r($message->body);
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionTopic1()
{
$conn = $this->connect;
$exchangeName = 'exchange_topic';
$queueName = 'queue_topic1';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, '*.mail');
$callback = function ($message) {
echo $message->body . PHP_EOL;
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionTopic2()
{
$conn = $this->connect;
$exchangeName = 'exchange_topic';
$queueName = 'queue_topic2';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, 'mail.#');
$callback = function ($message) {
echo $message->body . PHP_EOL;
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
/**
* 消费消息 消费成功确认
*/
public function actionAck()
{
$queueName = 'queue_confirm';
$conn = $this->connect;
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$callback = function ($message) {
$msg = $message->body;
echo 'message==' . $msg . PHP_EOL;
if ($msg == '200') {
//消费成功 手动应答
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} else {
/**
* 消费失败 basic_nack($delivery_tag, $multiple = false, $requeue = false)
* requeue 是否重新发送 false 否 true是
*/
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, false);
}
};
//消费成功后再发送下一条消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionDead()
{
$conn = $this->connect;
$channel = $conn->channel();
$callback = function ($message) {
echo $message->body . PHP_EOL;
sleep(2);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('queue_dead', '', false, false, false, false, $callback);
$channel->basic_qos(null, 1, null);
while (count($channel->callbacks)) {
$channel->wait();
}
}
}
参数说明:
建立连接
$conn = new AMQPStreamConnection(
$host,//RabbitMQ服务器主机IP地址
$port,//RabbitMQ服务器端口
$user,//连接RabbitMQ服务器的用户名
$password,//连接RabbitMQ服务器的用户密码
$vhost,//连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost)
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
$connection_timeout = 3.0,//连接超时
$read_write_timeout = 3.0,//读写超时
$context = null,//上下文
$keepalive = false, //是否开启长连接 常驻进程消费者需要
$heartbeat = 0,//心跳检测间隔 单位秒 0不检测 根据情形酌情填写
$channel_rpc_timeout = 0.0,
$ssl_protocol = null
);
申明交换机
$channel->exchange_declare($exhcange_name, $type, $passive, $durable,
$auto_delete,$internal,$nowait,$arguments,$ticket);
参数:
$exhcange_name 交换器名字
$type 交换器类型
$passive 是否检测同名队列
$durable 交换机是否开启持久化
$auto_detlete 通道关闭后是否删除队列
(1)交换器类型
枚举 [
direct: (默认)直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,
fanout: 广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue,
topic: 主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。(* 表是匹配一个任意词组,#表示匹配0个或多个词组),
headers:根据消息体的header匹配
]
申明队列?
$channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete,$nowait,$arguments,$ticket);
参数:
$queue_name 队列名称
$passive 是否检测同名队列
$durable 是否开启队列持久化
$exclusive 队列是否可以被其他队列访问
$auto_delete 通道关闭后是否删除队列
推送消息?
$channel->basic_publish($msg,
$exchange = '',
$routing_key = '',
$mandatory = false,
$immediate = false,
$ticket = null);
参数:
$msg 消息内容
$exchange 交换器
$routing_key routing_key
$mandatory 匹配不到队列时,是否立即丢弃消息
$immediate 队列无消费者时,是否立即丢弃消息
$ticket
消费消息
$channel->basic_consume(
$queue = '',
$consumer_tag = '',
$no_local = false,
$no_ack = false,
$exclusive = false,
$nowait = false,
$callback = null,
$ticket = null,
$arguments = array()
)
参数:
$queue 队列名
$consumer_tag
$no_local
$no_ack 是否自动ack true自动 false手动
$exclusive
$nowait
$callback 消息回调函数
$ticket
$arguments
|