server 服务端
<?php
// Create the Server object, listening on 127.0.0.1:9510.
$serv = new swoole_server('127.0.0.1', 9510);

// Print a confirmation line when the 'start' event fires.
$serv->on(
    'start',
    function () {
        print "start success.\n";
    }
);
// Server settings: only the number of worker processes is configured.
// Other options that were previously kept here as commented-out toggles
// (daemonize, open_cpu_affinity, task_worker_num, enable_port_reuse,
// http_compression, log_file, reactor_num, dispatch_mode,
// discard_timeout_request, open_tcp_nodelay, open_mqtt_protocol, user/group,
// ssl_cert_file/ssl_key_file, enable_static_handler, document_root)
// are not set.
$serv->set([
    'worker_num' => 10,
]);
// Handle the 'connect' event: log a line for every new client connection.
$serv->on(
    'connect',
    function ($server, $clientFd) {
        print "Client: Connect.\n";
    }
);
// Handle the 'receive' event: push the raw payload onto the Redis queue and
// tell the client whether that worked. The payload is only enqueued here;
// a separate consumer is expected to process it.
//
// Parameters (supplied by Swoole): $serv is the server, $fd the client
// connection id, $from_id the originating reactor id, $data the received bytes.
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    // Keep one Redis connection for the lifetime of this closure in the
    // current process instead of opening (and leaking) a new one per message.
    static $redis = null;

    try {
        if ($redis === null) {
            $conn = new Redis();
            if (!$conn->connect('127.0.0.1', 6379) || !$conn->select(1)) {
                throw new RedisException('unable to connect to Redis or select DB 1');
            }
            $redis = $conn;
        }

        // The score is the arrival time, so the sorted set is ordered by arrival.
        // zAdd returns false on failure; 0 only means the member already existed
        // and its score was updated, which still counts as queued.
        // NOTE(review): 'queue' is a sorted set, so identical payloads collapse
        // into a single member - confirm that this de-duplication is intended.
        $added = $redis->zAdd('queue', microtime(true), $data);
    } catch (RedisException $e) {
        // Drop the (possibly broken) connection so the next message reconnects.
        $redis = null;
        $added = false;
        echo "Redis error: " . $e->getMessage() . "\n";
    }

    // Only report success when the payload was actually queued.
    $serv->send($fd, $added === false ? "queue failed" : "queue success");
});
// Handle the 'close' event: log a line for every client disconnect.
$serv->on(
    'close',
    function ($server, $clientFd) {
        print "Client: Close.\n";
    }
);

// Start the server.
$serv->start();
?>
client 客户端
<?php
// Demo client: once per second, send a random "<number>log" message to the
// server on 127.0.0.1:9510 and print the server's reply.
$cli = new swoole_client(SWOOLE_SOCK_TCP);

// Without a connection every later send/recv would fail, so stop right away.
if (!$cli->connect('127.0.0.1', 9510)) {
    fwrite(STDERR, "connect failed. Error: {$cli->errCode}\n");
    exit(1);
}

while (true) {
    $_sendStr = rand(0, 10000) . "log";
    if ($cli->send($_sendStr) === false) {
        fwrite(STDERR, "send failed. Error: {$cli->errCode}\n");
        break;
    }

    // Read the reply inside the loop. Previously recv() sat after the endless
    // loop, so it was unreachable and the replies were never consumed.
    $data = $cli->recv();
    if ($data === false || $data === '') {
        // false signals a receive error; an empty string means the peer closed.
        fwrite(STDERR, "recv failed or connection closed. Error: {$cli->errCode}\n");
        break;
    }
    // Newline added so consecutive replies do not run together on one line.
    echo $data, "\n";

    sleep(1);
}

$cli->close();
queue 队列消费
<?php
// Queue consumer: every 3 seconds, read all members of the 'queue' sorted set
// in Redis DB 1, remove each one, and print a line for every item handled.
$redis = new Redis();

// Fail fast: polling with a dead connection would only produce misleading output.
if (!$redis->connect('localhost', 6379) || !$redis->select(1)) {
    fwrite(STDERR, "Unable to connect to Redis at localhost:6379 or select DB 1\n");
    exit(1);
}

swoole_timer_tick(3000, function () use ($redis) {
    $result = $redis->zRange('queue', 0, -1);

    // false is a Redis failure, not an empty queue - report it as such.
    if ($result === false) {
        echo "Redis error while reading queue: " . date("Y-m-d H:i:s") . "\n";
        return;
    }

    if (!$result) {
        echo "目前没有任务:".date("Y-m-d H:i:s")."\n";
        return;
    }

    foreach ($result as $res) {
        // Only report an item as processed when this call really removed it;
        // zRem yields 0 if the member was already gone and false on failure.
        if ($redis->zRem("queue", $res) > 0) {
            echo "队列中的".$res."处理完毕:".date("Y-m-d H:i:s")."\n";
        }
    }
});
|