引入pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
代码
@Component
@Service
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private Long userId = null;
private static MessageService messageService;
//webSocket中无法使用Autowired直接注入,需要通过方法注入的方式注入。
@Autowired
public void setChatService(MessageService messageService) {
WebSocketServer.messageService= messageService;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") Long userId) {
this.session = session;
webSocketSet.add(this); //加入set中
this.userId = userId;
addOnlineCount(); //在线数加1
try {
//通过userId查询是否该用户有待办的消息。然后调用发送消息方法发送给前端。
//sendMessage("conn_success");
List<MessageEntity> list = messageService.getUnreadByUserId(userId);
String message = "";
if(list!=null&&list.size()>0){
message = JSONObject.toJSONString(list);
}
sendInfo(message,userId);
log.info("有新窗口开始监听:" + userId + ",当前在线人数为:" + getOnlineCount());
} catch (IOException e) {
log.error("websocket IO Exception");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
//断开连接情况下,更新主板占用情况为释放
log.info("释放的sid为:"+userId);
//这里写你 释放的时候,要处理的业务
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
* @ Param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + userId + "的信息:" + message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @ Param session
* @ Param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
*/
public static void sendInfo(String message, @PathParam("userId") Long userId) throws IOException {
log.info("推送消息到窗口" + userId + ",推送内容:" + message);
for (WebSocketServer item : webSocketSet) {
System.out.println(item.userId);
System.out.println(webSocketSet.size());
try {
//这里可以设定只推送给这个sid的,为null则全部推送
if (userId == null) {
} else if (item.userId.equals(userId)) {
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
public static CopyOnWriteArraySet<WebSocketServer> getWebSocketSet() {
return webSocketSet;
}
}
测试
@Controller("web_Scoket_system")
@RequestMapping("/api/socket")
public class TestWebSocketController {
//页面请求
@GetMapping("/index/{userId}")
public ModelAndView socket(@PathVariable String userId) {
ModelAndView mav = new ModelAndView("/socket1");
mav.addObject("userId", userId);
return mav;
}
//推送数据接口
@ResponseBody
@RequestMapping("/socket/push/{userId}")
public Map pushToWeb(@PathVariable Long userId, String message) {
Map<String,Object> result = new HashMap<>();
try {
WebSocketServer.sendInfo(message, userId);
result.put("code", userId);
result.put("msg", message);
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
}
|