1.前端websocket开启方法
openSocket() {
if (typeof WebSocket == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
var socketUrl = `http://localhost:8080/ws/` + this.user.id;
socketUrl = socketUrl.replace("https","wss")
.replace("http","ws");
if (this.socket != null) {
this.socket.close();
this.socket = null;
}
this.socket = new WebSocket(socketUrl);
this.socket.onopen = function () {
};
const _this = this
this.socket.onmessage = function (msg) {
console.log(msg.data);
_this.todoSomeThing(_this.user.id)
};
this.socket.onclose = function (e) {
};
this.socket.onerror = function (e) {
console.log('websocket发生了错误,错误: ')
console.log(e)
};
}
}
2.后台(Spring Boot)
pom.xml
<!--
  Spring Boot WebSocket starter. The Java snippets in this note import
  org.springframework.web.socket.server.standard.ServerEndpointExporter,
  which is expected to come from this dependency.
  NOTE(review): no <version> is declared here; presumably it is managed by
  the Spring Boot parent POM / BOM of the project. Confirm.
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置类WebSocketConfig.java
package com.test.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that contributes a single {@link ServerEndpointExporter} bean.
 *
 * <p>NOTE(review): the exporter is presumably what registers
 * {@code @ServerEndpoint}-annotated classes with the embedded container.
 * Confirm against the Spring documentation.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter} instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
服务类WebSocketServer.java
package com.test.websocket.service;
import com.test.common.copycat.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
@ServerEndpoint("/ws/{userId}")
@Component
@Slf4j
public class WebSocketServer {
private static int onlineCount = 0;
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
private Session session;
private String userId="";
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
webSocketMap.put(userId,this);
}else{
webSocketMap.put(userId,this);
addOnlineCount();
}
log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
}
@OnClose
public void onClose() {
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
subOnlineCount();
}
log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:"+userId+",报文:"+message);
if(StringUtils.isNotBlank(message)){
try {
JSONObject jsonObject = JSON.parseObject(message);
jsonObject.put("fromUserId",this.userId);
String toUserId=jsonObject.getString("toUserId");
if(StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)){
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
}else{
log.error("请求的userId:"+toUserId+"不在该服务器上");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
error.printStackTrace();
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static void sendInfo(String message, @PathParam("userId") Long userId) throws IOException {
log.info("发送消息到:"+userId+",报文:"+message);
if(userId!= null &&webSocketMap.containsKey(userId+"")){
webSocketMap.get(userId+"").sendMessage(message);
}else{
log.error("用户"+userId+",不在线!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
其他业务类调用:
// Example call from other business code: push a text message to one user.
// `param` is defined by the surrounding caller and is not shown in this note.
try {
// sendInfo logs an error and sends nothing when the user is not registered.
WebSocketServer.sendInfo("发送测试信息", param.getUserId());
} catch (IOException e) {
// sendInfo declares IOException; it is raised when the underlying send fails.
e.printStackTrace();
}
|