<?php
require_once __DIR__ . '/lib/rabbitmq/rabbitmq.php';
require_once __DIR__ . '/lib/general/general.php';
define('RUNENV', getenv('ENV') ? getenv('ENV') : 'local');
$url = 'https://local.banggood.com/?com=test&t=getShipment'; //业务操作接口
$workerNum = $argv[1] ?: 1; //设置swoole工作进程池的数量,工作进程的数量(当一个进程退出后,Pool会及时拉取另一个进程进行补充)
$pool = new Swoole\Process\Pool($workerNum);
$pool->on('WorkerStart', function ($pool, $workerId) use ($url) { //子进程启动,回调函数
//**************************业务操作处理****************************
try {
$zmq = new ZRabbitmq(RUNENV); //实例一个mq
$zmq->setChannel() // 建立连接
->setExchange('bg_shipment_channel_limit_batch') //设置交换机
->setQueue('bg_shipment_channel_limit_batch', AMQP_DURABLE); //设置队列
$zmq->consume(function ($msg) use ($url,$zmq) { //消费bg_shipment_channel_limit_batch队列
$data = json_decode($msg,true); //解析$msg(队列中取得的数据)
$url.='&token=27b60a7c267cb2fb75017eaa2afa3460&id='.(int)$data['id'].'&tag_ids='.json_encode($data['tag_ids'])
.'&key='.$data['key'].'&last='.$data['last'].'&is_batch='.$data['is_batch']; //把数据凭借到接口url
ZGeneral::curlGet($url, '', 300, $httpCode); //把队列中的数据,逐个请求到指定业务操作的接口(包含sql操作,redis操作等等)
if($httpCode != 200){
//此处可加上,记录接口请求失败的错误日志逻辑
}
$ret = $httpCode == 200 ? true : false;
return $ret;
}, 1);
} catch (AMQPQueueException $ex) {
$errorMsg = $ex->getMessage(); //mq抛出异常
} catch (Exception $ex) {
$errorMsg = $ex->getMessage(); //php抛出异常
}
if($errorMsg){
//此处可加上,记录接口请求失败的错误日志逻辑
}
});
$pool->start(); //启动swoole工作进程
|