前端
```html
//这是去获取未读消息的条数(这个函数是自定义的)
function getNotifyInfo() {
$.ajax({
cache: true,
type: "get",
url: ctx + "tbs/notice/unreadMessage",
async: false,
success: function (result) {
if (result.code == 0) {
if(result.data > 0){
//设置在小铃铛上面
$("#noticeNum").text(result.data);
$("#noticeNum").show();
$("#noticeDetail").text([[#{tbs.meaasge.notice.unread}]].format(result.data));
}else{
$("#noticeNum").text("");
$("#noticeNum").hide();
$("#noticeDetail").text([[#{tbs.meaasge.notice.read}]]);
}
}
},
error: function (error) {
}
});
}
// The shared WebSocket connection; assigned by createWebSocket().
var websocket;
// Guard flag that prevents overlapping reconnect attempts.
var lockReconnect = false;
// Handle of the pending reconnect timer (set by reconnect()).
var tt;
/**
 * Open the WebSocket connection.
 *
 * The endpoint is built from the #url and #userId form fields. When the
 * browser offers neither WebSocket nor MozWebSocket nothing happens; any
 * exception raised while connecting or wiring the handlers schedules a
 * reconnect.
 */
function createWebSocket() {
    try {
        var uid = $("#userId").val();
        var endpoint = $("#url").val() + "/websocket/message/" + uid;
        var SocketCtor;
        if ('WebSocket' in window) {
            SocketCtor = WebSocket;
        } else if ('MozWebSocket' in window) {
            SocketCtor = MozWebSocket;
        } else {
            // No native implementation available (a SockJS fallback is not wired in).
            return;
        }
        websocket = new SocketCtor(endpoint);
        init();
    } catch (e) {
        console.log('catch' + e);
        reconnect();
    }
}
/**
 * Attach the lifecycle callbacks to the current global `websocket`.
 *
 * - open / message: restart the heartbeat timers;
 * - message other than the heartbeat reply "ok": refresh the unread count;
 * - error / close: schedule a reconnect (close also stops the heartbeat).
 */
function init() {
    // Connection established: restart the heartbeat.
    websocket.onopen = function (event) {
        heartCheck.reset().start();
    };
    // Any incoming message proves the link is alive, so restart the heartbeat.
    websocket.onmessage = function (event) {
        heartCheck.reset().start();
        // "ok" is only the reply to our own ping; everything else is a notification.
        if (event.data !== "ok") {
            getNotifyInfo();
        }
    };
    // Connection error: try again later.
    websocket.onerror = function (event) {
        reconnect();
    };
    // Connection closed: stop the heartbeat and try again later.
    websocket.onclose = function (event) {
        heartCheck.reset();
        reconnect();
    };
    // Close the socket when the window is about to unload, so the server does not
    // raise an exception for a connection that vanished without a close handshake.
    window.onbeforeunload = function () {
        websocket.close();
    };
}

/**
 * Close the current connection.
 * Declared at top level: nested inside init() it could never be called.
 */
function closeWebSocket() {
    websocket.close();
}

/**
 * Send a message over the current connection.
 * Declared at top level: nested inside init() it could never be called.
 *
 * @param {string} message payload handed to websocket.send()
 */
function send(message) {
    websocket.send(message);
}
/**
 * Schedule a single reconnect attempt 10 seconds from now.
 *
 * While an attempt is pending (`lockReconnect` is set) further calls are
 * ignored, so an error followed by a close does not reconnect twice.
 */
function reconnect() {
    if (!lockReconnect) {
        lockReconnect = true;
        if (tt) {
            clearTimeout(tt);
        }
        tt = setTimeout(function retryConnection() {
            // Release the guard first so a failing attempt can schedule the next one.
            lockReconnect = false;
            createWebSocket();
        }, 10000);
    }
}
/**
 * WebSocket heartbeat.
 *
 * start() arms a timer; when it fires a "ping" is sent and a second timer is
 * armed that closes the socket unless reset() is called first (which the
 * message handler does whenever anything arrives from the server).
 */
var heartCheck = {
    // Delay in milliseconds, used both before the ping and while waiting for a reply.
    timeout: 60000,
    // Timer that sends the next ping.
    timeoutObj: null,
    // Timer that closes the socket when the server stays silent.
    serverTimeoutObj: null,
    // Cancel both timers; returns the object so calls can be chained.
    reset: function () {
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    // Arm the ping timer, cancelling any timers that are still pending.
    start: function () {
        var checker = this;
        if (checker.timeoutObj) {
            clearTimeout(checker.timeoutObj);
        }
        if (checker.serverTimeoutObj) {
            clearTimeout(checker.serverTimeoutObj);
        }

        function closeSilentConnection() {
            // Only close here: onclose already triggers reconnect(), so calling
            // reconnect() directly as well would reconnect twice.
            websocket.close();
        }

        function sendPing() {
            // The backend answers every message, so a reply arriving in onmessage
            // proves the connection is still healthy.
            websocket.send("ping");
            checker.serverTimeoutObj = setTimeout(closeSilentConnection, checker.timeout);
        }

        checker.timeoutObj = setTimeout(sendPing, checker.timeout);
    }
};
java
controller
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.dcdzsoft.tbs.utils.SemaphoreUtils;
import com.dcdzsoft.tbs.utils.WebSocketUsers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
 * WebSocket endpoint used to push "something changed" notifications to a user.
 *
 * <p>The number of simultaneously connected sessions is capped by a semaphore.
 * Every accepted session is recorded so that its permit is released exactly
 * once, no matter whether the container reports a close, an error, or both.
 */
@Component
@ServerEndpoint("/websocket/message/{uid}")
public class WebSocketServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
    /** Maximum number of sessions allowed to be connected at the same time. */
    public static int socketMaxOnlineCount = 100000;
    private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
    /** Ids of the sessions that currently hold a semaphore permit. */
    private static final Set<String> PERMIT_HOLDERS = ConcurrentHashMap.newKeySet();

    /**
     * Accepts a new connection, or rejects it when the online limit is reached.
     *
     * @param session the newly opened session
     * @param uid     user id taken from the endpoint path
     * @throws Exception if closing a rejected session fails
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("uid") String uid) throws Exception {
        if (!SemaphoreUtils.tryAcquire(socketSemaphore)) {
            WebSocketUsers.sendMessageToUserByText(session, "Online Limit:" + socketMaxOnlineCount);
            // Closing triggers onClose; the session is not in PERMIT_HOLDERS, so no permit is released.
            session.close();
            return;
        }
        PERMIT_HOLDERS.add(session.getId());
        WebSocketUsers.put(uid, session);
        LOGGER.info("\n online number - {}", WebSocketUsers.getUsers().size());
        WebSocketUsers.sendMessageToUserByText(session, "success");
    }

    /**
     * Unregisters a closed session and returns its permit, if it held one.
     *
     * @param session the session that was closed
     */
    @OnClose
    public void onClose(Session session) {
        LOGGER.info("\n close connect - {}", session);
        releaseSession(session);
    }

    /**
     * Logs a transport error, closes the session if it is still open and cleans up.
     *
     * @param session   the session on which the error occurred
     * @param exception the reported error
     * @throws Exception if closing the session fails (cleanup still runs)
     */
    @OnError
    public void onError(Session session, Throwable exception) throws Exception {
        // Pass the throwable as the trailing argument so the stack trace is logged
        // and the placeholder is filled with the message.
        LOGGER.error("\n connect error - {}", exception.getMessage(), exception);
        try {
            if (session.isOpen()) {
                session.close();
            }
        } finally {
            // Must run even when close() throws, otherwise the permit would leak.
            releaseSession(session);
        }
    }

    /**
     * Answers every client message (in practice the heartbeat "ping") with "ok".
     *
     * @param message text received from the client
     * @param session the sending session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        WebSocketUsers.sendMessageToUserByText(session, "ok");
    }

    /** Removes the session from the registry and releases its permit at most once. */
    private static void releaseSession(Session session) {
        WebSocketUsers.remove(session);
        if (PERMIT_HOLDERS.remove(session.getId())) {
            SemaphoreUtils.release(socketSemaphore);
        }
    }
}
WebSocketConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for the WebSocket support.
 */
@Configuration
public class WebSocketConfig {
    /**
     * Publishes a {@link ServerEndpointExporter} bean.
     *
     * <p>Per the Spring documentation this exporter detects beans annotated with
     * {@code @ServerEndpoint} and registers them with the standard WebSocket runtime.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
WebSocketUsers
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.Session;
import com.dcdzsoft.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Static registry of the connected WebSocket sessions, keyed by user id,
 * plus helpers for pushing text messages to one or all of them.
 */
public class WebSocketUsers {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);
    /** user id -> session; ConcurrentHashMap rejects null keys and values. */
    private static final Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();

    /**
     * Registers (or replaces) the session of a user.
     *
     * @param key     user id
     * @param session the user's session
     */
    public static void put(String key, Session session) {
        USERS.put(key, session);
    }

    /**
     * Removes the entry whose value is the given session.
     *
     * @param session session to unregister; {@code null} is treated as "not registered"
     * @return {@code true} when the session was not registered; otherwise the result of
     *         {@link #remove(String)} for the key it was registered under
     */
    public static boolean remove(Session session) {
        if (session == null) {
            return true;
        }
        for (Map.Entry<String, Session> entry : USERS.entrySet()) {
            if (session.equals(entry.getValue())) {
                return remove(entry.getKey());
            }
        }
        return true;
    }

    /**
     * Removes the session registered under the given user id.
     *
     * <p>NOTE(review): the return value is kept exactly as before and reads inverted:
     * a successful removal normally yields {@code false}. Confirm what callers expect.
     *
     * @param key user id; {@code null} is treated as "not registered"
     * @return {@code true} when nothing was registered under {@code key}; otherwise whether
     *         the removed session is still registered under some other key
     */
    public static boolean remove(String key) {
        if (key == null) {
            // ConcurrentHashMap.remove(null) would throw a NullPointerException.
            return true;
        }
        Session removed = USERS.remove(key);
        if (removed == null) {
            return true;
        }
        return USERS.containsValue(removed);
    }

    /**
     * @return the live registry map (not a copy)
     */
    public static Map<String, Session> getUsers() {
        return USERS;
    }

    /**
     * Looks up the session of a user.
     *
     * @param uid user id
     * @return the registered session, or {@code null} when there is none or {@code uid} is null
     */
    public static Session getUserSession(String uid) {
        return uid == null ? null : USERS.get(uid);
    }

    /**
     * Sends a text message to every registered session.
     *
     * @param message text to send
     */
    public static void sendMessageToUsersByText(String message) {
        for (Session session : USERS.values()) {
            sendMessageToUserByText(session, message);
        }
    }

    /**
     * Sends a text message to one session; failures are logged, not thrown.
     *
     * @param session target session; a null or already closed session is only logged
     * @param message text to send
     */
    public static void sendMessageToUserByText(Session session, String message) {
        if (session == null || !session.isOpen()) {
            LOGGER.info("\n[already offline]");
            return;
        }
        try {
            // The heartbeat reply and business notifications come from different threads;
            // serialize the blocking sends on one session so they cannot overlap.
            synchronized (session) {
                session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            LOGGER.error("\n[msg error]", e);
        }
    }

    /**
     * Sends a text message to the session registered for a user id.
     *
     * @param uid     user id of the recipient
     * @param message text to send
     */
    public static void sendMessageToUserByText(String uid, String message) {
        sendMessageToUserByText(getUserSession(uid), message);
    }
}
SemaphoreUtils
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Small wrappers around {@link Semaphore} that log failures instead of propagating them.
 */
public class SemaphoreUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);

    /**
     * Tries to take one permit without blocking.
     *
     * @param semaphore the semaphore to acquire from
     * @return {@code true} when a permit was obtained; {@code false} when none was
     *         available or the attempt threw an exception (which is logged)
     */
    public static boolean tryAcquire(Semaphore semaphore) {
        try {
            return semaphore.tryAcquire();
        } catch (Exception e) {
            LOGGER.error("获取信号量异常", e);
            return false;
        }
    }

    /**
     * Returns one permit to the semaphore; any exception is logged and swallowed.
     *
     * @param semaphore the semaphore to release to
     */
    public static void release(Semaphore semaphore) {
        try {
            semaphore.release();
        } catch (Exception e) {
            LOGGER.error("释放信号量异常", e);
        }
    }
}
在后端任何需要通知前端的地方,调用 WebSocketUsers.sendMessageToUserByText(userId.toString(), "internalMessage"); 即可:前端收到这条消息后,会执行获取未读消息数量的函数(getNotifyInfo)。
|