AsynDelayQueueService.class.php文件
<?php
namespace App\Tool\AsynQueue\Service;
use Common\Service\RedisService;
/**
 * Redis-backed delayed task queue.
 *
 * Each task is stored twice: its serialized payload lives under its own key
 * (with an expiry), and its id is a member of a sorted set whose score is the
 * Unix timestamp at which the task should run. run() polls the sorted set and
 * spawns one CLI process per due task through the route
 * "home/task/asynDelayStart/id/<task_id>", which is expected to call start().
 */
class AsynDelayQueueService
{
    /** @var object Client object exposed as RedisService::getInstance()->redis. */
    protected $redis;

    /** @var string Name of the sorted set that holds pending task ids. */
    protected $queue;

    public function __construct()
    {
        $redis_obj = RedisService::getInstance();
        $this->redis = $redis_obj->redis;
        $this->queue = 'tp3_asyn_delay_queue';
    }

    /**
     * Schedule a method call to be executed at a given time.
     *
     * The task id is derived from an MD5 of the serialized task, so pushing the
     * same class/method/parameter combination again reuses the same id: the
     * stored payload and the sorted-set score are overwritten.
     *
     * @param int    $exec_time    Unix timestamp at which the task should run.
     * @param string $class        Name of the class to instantiate.
     * @param string $method       Name of the method to call on the instance.
     * @param array  $method_param Arguments passed to the method.
     * @param array  $class_param  Arguments passed to the constructor.
     *
     * @return mixed False when the class or method does not exist or the payload
     *               could not be stored; otherwise the result of zAdd().
     */
    public function push($exec_time, $class, $method, $method_param = [], $class_param = [])
    {
        if (class_exists($class) === false || method_exists($class, $method) === false) {
            return false;
        }

        $task_info = serialize([
            'class' => $class,
            'class_param' => $class_param,
            'method' => $method,
            'method_param' => $method_param,
        ]);
        $task_id = 'delay_task_id_' . md5($task_info);

        // Keep the payload for 600 seconds past the due time. The remaining
        // delay is clamped at zero so an already-overdue $exec_time still
        // yields a positive expiry instead of an invalid (<= 0) one.
        $timeout = max($exec_time - time(), 0) + 600;
        if ($this->redis->set($task_id, $task_info, $timeout) === false) {
            // Without a stored payload the queued id could never be executed.
            return false;
        }

        $result = $this->redis->zAdd($this->queue, $exec_time, $task_id);

        M('asyn_delay_queue_log')->add([
            'task_id' => $task_id,
            'class' => $class,
            'method' => $method,
            'exec_time' => $exec_time,
            'task_info' => $task_info,
        ]);

        return $result;
    }

    /**
     * Poll the queue forever, dispatching due tasks roughly every 300 ms.
     *
     * This method never returns.
     */
    public function run()
    {
        while (true) {
            usleep(300000);
            $this->dispatchDueTasks(time());
        }
    }

    /**
     * Spawn a background CLI worker for every task due at or before the given time.
     *
     * @param int $current_time Unix timestamp used as the upper score bound.
     *
     * @return int Number of worker processes that were spawned.
     */
    protected function dispatchDueTasks($current_time)
    {
        // Scan from -inf rather than a short trailing window: a task that
        // became due while the loop was stopped or slow must still be picked
        // up, otherwise it would stay in the sorted set forever.
        $task_id_list = $this->redis->zRangeByScore($this->queue, '-inf', $current_time);
        if (empty($task_id_list)) {
            return 0;
        }

        $spawned = 0;
        foreach ($task_id_list as $task_id) {
            // zRem doubles as a claim: only the caller that actually removed
            // the member goes on to dispatch it.
            if (empty($this->redis->zRem($this->queue, $task_id))) {
                continue;
            }
            // The payload may already have expired; nothing left to execute.
            if (empty($this->redis->exists($task_id))) {
                continue;
            }

            $command = 'nohup php cli.php '
                . escapeshellarg('home/task/asynDelayStart/id/' . $task_id)
                . ' >/dev/null 2>&1 &';
            $process = @proc_open($command, [], $pipes);
            if ($process === false) {
                error_log('[asyn_delay_queue] failed to spawn worker for task ' . $task_id);
                continue;
            }
            // The command backgrounds the worker with "&", so closing the
            // handle only reaps the short-lived launcher process.
            proc_close($process);
            $spawned++;
        }

        return $spawned;
    }

