问题场景描述
基于Swoole的WebSocket服务对站内的消息进行的推送,有个全站进行站内消息的推送很棘手,因为峰值的用户服务1600+/QPS,服务器的配置2核8G(的配置)。
难点在于:
- Http的服务是接收主站的请求,需要及时返回,响应时间不能久。
- Redis的执行时间不能太久,(Redis是单进程)慢请求会卡主其他的使用。
- 百万级用户场景,全站用户发送时间不定
- 旧版本是Crontab实现的,因为后台直接请求websocket服务改动大
以上诸多难题。
解决思路
难题有两个思路,先从业务场景分析,在从技术角度下手,因为全站消息通知是很久才会使用一次,有站内信做兜底,需要以保证业务可用性和性能,需要找到一个适中的策略。
需要在接收消息、执行任务(Crontab)的时候分别想办法。
1.Http接收服务中加分布式锁,粒度15分钟,15分钟不管提交多少次,都执行有且只有一次。
public function systemMessage(){
//验证参数...略
$lock = \EasySwoole\RedisPool\RedisPool::invoke(function (\EasySwoole\Redis\Redis $redis){
$lock = $redis->get('systemMessageLock');
return $lock;
},self::REDIS_CONN_NAME );
if($lock){
return $this->writeJson(Status::CODE_OK,[],Status::getReasonPhrase(Status::CODE_OK));
}
//没有锁,继续下面的操作
}
2.Crontab 的执行粒度是1分钟,也就是说最大延迟59s是可以接收的,这里的难于处理的在于第一次没有执行完毕,又执行了第二次,以此类推,这样是不可控的,下面是测试的结果,debug超出内存大小限制了。
[2022-02-22 03:00:23][debug][error] : [Allowed memory size of 134217728 bytes exhausted
还有task重复执行的问题,说明第一次没有执行完,又执行了下一次…果然和之前设想的一样…
$taskId:5workIndex:1
$taskId:3workIndex:2
$taskId:3workIndex:2
$taskId:8workIndex:0
$taskId:8workIndex:0
$taskId:1workIndex:3
解决思路:对脚本执行时间和使用内存进行设置,以防止执行任务时处于可控中,业务代码中加锁,以防止计算任务的重复执行。
对锁回收进行补偿处理,如果15分钟没有回收,锁主动释放,附上Demo。
function run( $taskId, $workerIndex)
{
ini_set('memory_limit', '1024M');
set_time_limit(0);
//读取队列中的数据
$redis = \EasySwoole\RedisPool\RedisPool::defer('redis');
$taskIdLock = $redis->get($this->taskLock);
if($taskIdLock){
return ;
}
$json = $redis->lPop(self::PUSH_MSG_NOTICE_SYSTEM);
if($json && !$taskIdLock){
$redis->setEx($this->taskLock,900,$taskId);
}
if(isset($json) && !empty($json)){
$server = ServerManager::getInstance()->getSwooleServer();
$start_fd = 0;
while(true)
{
$conn_list = $server->getClientList( $start_fd, $this->limit );
if ($conn_list===false || count($conn_list) === 0 || empty($conn_list))
{
//删除锁
\EasySwoole\RedisPool\RedisPool::invoke(function (\EasySwoole\Redis\Redis $redis) {
$redis->del($this->taskLock);
}, self::REDIS_CONN_NAME);
break;
}
$start_fd = end($conn_list);
foreach ($conn_list as $fd){
$server->push($fd, json_encode($this->pushMsg));
}
}
}
}
|