前端
const BASE_URL = process.env.VUE_APP_REQUEST_BASE_URL
export default {
data() {
return {
socket: null
}
},
methods: {
openSocket() {
let base_url = BASE_URL.includes("http") ? BASE_URL : location.protocol + location.host + "/lop/";
let userid = this.$store.state.user.userInfo.id;
let socketUrl = base_url + "socket/" + userid;
socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
const socket = new WebSocket(socketUrl);
this.socket = socket;
socket.onopen = this.socketOpenCallback;
socket.onmessage = this.socketMessageCallback;
socket.onclose = this.socketCloseCallback;
socket.onerror = this.socketErrorCallback;
},
closeSocket() {
if (this.socket !== null) {
this.socket.close();
}
},
socketOpenCallback() {
console.log("socket连接成功");
},
socketMessageCallback(res) {
console.log(res);
},
socketCloseCallback() {
this.socket = null;
console.log("socket连接关闭");
},
socketErrorCallback() {
this.socket = null;
console.log("socket连接错误");
}
},
destoryed() {
this.closeSocket();
}
}
后端
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@Service
@ServerEndpoint(value = "/socket/{userId}")
@Slf4j
public class SocketService {
private static AtomicInteger onlineClient = new AtomicInteger(0);
private static ConcurrentHashMap<String, Session> webSocketMap = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
if (webSocketMap.containsKey(userId)) {
Session ts = webSocketMap.get(userId);
if (ts != null) {
closeSession(ts);
}
webSocketMap.remove(userId);
} else {
onlineClient.incrementAndGet();
}
webSocketMap.put(userId, session);
log.info("有新连接加入:{},当前在线人数为:{}", userId, onlineClient.get());
}
private void closeSession(Session session) {
if (session != null) {
try {
session.close();
session = null;
} catch (IOException e) {
e.printStackTrace();
}
}
}
@OnClose
public void onClose(Session session) {
String userId = findUser(session);
if (StringUtils.isNotBlank(userId)) {
Session ts = webSocketMap.get(userId);
webSocketMap.remove(userId);
if (ts != null) {
closeSession(ts);
}
}
}
private String findUser(Session session) {
Iterator it = webSocketMap.entrySet().iterator();
String userId = null;
while (it.hasNext()) {
Map.Entry<String, Session> entry = (Map.Entry<String, Session>) it.next();
if (entry.getValue() != null && entry.getValue() == session) {
userId = entry.getKey();
break;
}
}
return userId;
}
@OnMessage
public void onMessage(String message, Session session) {
String fromUserId = findUser(session);
if (StringUtils.isBlank(fromUserId)) {
log.error("can not find user");
return;
}
if (message != null && !"".equals(message)) {
try {
JSONObject jsonObject = JSON.parseObject(message);
jsonObject.put("fromUserId", fromUserId);
String toUserId = jsonObject.getString("toUserId");
if (toUserId != null && !"".equals(toUserId) && webSocketMap.containsKey(toUserId)) {
webSocketMap.get(toUserId).getBasicRemote().sendText(jsonObject.toJSONString());
} else {
log.error("请求的userId:" + toUserId + "不在该服务器上");
}
} catch (Exception e) {
}
}
}
public void sendMsgByUserId(String message, @PathParam("userId") String userId) throws IOException {
if (userId != null && !"".equals(userId) && webSocketMap.containsKey(userId)) {
Session session = webSocketMap.get(userId);
if (session != null) {
session.getBasicRemote().sendText(message);
}
}
}
public void sendMsgToAllUser(String message) throws IOException {
for(String k : webSocketMap.keySet()) {
Session s = webSocketMap.get(k);
s.getBasicRemote().sendText(message);
}
}
}
|