需求
1、用户登录成功后通过消息队列写入mysql数据库 2、用户下单,付款成功和付款失败都会通过延时队列写入mysql数据库,处理掉该订单信息
环境准备
下载tp6框架并下载指定版本rabbitmq扩展包
composer create-project topthink/think=6.0 tp6
D:\phpstudy_pro\WWW\thinkphp6>composer require php-amqplib/php-amqplib=^3.0
D:\phpstudy_pro\WWW\thinkphp6>composer require topthink/think-view
配置虚拟主机后访问: http://www.rabbitmq.test/
需求一
用户登录成功后通过消息队列写入mysql数据库
用户登录成功后消息推送到消息队列 添加vhost 为vhost分配用户和权限 生产端:控制器代码 D:\phpstudy_pro\WWW\thinkphp6\app\controller\Login.php
<?php
declare (strict_types=1);
namespace app\controller;
use app\Request;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Login controller: renders the login form and, after a successful login,
 * publishes a notification message to the RabbitMQ queue "login_msg".
 */
class Login
{
    /**
     * Render the login form.
     *
     * @return mixed the result of the framework's view() helper for the "login" template
     */
    public function index()
    {
        return view('login');
    }

    /**
     * Validate the submitted credentials and publish a login message on success.
     *
     * @param Request $request current request; the "user_name" and "password" parameters are read
     * @return string short status text describing the outcome
     */
    public function login(Request $request)
    {
        $user_name = $request->param('user_name');
        $password = $request->param('password');

        // Strict comparisons: loose "==" would apply PHP type juggling to request input.
        // hash_equals() compares the secret in constant time and needs a string argument,
        // hence the is_string() guard for a missing or array-valued parameter.
        if ($user_name === 'root' && is_string($password) && hash_equals('12345678', $password)) {
            $this->sendMsg();

            return 'login success';
        }

        return 'login failed';
    }

    /**
     * Publish a "root login success-<unix timestamp>" message to the durable
     * "login_msg" queue through the default exchange.
     *
     * @return void
     */
    public function sendMsg()
    {
        $queue_name = 'login_msg';
        $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'demo');

        try {
            $channel = $connection->channel();
            // Arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
            $channel->queue_declare($queue_name, false, true, false, false);

            $data = 'root login success-' . time();
            // The queue is durable, so the message is marked persistent as well;
            // a non-persistent message would be dropped on a broker restart.
            $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

            // An empty exchange name selects the default exchange, which routes by queue name.
            $channel->basic_publish($msg, '', $queue_name);
            $channel->close();
        } finally {
            // Always release the socket, even when declaring or publishing throws.
            $connection->close();
        }
    }
}
前端代码: D:\phpstudy_pro\WWW\thinkphp6\view\login\login.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta name="description" content="">
    <meta name="author" content="">
    <title>用户登录</title>
    <link rel="stylesheet" href="http://cdn.bootcss.com/twitter-bootstrap/3.0.1/css/bootstrap.min.css">
    <link rel="stylesheet" href="http://cdn.bootcss.com/twitter-bootstrap/3.0.1/css/bootstrap-theme.min.css">
</head>
<body class="bs-docs-home">
<div class="container theme-showcase">
    <h1 style=" line-height:2em;"> </h1><br />
    <div class="row">
        <!-- 3 + 6 + 3 columns: the form is centred inside the 12-column grid -->
        <div class="col-sm-3"></div>
        <div class="col-sm-6">
            <div class="panel panel-primary">
                <div class="panel-heading">
                    <h3 class="panel-title"><strong>用户登录</strong></h3>
                </div>
                <div class="panel-body">
                    <div class="alert alert-dismissable">
                        <!-- NOTE(review): assumes a route maps POST /login to Login::login; verify the route configuration -->
                        <form action="/login" method="post" role="form" name="form1">
                            <div class="form-group">
                                <div class="input-group">
                                    <input type="text" name="user_name" class="form-control" placeholder="用户名" >
                                </div>
                            </div>
                            <div class="form-group">
                                <div class="input-group">
                                    <!-- type="password" masks the value while it is typed -->
                                    <input type="password" name="password" class="form-control" placeholder="密码" >
                                </div>
                            </div>
                            <span class="input-group-btn">
                                <button class="btn btn-success" type="submit" >登录</button>
                            </span>
                        </form>
                    </div>
                </div>
            </div>
        </div>
        <div class="col-sm-3"></div>
    </div>
