IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> PHP知识库 -> Laravel 8.63.0\b 之 RabbitMQ 生产&消费案例 -> 正文阅读

[PHP知识库]Laravel 8.63.0\b 之 RabbitMQ 生产&消费案例

场景:微服务中,会遇到这样的案例:用户申请提现,总后台(后台服务)审核通过,通知资金服务 更新数据;

1.安装 composer 包

composer requires php-amqplib/php-amqplib ^2.12

2. env 追加配置

RABBITMQ_HOST=xb_rabbitmq
RABBITMQ_PORT=5672
#通过15672创建的rabbitmq虚拟主机,默认是'/'
RABBITMQ_VHOST=/
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
#通过15672创建的rabbitmq队列
RABBITMQ_QUEUE=withdrawal-queue
RABBITMQ_EXCHANGE=withdrawal-exchange


QUEUE_CONNECTION=rabbitmq # 更新

3.追加配置 config/queue.php connections 下 后 执行 php artisan config:cache

'rabbitmq' => [
            'driver'                => 'rabbitmq',

            'host'                  => env('RABBITMQ_HOST', '127.0.0.1'),
            'port'                  => env('RABBITMQ_PORT', 5672),

            'vhost'                 => env('RABBITMQ_VHOST', '/'),
            'login'                 => env('RABBITMQ_LOGIN', 'guest'),
            'password'              => env('RABBITMQ_PASSWORD', 'guest'),

            'queue'                 => env('RABBITMQ_QUEUE'), // name of the default queue,
            'exchange_name'         => env('RABBITMQ_QUEUE'),

            'exchange_declare'      => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists
            'queue_declare_bind'    => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange

            'queue_params'          => [
                'passive'           => env('RABBITMQ_QUEUE_PASSIVE', false),
                'durable'           => env('RABBITMQ_QUEUE_DURABLE', true),
                'exclusive'         => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                'auto_delete'       => env('RABBITMQ_QUEUE_AUTODELETE', false),
            ],

            'exchange_params' => [
                'name'        => env('RABBITMQ_EXCHANGE_NAME', null),
                'type'        => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html
                'passive'     => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                'durable'     => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts
                'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
            ],

        ],

4.审核通过方法

    public function adopt()
    {
        //我将用户提现申请,审核通过 我生产一条消息 等待资金服务消费去做资金改动
        $withdrawalId = 100;//提现id
        event(new WithdrawalEvent(['withdrawal_id' => $withdrawalId]));
    }

5.创建 监听事件 Event

<?php

namespace App\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class WithdrawalEvent
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public $withdrawal;
    public function __construct($withdrawal)
    {
        $this->withdrawal= $withdrawal;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return \Illuminate\Broadcasting\Channel|array
     */
    public function broadcastOn()
    {
        return new PrivateChannel('channel-name');
    }
}


5.创建 监听事件 Listener

<?php

namespace App\Listeners;

use App\Events\WithdrawalEvent;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class WithdrawalListener
{
    protected $config = [];
    public function __construct()
    {
        $this->config = config('queue.connections.rabbitmq');
    }

    /**
     * Handle the event.
     *
     * @param  WithdrawalEvent  $event
     * @return void
     */
    public function handle(WithdrawalEvent $event)
    {
        try {
            $connect = new AMQPStreamConnection( //建立生产者与mq之间的连接
                $this->config['host'],$this->config['port'],$this->config['login'],$this->config['password'], '/'
            );
            $channel = $connect->channel(); //在已连接基础上建立生产者与mq之间的通道
            $channel->exchange_declare($this->config['exchange_name'], 'direct', false, true, false); //声明初始化交换机
            $channel->queue_declare($this->config['queue'], false, true, false, false); //声明初始化一条队列
            $channel->queue_bind($this->config['queue'], $this->config['exchange_name']); //将队列与某个交换机进行绑定,并使用路由关键字
            $msgBody = json_encode($event->withdrawal);
            $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
            $channel->basic_publish($msg, $this->config['exchange_name']); //推送消息到某个交换机
            $channel->close();
            $connect->close();
        }catch (\Exception $exception){
            Log::info($exception->getMessage());
        }
    }
}

6.创建服务层

<?php

namespace App\Services;

