场景:微服务中,会遇到这样的案例:用户申请提现,总后台(后台服务)审核通过,通知资金服务 更新数据;
1.安装 composer 包
composer requires php-amqplib/php-amqplib ^2.12
2. env 追加配置
RABBITMQ_HOST=xb_rabbitmq
RABBITMQ_PORT=5672
#通过15672创建的rabbitmq虚拟主机,默认是'/'
RABBITMQ_VHOST=/
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
#通过15672创建的rabbitmq队列
RABBITMQ_QUEUE=withdrawal-queue
RABBITMQ_EXCHANGE=withdrawal-exchange
QUEUE_CONNECTION=rabbitmq # 更新
3.追加配置 config/queue.php connections 下 后 执行 php artisan config:cache
'rabbitmq' => [
'driver' => 'rabbitmq',
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'queue' => env('RABBITMQ_QUEUE'), // name of the default queue,
'exchange_name' => env('RABBITMQ_QUEUE'),
'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists
'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange
'queue_params' => [
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
],
'exchange_params' => [
'name' => env('RABBITMQ_EXCHANGE_NAME', null),
'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html
'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
],
],
4.审核通过方法
public function adopt()
{
//我将用户提现申请,审核通过 我生产一条消息 等待资金服务消费去做资金改动
$withdrawalId = 100;//提现id
event(new WithdrawalEvent(['withdrawal_id' => $withdrawalId]));
}
5.创建 监听事件 Event
<?php
namespace App\Events;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class WithdrawalEvent
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public $withdrawal;
public function __construct($withdrawal)
{
$this->withdrawal= $withdrawal;
}
/**
* Get the channels the event should broadcast on.
*
* @return \Illuminate\Broadcasting\Channel|array
*/
public function broadcastOn()
{
return new PrivateChannel('channel-name');
}
}
5.创建 监听事件 Listener
<?php
namespace App\Listeners;
use App\Events\WithdrawalEvent;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class WithdrawalListener
{
protected $config = [];
public function __construct()
{
$this->config = config('queue.connections.rabbitmq');
}
/**
* Handle the event.
*
* @param WithdrawalEvent $event
* @return void
*/
public function handle(WithdrawalEvent $event)
{
try {
$connect = new AMQPStreamConnection( //建立生产者与mq之间的连接
$this->config['host'],$this->config['port'],$this->config['login'],$this->config['password'], '/'
);
$channel = $connect->channel(); //在已连接基础上建立生产者与mq之间的通道
$channel->exchange_declare($this->config['exchange_name'], 'direct', false, true, false); //声明初始化交换机
$channel->queue_declare($this->config['queue'], false, true, false, false); //声明初始化一条队列
$channel->queue_bind($this->config['queue'], $this->config['exchange_name']); //将队列与某个交换机进行绑定,并使用路由关键字
$msgBody = json_encode($event->withdrawal);
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
$channel->basic_publish($msg, $this->config['exchange_name']); //推送消息到某个交换机
$channel->close();
$connect->close();
}catch (\Exception $exception){
Log::info($exception->getMessage());
}
}
}
6.创建服务层
<?php
namespace App\Services;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RmqClientService
{
/**
* @var object $instance 单例对象
*/
private static $instance = null;
/**
* @var object $connection 队列连接对象
*/
private $connection = null;
/**
* @var object $channel 队列通道对象
*/
private $channel = null;
/**
* @var object $message 队列消息对象
*/
private $message = null;
/**
* 构造函数
*
*/
private function __construct()
{
//dd(config('queue.connections.rabbitmq.vhost'));
$this->connection = new AMQPStreamConnection(
config('queue.connections.rabbitmq.host'),
config('queue.connections.rabbitmq.port'),
config('queue.connections.rabbitmq.login'),
config('queue.connections.rabbitmq.password'),
config('queue.connections.rabbitmq.vhost')
);
$this->channel = $this->connection->channel();
$this->message = new AMQPMessage('', ['content_type' => 'json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
}
/**
* FunctionName:__clone
* Description:克隆
* Author:lwl
*/
private function __clone()
{}
/**
* 析构函数
*/
public function __destruct()
{
$this->channel->close();
$this->connection->close();
self::$instance = null;
}
/**
* FunctionName:getInstance
* Description:单例实例化入口
* Author:lwl
* @return RmqClientService|object|null
*/
public static function getInstance()
{
if (!self::$instance instanceof self) {
self::$instance = new self();
}
return self::$instance;
}
/**
* FunctionName:consumer
* Description:消费队列
* Author:lwl
* @param string $queue 队列名称
* @param boolean $bForceDelete 是否取后即删
* @return AMQPMessage|null
*/
public function consumer(string $queue, bool $bForceDelete = false)
{
try {
// 取数据
// 声明队列
// 不检测同名队列,持久化,不允许其他队列访问,不自动删除队列
$this->channel->queue_declare($queue, false, true, false, false);
$message = $this->channel->basic_get($queue);
if ($message && $bForceDelete) {
// 回复确认信息
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
}
} catch (\Exception $exception) {
Log::info($exception->getMessage());
}
return $message;
}
/**
* FunctionName:ack
* Description:
* Author:lwl
* @param $nTag 消息传递标签
* @return mixed
*/
public function ack($nTag)
{
try {
$this->channel->basic_ack($nTag);
} catch (\Exception $e) {
dd($e->getMessage());
}
return null;//success 个人响应
}
}
7.创建 Command
<?php
namespace App\Console\Commands;
use App\Services\RmqClientService;
use Illuminate\Console\Command;
class WithdrawalConsumerCommand extends Command
{
protected $signature = 'withdrawal:consumer';
protected $description = '消费提现审核通过后的消息';
public function handle()
{
while (true) {
$service = RmqClientService::getInstance();
$queue = config('queue.connections.rabbitmq.queue');
$response = $service->consumer($queue, true);
if ($response) {
$result = json_decode($response->body,1);
dd($result);
//资金服务后续操作
}
dd('service error');
}
}
}
8.测试
1.通过 ‘提现审核通过‘ 的路由 生产消息
http:
2.执行 php artisan withdrawal:consumer
规则 | 说明 |
---|
direct | 精准推送 | fanout 广播 | 推送到绑定到此交换机下的所有队列 | topic 组播 | 比如上面我绑定的关键字是sms_send,那么他可以推送到*.sms_send的所有队列 | headers | 这个目前不知道是如何推送的 |
9.在创建交换机和队列的时候各个常用参数说明 地址
name: $queue // should be unique in fanout exchange. [队列名称]
passive: false // don't check if a queue with the same name exists [是否检测同名队列]
durable: false // the queue will not survive server restarts [是否开启队列持久化]
exclusive: false // the queue might be accessed by other channels [队列是否可以被其他队列访问]
auto_delete: true //the queue will be deleted once the channel is closed. [通道关闭后是否删除队列]
name: $exchange [交换机名称]
type: direct [路由类型]
passive: false []
durable: true [交换机是否开启持久化]
auto_delete: false //the exchange won't be deleted once the channel is closed.
10.Supvervisor 守护进程 消费 ,进入容器内
10-1.安装 supervisor
apt-get install supervisor
10-2.切换到配置目录
cd /etc/supervisor/conf.d
10-3.写入配置 vim mq.conf
[program:withdrawal_consumer] #管理进程的命名
command=php artisan withdrawal:consumer #执行的命令
stderr_logfile=/var/log/supervisor/error.log #错误日志输出路径
stdout_logfile=/var/log/supervisor/supervisor.log #日志输出路径
directory=/workspace/xiaoba/oldLiuCms #命令执行的工作空间
autostart=true #自动启动
user=root #指定用户
autorestart=true #自动重启
10-4.重新加载
service supervisor force-reload
10-5.启动进程
service supervisor start withdrawal_consumer
|