出现场景 使用 php-amqplib/php-amqplib 自定义进程进行消费,当程序运行1天左右样子再消费会出现 SQLSTATE[HY000] [2006] MySQL server has gone away,再次消费会出现SQLSTATE[HY000] [2002] Connection reset by peer or Transport endpoint is not connected
原因分析 easyswoole 使用的是数据库连接池,php-amqplib/php-amqplib 组件是同步阻塞io,无法触发协成切换,导致自定义进程中获取的mysql 链接脱离了连接池的控制,在时间到达8小时后被mysql 主动切断链接
目前的解决方案 在官方给出解决方案之前,目前的解决方案就是当在消费者中捕获到mysql 链接异常后 直接kill掉当前进程,easyswoole 框架会自动重启一个新的进程,因为rabbitmq 使用的是ack 确认机制,当消费者未提交ack确认信息的情况下,重新启动进程消息依然会被消费,从而不用担心消息丢失的问题 上代码
<?php
namespace App\Process\RabbitMqConsumer;
use App\Models\ClientModel;
use App\Service\LogService;
use App\Service\MqXDelayService;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\EasySwoole\Logger;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPAbstractCollection;
use PhpAmqpLib\Wire\AMQPTable;
class Test extends AbstractProcess
{
protected function run($arg)
{
try {
$pid = $this->getPid();
Logger::getInstance()->info("延时队列消费进程===============pid={$pid}");
$exchange = 'delay.myExchange';
$queue = 'delay.myQueue';
$config = \EasySwoole\EasySwoole\Config::getInstance()->getConf("rabbitmq");
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'],$config['vhost'], false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
$channel->queue_declare($queue, false, true, false, false);
$callback = function ($msg) use ($pid){
$headersObject = $msg->get_properties()['application_headers'];
try {
Logger::getInstance()->info("延时队列消费进程,data={$msg->body}");
$data = json_decode($msg->body,true);
$msgId = $data['msgId'];
if (isset($data['listenerKey']) && isset($data['data']) && isset($data['msgId'])){
print_r($data['data']);
ClientModel::create()->get();
Logger::getInstance()->info("完成消费");
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}catch (\Throwable $e){
$message = $e->getMessage();
Logger::getInstance()->error("延时队列消费异常:".$message);
Logger::getInstance()->error("延时队列消费异常:".$e->getTraceAsString());
if (strrpos(strtoupper($message),'SQLSTATE') !== false){
\Co::sleep(1);
Logger::getInstance()->error("数据库异常:".$message);
$cmd = "php easyswoole process kill --pid={$pid} -f";
Logger::getInstance()->error("已完成进程重启cmd:\r\n".$cmd);
shell_exec($cmd);
}
}
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}catch (\Throwable $exception){
Logger::getInstance()->error("延时队列异常:".$exception->getMessage());
Logger::getInstance()->error("延时队列异常:".$exception->getTraceAsString());
}
}
}
|