    /**
     * Execute a stored task and record the outcome in asyn_delay_queue_log.
     *
     * The log row's status is set to 3 when the result is exactly false
     * (including an unknown class/method or a thrown exception) and to 2
     * otherwise.
     *
     * @param string $task_id Id under which the serialized task is stored.
     *
     * @return mixed False when the payload is missing or the task could not be
     *               run; otherwise the return value of the task method.
     */
    public function start($task_id)
    {
        $task_info = $this->redis->get($task_id);
        if (empty($task_info)) {
            return false;
        }

        // NOTE(review): unserialize() on data read back from Redis can
        // instantiate arbitrary classes if that store is ever writable by an
        // untrusted party; consider a data-only format such as JSON.
        $task_info = unserialize($task_info);

        $result = false;
        $runnable = is_array($task_info)
            && !empty($task_info['class'])
            && class_exists($task_info['class'])
            && method_exists($task_info['class'], $task_info['method']);

        if ($runnable) {
            try {
                if (empty($task_info['class_param'])) {
                    $obj = new $task_info['class']();
                } else {
                    $obj = new $task_info['class'](...$task_info['class_param']);
                }
                // NOTE: on PHP 8+, string keys in method_param are treated as
                // named arguments by call_user_func_array().
                $result = call_user_func_array([$obj, $task_info['method']], $task_info['method_param']);
            } catch (\Exception $exception) {
                $this->reportFailure($task_id, $exception);
            } catch (\Throwable $error) {
                // PHP 7+ engine errors (e.g. TypeError) are not \Exception;
                // catch them too so the status below is still recorded.
                $this->reportFailure($task_id, $error);
            }
        }

        $status = ($result === false) ? 3 : 2;
        M('asyn_delay_queue_log')->where(['task_id' => $task_id])->save([
            'status' => $status,
        ]);

        return $result;
    }

    /**
     * Write a one-line description of a task failure to the PHP error log.
     *
     * @param string                $task_id Id of the task that failed.
     * @param \Exception|\Throwable $error   The caught exception or error.
     */
    protected function reportFailure($task_id, $error)
    {
        error_log(sprintf('[asyn_delay_queue] task %s failed: %s', $task_id, $error->getMessage()));
    }
}
task文件
$resource[] = @proc_open('nohup php cli.php "home/task/asynDelayStart/id/"' . $task_id . ' >/dev/null 2>&1 &', [], $pipes);在这里调用asynDelayStart方法
/**
 * Execute a single delayed task by handing its id to
 * AsynDelayQueueService::start().
 *
 * @param string $id Task id to execute.
 */
public function asynDelayStart($id)
{
    (new AsynDelayQueueService())->start($id);
}
/**
 * Create an AsynTimelyQueueService and invoke its run() method.
 */
public function asynTimelyRun()
{
    (new AsynTimelyQueueService())->run();
}
// Usage example: enqueue Wechat\Logic\ShakeLotteryLogic::doLottery so that it
// runs $broadcastData['duration'] + 10 seconds from now, passing the prize id
// as the method parameter. $broadcastData is defined outside this snippet.
$service = new AsynDelayQueueService();
$exec_time = time() + $broadcastData['duration']+10;
$service->push($exec_time, 'Wechat\Logic\ShakeLotteryLogic', 'doLottery', ['prize_id'=> $broadcastData['prize_id']]);
|