IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 开发测试 -> php-amqplib测试备份 -> 正文阅读

[开发测试]php-amqplib测试备份

?一、生产消息

<?php

namespace app\controllers;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use yii\web\Controller;

class QueueController extends Controller
{
    private $connect = '';

    public function init()
    {
        parent::init();
        $config = [
            'host' => '192.168.56.102',
            'port' => 5672,
            'user' => 'admin',
            'pwd' => 123456,
            'vhost' => '/'
        ];
        $this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pwd'], $config['vhost']);
        return $this->connect;
    }

    /**
     * 简单队列
     * 无需申明交换机(默认交换机)
     * 发送消息 routingKey需要与队列名称一致
     */
    public function actionSimple()
    {
        $conn = $this->connect;
        $queueName = 'queue_simple';
        $channel = $conn->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $msgBody = json_encode(['code' => 200, 'data' => ['name' => '里斯', 'age' => 18]]);
        $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]);
        $channel->basic_publish($msg, '', 'queue_simple');
        $channel->close();
    }

    /**
     * fanout 广播消息
     * 队列绑定交换机
     * 消息发送到所有绑定的队列
     * 无需申明routingKey 申明了也无效
     */
    public function actionFanout()
    {
        $conn = $this->connect;
        $conn->channel();
        $exchangeName = 'exchange_fanout';
        $queueName1 = 'queue_fanout1';
        $queueName2 = 'queue_fanout2';
        $channel = $conn->channel();
        $channel->exchange_declare($exchangeName, 'fanout', false, true, false, false);
        $channel->queue_declare($queueName1, false, true, false, false);
        $channel->queue_declare($queueName2, false, true, false, false);
        $channel->queue_bind($queueName1, $exchangeName, '');
        $channel->queue_bind($queueName2, $exchangeName, '');
        $msgBody = 'I am fanout message';
        $msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
        $channel->basic_publish($msg, $exchangeName, '');
    }

    /**
     * 直连模式 
     * 队列需绑定交换机和路由
     */
    public function actionDirect()
    {
        $conn = $this->connect;
        $channel = $conn->channel();
        $exchangeName = 'exchange_direct';
        $rk = 'success';
        $channel->exchange_declare($exchangeName, 'direct', false, true, false, false);
        $msgBody = 'This is success message';
        $msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
        $channel->basic_publish($msg, $exchangeName, $rk);
    }
 
    /**
     * 主题模式 
     * [*表示一个单词 #表示零个或多个]
     */
    public function actionTopic()
    {
        $conn = $this->connect;
        $channel = $conn->channel();
        $exchangeName = 'exchange_topic';
        $rk = 'mail.success';
        $channel->exchange_declare($exchangeName, 'topic', false, true, false, false);
        $msgBody = 'This is send.mail.success message';
        $msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
        $channel->basic_publish($msg, $exchangeName, $rk);
    }


    /**
     * 发送消息确认
     * confirm模式 交换机向队列发送消息
     */
    public function actionConfirm()
    {
        $exchangeName = 'exchange_confirm';
        $queueName = 'queue_confirm';
        $routingKey = 'confirm';
        $conn = $this->connect;
        $channel = $conn->channel();
        $channel->exchange_declare($exchangeName, 'direct', false, true, false);
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        //发送成功回调
        $channel->set_ack_handler(function (AMQPMessage $message) {
            echo $message->body . ' send succeed';
        });
        //发送失败回调
        $channel->set_nack_handler(function (AMQPMessage $message) {
            echo $message->body . ' send failed';
        });
        //开启发送监听
        $channel->confirm_select();

        $msg = new AMQPMessage('测试消息', ['delivery_mode' => 2]);
        $channel->basic_publish($msg, $exchangeName, $routingKey);

        //阻塞等待消息确认
        $channel->wait_for_pending_acks();
    }

    /**
     * 死信队列  延迟队列
     * 当队列和消息同时设置过期时间 以最短时间为准
     * 消费死信队列消息
     */
    public function actionDead()
    {
        $exchangeName = 'exchange_ttl';
        $queueName = 'queue_ttl';
        $routingKey = 'ttl';
        $deadExchangeName = 'exchange_dead';
        $deadQueueName = 'queue_dead';
        $deadRoutingKey = 'dead_key';
        $conn = $this->connect;
        $channel = $conn->channel();
        $channel->exchange_declare($exchangeName, 'direct', false, true, false);
        $channel->exchange_declare('exchange_dead', 'direct', false, true, false);
        //死信队列参数
        $arguments = new AMQPTable([
            'x-dead-letter-exchange' => $deadExchangeName,
            'x-dead-letter-routing-key' => $deadRoutingKey,
            'x-message-ttl' => 10000
        ]);
        $channel->queue_declare($queueName, false, true, false, false, false, $arguments);
        $channel->queue_declare('queue_dead', false, true, false, false);
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        $channel->queue_bind($deadQueueName, $deadExchangeName, $deadRoutingKey);
        $message = new AMQPMessage('5s后该消息将过期', ['delivery_mode' => 2, 'expiration' => 5000]);
        $channel->basic_publish($message, $exchangeName, $routingKey);
    }

    public function __destruct()
    {
        $conn = $this->connect;
        $conn->close();
    }
}

