首先我是用的tp5,安装扩展的时候发现tp5的框架只支持think-worker:1.0版本,如果不知道下载哪个版本的话就直接 后面跟上*号得了 会自己下载框架支持的版本。
扩展安装命令:composer require topthink/think-worker:*; 冒号后面可自定义版本。
然后在框架应用目录下新建一个push目录,下面创建一个controller文件夹,在下面创建一个Worker.php内容如下:
<?php
namespace app\push\controller;
use think\worker\Server;
use Workerman\Lib\Timer;
use think\Db;
class Worker extends Server{
protected $socket = 'http://0.0.0.0:2346';
protected static $heartbeat_time=55;
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
if($data=="ping"&&$data==0){
}else{
//接收的参数
}
$connection->send("pong");
$connection->lastMessageTime=time();
}
/**
* 每个进程启动
* @param $worker
*/
public function onWorkerStart($worker){
//查看是否有新的充值或提现订单,有就推送给所有用户
Timer::add(3, function()use($worker){
$time_now=time();
$hasNewDepositOrder = Db::name('testaaa')->where('is_push',0)->order('id desc')->count('id');
$system_listener = Db::name('testaaa')->cache(true)->order('id desc')->select();
if($hasNewDepositOrder){
$depositOrderInfo = Db::name('testaaa')->where('is_push',0)->order('id desc')->find();
$data = ['creatTime'=>$depositOrderInfo['create_time'],'name'=>$depositOrderInfo['name']];
foreach($worker->connections as $connection) {
if(empty($connection->lastMessageTime)){
$connection->lastMessageTime = $time_now;
}
if($time_now-$connection->lastMessageTime > self::$heartbeat_time){
$connection->close();
}
$connection->send(json_encode($data));
}
Db::name('testaaa')->where('id',$depositOrderInfo['id'])->update(['is_push'=>1]);
}else{
foreach($worker->connections as $connection) {
if(empty($connection->lastMessageTime)){
$connection->lastMessageTime = $time_now;
continue;
}
if($time_now-$connection->lastMessageTime > self::$heartbeat_time){ //连接超时
$connection->close();
}
}
}
});
}
}
然后还需要在框架应用目录下创建一个server.php文件内容如下:
<?php
define('APP_PATH', __DIR__ . '/../application/');
define('BIND_MODULE','push/Worker');
// 加载框架引导文件
require __DIR__ . '/../thinkphp/start.php';
好了到这里就可以用命令来测试是否搭建成功了。
php server.php start 来开启;stop关闭;restart重启;
启动成功后就可以创建一个html来测试连接;内容如下:
<!DOCTYPE html>
<html lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Title</title>
</head>
<body>
<script>
var lockReconnect = false; //避免ws重复连接
var ws = null; // 判断当前浏览器是否支持WebSocket
var wsUrl = "ws:服务器ip:端口";
createWebSocket(wsUrl); //连接ws
function createWebSocket(url) {
try {
if ('WebSocket' in window) {
ws = new WebSocket(url);
}
initEventHandle();
} catch (e) {
reconnect(url);
console.log(e);
}
}
function initEventHandle() {
ws.onclose = function() {
reconnect(wsUrl);
console.log("llws连接关闭!" + new Date().toLocaleString());
};
ws.onerror = function() {
reconnect(wsUrl);
console.log("llws连接错误!");
};
ws.onopen = function() {
heartCheck.reset().start(); //心跳检测重置
console.log("llws连接成功!" + new Date().toLocaleString());
};
ws.onmessage = function(event) { //如果获取到消息,心跳检测重置
heartCheck.reset().start(); //拿到任何消息都说明当前连接是正常的
// console.log(event);
if (event.data != 'pong') {
let data = JSON.parse(event.data);
console.log(data);
}
};
}
// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
ws.close();
}
function reconnect(url) {
if (lockReconnect) return;
lockReconnect = true;
setTimeout(function() { //没连接上会一直重连,设置延迟避免请求过多
createWebSocket(url);
lockReconnect = false;
}, 2000);
}
//心跳检测
var heartCheck = {
timeout: 5000, //5秒发一次心跳
timeoutObj: null,
serverTimeoutObj: null,
reset: function() {
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function() {
var self = this;
this.timeoutObj = setTimeout(function() {
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
ws.send("ping");
// console.log("ping!")
self.serverTimeoutObj = setTimeout(function() { //如果超过一定时间还没重置,说明后端主动断开了
ws
.close(); //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
}, self.timeout)
}, this.timeout)
}
}
</script>
</body>
</html>
以上就可以进行连接通讯啦!!!
|