</div>
<script src="https://code.jquery.com/jquery-1.10.2.min.js"></script>
<script src="http://cdn.bootcss.com/twitter-bootstrap/3.0.1/js/bootstrap.min.js"></script>
</body>
</html>
访问结果:
消费者消费消息到数据库
命令行生成消费端脚本文件
php think make:command Login login_msg
配置执行脚本文件 D:\phpstudy_pro\WWW\thinkphp6\config\console.php
// Console command registry: maps the CLI command name to the class implementing it.
return [
    'commands' => [
        // "php think login_msg" runs this command class.
        'login_msg' => \app\command\Login::class,
    ],
];
运行login_msg脚本文件
php think login_msg
消费端代码
<?php
declare (strict_types=1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\facade\Db;
/**
 * Console command "login_msg": consumes messages from the RabbitMQ queue
 * "login_msg" and prints each message body.
 *
 * NOTE(review): the callback only prints the body; persisting it to MySQL
 * is not implemented in this class.
 */
class Login extends Command
{
    /**
     * Register the command name and its description.
     */
    protected function configure()
    {
        $this->setName('login_msg');
        $this->setDescription('the login_msg command');
    }

    /**
     * Connect to the broker and process messages one at a time for as long
     * as the channel stays open.
     *
     * @param Input  $input  console input (not read here)
     * @param Output $output console output used to print each message body
     */
    protected function execute(Input $input, Output $output)
    {
        $queue = 'login_msg';

        $conn = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'demo');
        $ch = $conn->channel();

        // Arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
        $ch->queue_declare($queue, false, true, false, false);

        // Print the body through the console output and through echo, then acknowledge.
        $onMessage = function ($delivery) use ($output) {
            $output->writeln($delivery->body);
            echo 'received = ', $delivery->body . "\n";
            $delivery->ack();
        };

        // Prefetch of one: the next message arrives only after the current one is acked.
        $ch->basic_qos(null, 1, null);
        // no_ack=false, so every delivery must be acknowledged explicitly.
        $ch->basic_consume($queue, '', false, false, false, false, $onMessage);

        while (true) {
            if (!$ch->is_open()) {
                break;
            }
            $ch->wait();
        }

        $ch->close();
        $conn->close();
    }
}
需求二
用户下单,付款成功和付款失败都会通过延时队列写入mysql数据库,处理掉该订单信息
订单数据推送到延迟队列 添加交换器 生产端代码:
<?php
declare (strict_types=1);
namespace app\controller;
use think\facade\Db;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * Goods controller: creates an order row in "goods_order" and publishes the
 * new order id to a delayed-message exchange for later processing.
 */
class Goods
{
    /**
     * Render the goods page.
     *
     * @return mixed the result of the framework's view() helper for the "goods" template
     */
    public function index()
    {
        return view('goods');
    }

    /**
     * Insert a paid order (is_pay = 1) and queue its id for delayed processing.
     *
     * @return int always 1
     */
    public function paySuccess()
    {
        $id = Db::table('goods_order')->insertGetId(['is_pay' => 1]);
        $this->sendMsg($id);

        return 1;
    }

    /**
     * Insert an unpaid order (is_pay = 0) and queue its id for delayed processing.
     *
     * @return int always 1
     */
    public function payFail()
    {
        $id = Db::table('goods_order')->insertGetId(['is_pay' => 0]);
        $this->sendMsg($id);

        return 1;
    }

    /**
     * Publish an order id to the delayed exchange "delay_exc_order".
     *
     * @param int|string $id order id; it is sent as the message body
     * @return void
     */
    public function sendMsg($id)
    {
        $exc_name = 'delay_exc_order';
        $routing_key = 'delay_route_order';
        $queue_name = 'delay_queue_order';
        // Value of the "x-delay" header; the delayed-message plugin interprets it as milliseconds.
        $ttl = 20000;

        $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'demo');

