// Code
package com.ruoyi.utils.websocket;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.system.service.ISysUserService;
import com.ruoyi.utils.domain.MessageLog;
import com.ruoyi.utils.service.IMessageLogService;
import com.ruoyi.utils.service.IMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.utils.domain.Message;
import com.ruoyi.framework.web.service.TokenService;
@Controller
@ServerEndpoint(value = "/websocket")
public class MyWebSocketController {
private Session session;
private static ConcurrentMap<String, Map<String, List<Object>>> messageMap = new ConcurrentHashMap<>();
private String sender;
private static IMessageService messageService;
private static TokenService tokenService;
private static IMessageLogService messageLogService;
private static ISysUserService userService;
@Autowired
public void setIMessageService(IMessageService messageService){
MyWebSocketController.messageService = messageService;
}
@Autowired
public void setISysUserService(ISysUserService userService){
MyWebSocketController.userService = userService;
}
@Autowired
public void setTokenService(TokenService tokenService){
MyWebSocketController.tokenService = tokenService;
}
@Autowired
public void setMessageLogService(IMessageLogService messageLogService){
MyWebSocketController.messageLogService = messageLogService;
}
String key = "";
@OnOpen
public void onOpen(Session session) throws Exception {
this.session = session;
String token ;
String keString = session.getQueryString();
if(!StringUtils.isEmpty(keString)) {
token = keString;
LoginUser loginUser = tokenService.getLoginUser(token);
if(loginUser == null){
sendMessage(AjaxResult.error("登录已失效"),session);
return;
}
key = loginUser.getUsername();
Map<String, Session> stringSessionMap = WebSocketMapUtil.get(token);
if(stringSessionMap == null){
stringSessionMap = new HashMap<>();
}
stringSessionMap.put(token,session);
WebSocketMapUtil.put(key, stringSessionMap);
}
dbOfflineMsgSend(key);
}
public void dbOfflineMsgSend(String key){
Message message = new Message();
message.setReceive(key);
List<Message> offlineMsg = messageService.selectOfflineMsg(message);
if (offlineMsg != null){
offlineMsg.forEach(msgobj -> {
String receive = msgobj.getReceive();
String sender = msgobj.getSender();
String msg = msgobj.getContent();
try {
onMessage(msg, receive, sender,true);
} catch (IOException e) {
e.printStackTrace();
}
});
if(offlineMsg.size() > 0){
messageService.deleteOfflineMessageByIds(
offlineMsg
.stream()
.map(Message::getId)
.collect(Collectors.toList()).toArray(new Long[]{}));
}
}else {
System.out.println("沒有离线消息");
}
}
@OnMessage
public void onMessage(String message) {
List<String> userList = new ArrayList<>();
userList.add("ry");
userList.add("cg");
String userId=message.substring(message.lastIndexOf(";")+1,message.lastIndexOf(","));
System.out.println("发给:"+userId);
String sendUserId=message.substring(message.lastIndexOf("[")+1,message.lastIndexOf(";"));
System.err.println("发消息的用户:"+sendUserId);
this.sender = sendUserId;
message=message.substring(0, message.lastIndexOf("["));
System.err.println("客户端发来的信息:"+message);
if (userList.size() > 1){
System.out.println("群发");
sendMessageAll(message,userList);
}else {
System.out.println("单发");
try {
onMessage(message,userList.get(0),sendUserId, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void onMessage(String message,String userId,String sendUserId,boolean flag) throws IOException {
Map<String, Session> sessionMap = WebSocketMapUtil.get(userId);
if(!StringUtils.isEmpty(sessionMap)){
System.out.println("在线");
System.out.println("当前用户登录在线数 = " + sessionMap.size());
JSONObject jsonObject = new JSONObject();
jsonObject.put("message",message);
jsonObject.put("sender",sendUserId);
jsonObject.put("user",userId);
for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
sendMessage(AjaxResult.success(jsonObject),entry.getValue());
if(flag)
insertMsg(message,Arrays.asList(userId),sendUserId);
}
}else {
System.out.println("不在线");
insertDbOfflineMessage(message,sendUserId,userId);
}
}
public void insertDbOfflineMessage(String message,String sendUserId,String userId){
Message msg = new Message();
msg.setContent(message);
msg.setSender(sendUserId);
msg.setReceive(userId);
messageService.insertOfflineMsg(msg);
}
@OnError
public void onError(Session session, Throwable error){
System.out.println("错误");
error.printStackTrace();
}
@OnClose
public void onClose() {
String queryString = session.getQueryString();
if (!StringUtils.isEmpty(queryString)){
String token = queryString;
System.out.println("关闭");
Map<String, Session> stringSessionMap = WebSocketMapUtil.get(key);
System.out.println("stringSessionMap = " + stringSessionMap);
stringSessionMap.remove(token);
WebSocketMapUtil.put(key, stringSessionMap);
}
}
public void sendMessage(AjaxResult ajaxResult,Session session) {
try {
System.out.println("session.isOpen = " + session.isOpen());
if(session.isOpen()){
session.getBasicRemote().sendText(ajaxResult.toString());
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void insertMsg(String msg,List<String> receives,String sender){
Message message = new Message();
message.setContent(msg);
message.setPhshType("2");
message.setReferState(0l);
message.setType(1l);
message.setSender(sender);
message.setReceive(String.join(",",receives));
messageService.insertMessage(message);
for (String receive : receives) {
Map<String, Session> sessionMap = WebSocketMapUtil.get(receive);
if(!StringUtils.isEmpty(sessionMap)){
MessageLog messageLog = new MessageLog();
messageLog.setReceive(receive);
messageLog.setSender(sender);
messageLog.setReferState(0l);
messageLog.setmId(message.getId());
messageLogService.insertMessageLog(messageLog);
}
}
}
public void sendMessageAll(String message,List<String> users) {
try {
for (String user : users) {
onMessage(message,user,this.sender,false);
}
insertMsg(message,users,this.sender);
}catch (IOException e) {
e.printStackTrace();
}
}
public int getMessageCount(String userId,String objectUserId) {
Map<String, List<Object>> listMap=messageMap.get(userId);
if(listMap != null) {
List<Object> list=listMap.get(objectUserId);
if(list!=null) {
return listMap.get(objectUserId).size();
}else {
return 0;
}
}else {
return 0;
}
}
}
|