单单设置队列的ttl,或者单单设置相同的消息过期时间,死信队列是能正常工作的。但是设置不同的消息过期时间,就可能无法正常使用死信队列了。

当MQ检查队列中的第一个消息时,发现其并未过期,则不会继续检查之后的消息了。即使之后的消息过期了,也会因为没在队列头部而无法流转到其他队列,这是MQ队列的特性决定的。你不能去消费队列中间的消息,队列必须先进先出。

对于设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而设置消息头部属性,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。

二 、消费消息

<?php

namespace app\commands;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use yii\console\Controller;

class ReadController extends Controller
{

    protected $connect = '';

    public function init()
    {
        parent::init();
        $config = [
            'host' => '192.168.56.102',
            'port' => 5672,
            'user' => 'admin',
            'pwd' => 123456,
            'vhost' => '/'
        ];
        $this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pwd'], $config['vhost']);
        return $this->connect;
    }

    public function process_message($message)
    {
        $message = $message->body;
        var_dump($message);
    }


    public function actionSimple()
    {
        $conn = $this->connect;
        $queueName = 'queue_simple';
        $channel = $conn->channel();
        $channel->basic_consume($queueName, '', false, true, false, false,
            function ($msg) {
                $this->process_message($msg);
            });
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    public function actionFan1()
    {
        $conn = $this->connect;
        $queueName = 'queue_fanout1';
        $channel = $conn->channel();
        $channel->basic_consume($queueName, '', false, true, false, false, function ($msg) {
            $this->process_message($msg);
        });
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    public function actionFan2()
    {
        $conn = $this->connect;
        $queueName = 'queue_fanout2';
        $channel = $conn->channel();
        $callback = function ($message) {
            echo 'I am fanout2;' . PHP_EOL;
            print_r($message->body);
        };
        $channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    public function actionDirect()
    {
        $conn = $this->connect;
        $exchangeName = 'exchange_direct';
        $queueName = 'queue_direct';
        $channel = $conn->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_bind($queueName, $exchangeName, 'info');
        $callback = function ($message) {
            echo 'INFO:' . PHP_EOL;
            print_r($message->body);
        };
        $channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }


    public function actionDirect1()
    {
        $conn = $this->connect;
        $exchangeName = 'exchange_direct';
        $queueName = 'queue_direct1';
        $channel = $conn->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_bind($queueName, $exchangeName, 'error');
        $channel->queue_bind($queueName, $exchangeName, 'warning');
        $callback = function ($message) {
            echo 'ERROR or WARNING:' . PHP_EOL;
            print_r($message->body);
        };
        $channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    public function actionTopic1()
    {
        $conn = $this->connect;
        $exchangeName = 'exchange_topic';
        $queueName = 'queue_topic1';
        $channel = $conn->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_bind($queueName, $exchangeName, '*.mail');
        $callback = function ($message) {
            echo $message->body . PHP_EOL;
        };
        $channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    public function actionTopic2()
    {
        $conn = $this->connect;
        $exchangeName = 'exchange_topic';
        $queueName = 'queue_topic2';
        $channel = $conn->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_bind($queueName, $exchangeName, 'mail.#');
        $callback = function ($message) {
            echo $message->body . PHP_EOL;
        };
        $channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * 消费消息 消费成功确认
     */
    public function actionAck()
    {
        $queueName = 'queue_confirm';
        $conn = $this->connect;
        $channel = $conn->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $callback = function ($message) {
            $msg = $message->body;
            echo 'message==' . $msg . PHP_EOL;
            if ($msg == '200') {
                //消费成功 手动应答
                $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            } else {
                /**
                 * 消费失败 basic_nack($delivery_tag, $multiple = false, $requeue = false)
                 * requeue 是否重新发送 false 否  true是
                 */
                $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, false);
            }
        };
        //消费成功后再发送下一条消息
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    public function actionDead()
    {
        $conn = $this->connect;
        $channel = $conn->channel();
        $callback = function ($message) {
            echo $message->body . PHP_EOL;
            sleep(2);
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        };
        $channel->basic_consume('queue_dead', '', false, false, false, false, $callback);
        $channel->basic_qos(null, 1, null);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

}

参数说明:

建立连接

$conn = new AMQPStreamConnection(
$host,//RabbitMQ服务器主机IP地址
$port,//RabbitMQ服务器端口
$user,//连接RabbitMQ服务器的用户名
$password,//连接RabbitMQ服务器的用户密码
$vhost,//连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost)
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
$connection_timeout = 3.0,//连接超时
$read_write_timeout = 3.0,//读写超时
$context = null,//上下文
$keepalive = false, //是否开启长连接 常驻进程消费者需要
$heartbeat = 0,//心跳检测间隔 单位秒 0不检测 根据情形酌情填写
$channel_rpc_timeout = 0.0,
$ssl_protocol = null
);
申明交换机
$channel->exchange_declare($exhcange_name, $type, $passive, $durable,
$auto_delete,$internal,$nowait,$arguments,$ticket);
参数:
$exhcange_name 交换器名字
$type          交换器类型
$passive       是否检测同名队列
$durable       交换机是否开启持久化
$auto_detlete  通道关闭后是否删除队列
(1)交换器类型
枚举 [
    direct: (默认)直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,
    fanout: 广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue,
    topic:  主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。(* 表是匹配一个任意词组,#表示匹配0个或多个词组),
    headers:根据消息体的header匹配
]

申明队列?

$channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete,$nowait,$arguments,$ticket);
参数:
$queue_name  队列名称
$passive     是否检测同名队列
$durable     是否开启队列持久化
$exclusive   队列是否可以被其他队列访问
$auto_delete 通道关闭后是否删除队列

推送消息?

$channel->basic_publish($msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null);
参数:
$msg         消息内容
$exchange    交换器
$routing_key routing_key
$mandatory   匹配不到队列时,是否立即丢弃消息
$immediate   队列无消费者时,是否立即丢弃消息
$ticket      

消费消息

$channel->basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    )
参数:
$queue        队列名
$consumer_tag 
$no_local 
$no_ack       是否自动ack true自动  false手动
$exclusive
$nowait
$callback     消息回调函数
$ticket
$arguments
  开发测试 最新文章
pytest系列——allure之生成测试报告(Wind
某大厂软件测试岗一面笔试题+二面问答题面试
iperf 学习笔记
关于Python中使用selenium八大定位方法
【软件测试】为什么提升不了?8年测试总结再
软件测试复习
PHP笔记-Smarty模板引擎的使用
C++Test使用入门
【Java】单元测试
Net core 3.x 获取客户端地址
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 18:12:40  更:2022-04-18 18:15:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/17 22:48:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码