        try {
            $channel = $connection->channel();

            // "x-delayed-type" is an argument of the x-delayed-message EXCHANGE: it selects the
            // routing behaviour applied once the delay has elapsed. Without it the exchange
            // cannot be created from code.
            // Arguments: passive=false, durable=true, auto_delete=false, internal=false, nowait=false.
            $exchange_args = new AMQPTable(['x-delayed-type' => 'direct']);
            $channel->exchange_declare($exc_name, 'x-delayed-message', false, true, false, false, false, $exchange_args);

            // NOTE(review): the queue does not need "x-delayed-type"; the argument is kept so this
            // declaration stays identical to a queue that was already created with it.
            $args = new AMQPTable(['x-delayed-type' => 'direct']);
            $channel->queue_declare($queue_name, false, true, false, false, false, $args);
            $channel->queue_bind($queue_name, $exc_name, $routing_key);

            $arr = [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers' => new AMQPTable(['x-delay' => $ttl]),
            ];
            // The message body is a string; cast explicitly because insertGetId() may return an integer.
            $msg = new AMQPMessage((string) $id, $arr);

            $channel->basic_publish($msg, $exc_name, $routing_key);
            $channel->close();
        } finally {
            // Always release the socket, even when declaring or publishing throws.
            $connection->close();
        }
    }
}
延迟队列消息未付款订单处理
命令行生成消费端脚本文件
php think make:command Order order_msg
配置执行脚本文件 D:\phpstudy_pro\WWW\thinkphp6\config\console.php
// Console command registry: maps the CLI command name to the class implementing it.
// Both consumers live in the same config file, so the earlier "login_msg"
// entry is kept next to the new "order_msg" entry instead of being replaced.
return [
    'commands' => [
        'login_msg' => 'app\command\Login',
        'order_msg' => 'app\command\Order',
    ],
];
运行order_msg脚本文件
php think order_msg
消费端代码
<?php
declare (strict_types=1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\facade\Db;
/**
 * Console command "order_msg": consumes order ids from the delayed queue
 * "delay_queue_order" and deletes the matching order when it is still unpaid.
 */
class Order extends Command
{
    /**
     * Register the command name and its description.
     */
    protected function configure()
    {
        $this->setName('order_msg')
            ->setDescription('the order_msg command');
    }

    /**
     * Connect to the broker, make sure exchange, queue and binding exist, then
     * process messages one at a time for as long as the channel stays open.
     *
     * @param Input  $input  console input (not read here)
     * @param Output $output console output (not written here)
     */
    protected function execute(Input $input, Output $output)
    {
        $exc_name = 'delay_exc_order';
        $routing_key = 'delay_route_order';
        $queue_name = 'delay_queue_order';

        $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'demo');

        try {
            $channel = $connection->channel();

            // "x-delayed-type" must be supplied when an x-delayed-message exchange is created.
            // The class is referenced by its fully-qualified name because this file has no
            // import for it.
            $exchange_args = new \PhpAmqpLib\Wire\AMQPTable(['x-delayed-type' => 'direct']);
            $channel->exchange_declare($exc_name, 'x-delayed-message', false, true, false, false, false, $exchange_args);

            // Declare the queue before binding so the consumer can start before any producer
            // has run; binding a queue that does not exist fails. The arguments mirror the
            // producer's declaration so both declarations stay equivalent.
            $queue_args = new \PhpAmqpLib\Wire\AMQPTable(['x-delayed-type' => 'direct']);
            $channel->queue_declare($queue_name, false, true, false, false, false, $queue_args);
            $channel->queue_bind($queue_name, $exc_name, $routing_key);

            // The message body carries the order id; only rows still marked unpaid are removed.
            // The ack is sent after the delete, so a failed delete leaves the message unacked.
            $callback = function ($msg) {
                Db::table('goods_order')->where(['id' => $msg->body, 'is_pay' => 0])->delete();
                $msg->ack();
            };

            // Prefetch of one: the next message arrives only after the current one is acked.
            $channel->basic_qos(null, 1, null);
            // no_ack=false, so every delivery must be acknowledged explicitly.
            $channel->basic_consume($queue_name, '', false, false, false, false, $callback);

            while ($channel->is_open()) {
                $channel->wait();
            }

            $channel->close();
        } finally {
            // Always release the socket, even when the consume loop throws.
            $connection->close();
        }
    }
}
|