use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RmqClientService
{
    /**
     * @var object $instance 单例对象
     */
    private static $instance = null;

    /**
     * @var object $connection 队列连接对象
     */
    private $connection = null;

    /**
     * @var object $channel 队列通道对象
     */
    private $channel = null;

    /**
     * @var object $message 队列消息对象
     */
    private $message = null;

    /**
     * 构造函数
     *
     */
    private function __construct()
    {
        //dd(config('queue.connections.rabbitmq.vhost'));
        $this->connection = new AMQPStreamConnection(
            config('queue.connections.rabbitmq.host'),
            config('queue.connections.rabbitmq.port'),
            config('queue.connections.rabbitmq.login'),
            config('queue.connections.rabbitmq.password'),
            config('queue.connections.rabbitmq.vhost')
        );
        $this->channel = $this->connection->channel();
        $this->message = new AMQPMessage('', ['content_type' => 'json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    }

    /**
     * FunctionName:__clone
     * Description:克隆
     * Author:lwl
     */
    private function __clone()
    {}

    /**
     * 析构函数
     */
    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
        self::$instance = null;
    }

    /**
     * FunctionName:getInstance
     * Description:单例实例化入口
     * Author:lwl
     * @return RmqClientService|object|null
     */
    public static function getInstance()
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * FunctionName:consumer
     * Description:消费队列
     * Author:lwl
     * @param string $queue 队列名称
     * @param boolean $bForceDelete 是否取后即删
     * @return AMQPMessage|null
     */
    public function consumer(string $queue, bool $bForceDelete = false)
    {
        try {
            // 取数据
            // 声明队列
            // 不检测同名队列,持久化,不允许其他队列访问,不自动删除队列
            $this->channel->queue_declare($queue, false, true, false, false);
            $message = $this->channel->basic_get($queue);
            if ($message && $bForceDelete) {
                // 回复确认信息
                $this->channel->basic_ack($message->delivery_info['delivery_tag']);
            }
        } catch (\Exception $exception) {
            Log::info($exception->getMessage());
        }
        return $message;
    }

    /**
     * FunctionName:ack
     * Description:
     * Author:lwl
     * @param $nTag 消息传递标签
     * @return mixed
     */
    public function ack($nTag)
    {
        try {
            $this->channel->basic_ack($nTag);
        } catch (\Exception $e) {
            dd($e->getMessage());
        }
        return null;//success 个人响应
    }
}

7.创建 Command

<?php
namespace App\Console\Commands;

use App\Services\RmqClientService;
use Illuminate\Console\Command;

class WithdrawalConsumerCommand extends Command
{
    protected $signature = 'withdrawal:consumer';
    protected $description = '消费提现审核通过后的消息';

    public function handle()
    {
        while (true) {
            $service = RmqClientService::getInstance();
            $queue = config('queue.connections.rabbitmq.queue');
            $response = $service->consumer($queue, true);
            if ($response) {
                $result = json_decode($response->body,1);
                dd($result);
                //资金服务后续操作
            }
            dd('service error');
        }
    }
}

8.测试

1.通过 ‘提现审核通过‘ 的路由 生产消息
	http://test.test/api/user/adopt
	
2.执行 php artisan withdrawal:consumer #如下图 生产环境使用 Supvervisor 等进程管理 常驻监听 (请查看 10)

在这里插入图片描述

规则说明
direct精准推送
fanout 广播推送到绑定到此交换机下的所有队列
topic 组播比如上面我绑定的关键字是sms_send,那么他可以推送到*.sms_send的所有队列
headers这个目前不知道是如何推送的

9.在创建交换机和队列的时候各个常用参数说明 地址



   name: $queue    // should be unique in fanout exchange. [队列名称]
   passive: false  // don't check if a queue with the same name exists [是否检测同名队列]
   durable: false // the queue will not survive server restarts [是否开启队列持久化]
   exclusive: false // the queue might be accessed by other channels [队列是否可以被其他队列访问]
   auto_delete: true //the queue will be deleted once the channel is closed. [通道关闭后是否删除队列]
   
   
   
   name: $exchange [交换机名称]
   type: direct [路由类型]
   passive: false []
   durable: true [交换机是否开启持久化]
   auto_delete: false //the exchange won't be deleted once the channel is closed.

10.Supvervisor 守护进程 消费 ,进入容器内

10-1.安装 supervisor

apt-get install supervisor

10-2.切换到配置目录

cd /etc/supervisor/conf.d

10-3.写入配置 vim mq.conf

[program:withdrawal_consumer]                                       #管理进程的命名
   command=php artisan withdrawal:consumer     #执行的命令
   stderr_logfile=/var/log/supervisor/error.log      #错误日志输出路径
   stdout_logfile=/var/log/supervisor/supervisor.log   #日志输出路径
   directory=/workspace/xiaoba/oldLiuCms      #命令执行的工作空间
   autostart=true                 #自动启动
   user=root                   #指定用户
   autorestart=true                #自动重启

10-4.重新加载

service  supervisor   force-reload

10-5.启动进程

 service  supervisor   start withdrawal_consumer
  PHP知识库 最新文章
Laravel 下实现 Google 2fa 验证
UUCTF WP
DASCTF10月 web
XAMPP任意命令执行提升权限漏洞(CVE-2020-
[GYCTF2020]Easyphp
iwebsec靶场 代码执行关卡通关笔记
多个线程同步执行,多个线程依次执行,多个
php 没事记录下常用方法 (TP5.1)
php之jwt
2021-09-18
上一篇文章      下一篇文章      查看所有文章
加:2021-11-12 19:22:57  更:2021-11-12 19:23:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 18:36:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码