环境 PHP7,Thinkphp5,php-amqp类库
场景描述 生产的消息队列,其消费者总是无故断开,基本在五天左右就会断开,但是程序端消费者的进程却仍在继续,并且没有捕获到任何异常。经过多次尝试,最终将问题定位在mq的心跳问题。因为我们数据中心的防火墙正在终止空闲连接
mq心跳 1.rabbitmq使用心跳机制来保持连接,在正常场景下,客户端期望通过发送心跳包来告知服务端自己存活。如果服务端连续两次发送心跳客户端均无回应,服务端会断开与客户端的连接。心跳间隔可在每次连接时设置。 2.因php是同步语言,它无法在后台运行耗时任务时持续发送心跳包。这时候服务端就会断开连接,而客户端只有继续使用这个队列的时候才会发现已断开
查看消费者源码可以发现,heartbeat的默认值为0,表示不开启心跳检测
当heartbeat>0时表示开启心跳,此时需要注意的问题有:
- 需要将keepalive设置为true。
- 应该重新设定read_write_timeout的值,如果该属性低于心跳,则连接将在尝试写入时消失,一般来说,应该使用双倍心跳的值。我使用的是60/30。
- 网络问题可能会影响心跳包的发送与接收,导致脚本的异常中断。为防止该问题,应该在消费者外层进行异常捕获
此外,由于mq长时间等待的问题,可能会造成数据库连接的断开,因此我在程序中的处理方法是处理完一次业务后主动断开数据库连接(Db::close()),这样下次业务开始时会重新建立连接。
完整的消费者代码 代码基于thinkphp的命令行工具,即使用php think VisitorData启动消费者类,最下面的重启消费者基于Supervisor进程守护,因此杀掉进程后会自动重启。Supervisor进程守护具体实现参照:https://blog.csdn.net/xiaodong_526/article/details/120979919?spm=1001.2014.3001.5502
class VisitorData extends Command
{
protected function configure()
{
defined("ENV") or define("ENV", getenv('profile') ?: 'pre');
Config::load(env('config_path') . 'extra/' . ENV . '.php', 'extra');
$this->setName('VisitorData')->setDescription('rabbitMQ消费者');
}
protected function execute(Input $input, Output $output)
{
try {
$mqConfig = config("extra.aiVisitorDataMq");
AmqService::consume($mqConfig, true, function ($msg) {
echo "\n--------\n";
echo "time:" . time() . ": " . $msg->body;
echo "\n--------\n";
$body = json_decode($msg->body, true);
try {
$resultMsg = (new VisitorInfo())->dealVisitorData($body);
echo "处理完毕,msg:" . $resultMsg;
Db::close();
} catch (\Exception $e) {
Db::close();
echo "error-VisitorData,(" . time() . "):" . $e->getMessage() . PHP_EOL;
sleep(10);
}
sleep(0.2);
});
} catch (\Exception $e) {
echo "error-VisitorData-consume,(" . time() . "),即将重启消费者进程,原因:" . $e->getMessage() . PHP_EOL;
sleep(20);
$exec = "ps -ef | grep 'php think VisitorData' | grep -v grep | awk '{print $1}' | xargs -r kill -9";
exec($exec);
}
}
}
|