一 什么是WebSocket
1.1 首先分清几个概念
1.2 混淆点
WebSocket:基于TCP的,运行在应用层,替代http的一个协议。 网上说的WebSocket只有一次握手,指的是:客户端发送一个http请求到服务器,服务器响应后标志这个连接建立起来。而不是指TCP的三次握手。
1.3 优点
- 节约宽带。轮询服务端数据的方式,使用的是http协议,head的信息很大,有效数据占比低,而使用WebSocket,头信息很小,有效数据占比高。
- 无浪费。轮询方法可能轮询10次,才可能碰到服务端数据更新,那么前9次数据都浪费了。而WebSocket是由服务器主动发回,来的都是新数据。
- 实时性。当服务器完成协议升级后(HTTP->Websocket),服务端可以主动向客户端推送信息,省去了客户端发起请求的步骤,同时没有间隔时间,只要服务端内容有变化,就可以告知客户端。实时性大大提高。
二 Socket和WebSocket的区别
- 本质上:
Socket本身不是一个协议,而是一个调用接口(API),它工作在OSI模型中的会话层(第5层),是对TCP/IP协议的封装。websocket运行在应用层,是http升级的一个协议。 - 连接:
Socket连接需要一对套接字,一个运行于客户端,另一个运行于服务端。连接分为三个步骤:服务器监听,客户端请求,连接确认。 websocket在客户端,发送一个http请求到服务器,当服务器响应后,完成协议升级,连接建立。
三 WebSocket服务端搭建
3.1 导入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
3.2 搭建websocket服务
package com.wyq.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
- WebSocketServer
用于接收客户端的webSocket请求,处理主要逻辑。代码如下: @ServerEndpoint注解中写上客户端连接的地址。
package io.freeyou.socket.web.server;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.freeyou.modules.sdwan.entity.WebsocketVo;
import io.freeyou.modules.sdwan.vo.SysUserVo;
import io.freeyou.modules.sys.service.SysUserTokenService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
@Component
@ServerEndpoint("/webSocket/{token}")
@Slf4j
@Getter
public class WebSocket {
private static Map<String, CopyOnWriteArraySet<WebSocket>> userSocketMap = new ConcurrentHashMap<>();
private String userId;
private Session session;
public static SysUserTokenService sysUserTokenService;
@OnOpen
public void onOpen(Session session, @PathParam(value = "token") String token) {
SysUserVo tokenEntity = sysUserTokenService.selectByToken(token);
if (tokenEntity == null) {
return;
}
this.userId = tokenEntity.getUserId();
session.getUserProperties().put( "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT", 1000L);
this.session = session;
if (!exitUser(userId)) {
initUserInfo(userId);
} else {
CopyOnWriteArraySet<WebSocket> webSocketTestSet = getUserSocketSet(userId);
webSocketTestSet.add(this);
}
}
@OnClose
public void onClose(Session session) {
if (StringUtils.isBlank(userId)) {
return;
}
CopyOnWriteArraySet<WebSocket> webSocketTestSet = userSocketMap.get(userId);
if (CollectionUtils.isEmpty(webSocketTestSet)) {
return;
}
webSocketTestSet.remove(this);
}
@OnMessage
public void onMessage(String message,Session session) {
if(StringUtils.isNotBlank(message) && "ping".equals(message)){
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put("data", "pong");
WebsocketVo websocketVo = new WebsocketVo();
websocketVo.setData(JSON.toJSONString(jsonObject));
session.getBasicRemote().sendText(JSON.toJSONString(websocketVo));
} catch (IOException e) {
log.warn("【WebSocket推送】下发响应心跳包出错", e);
}
}
}
@OnError
public void onError(Session session, Throwable error){
try {
session.close();
} catch (IOException e) {
log.debug("session close error,sessionId:{},error:{}",userId+":"+session.getId(),e);
}
}
public void sendMessage(String message) {
Set<String> userList = userSocketMap.keySet();
if (CollectionUtils.isEmpty(userList)) {
return;
}
for (String userId : userList) {
sendMessage(userId, message);
}
}
private void sendMessage(String userId, String message) {
CopyOnWriteArraySet<WebSocket> webSocket = userSocketMap.get(userId);
if (CollectionUtils.isEmpty(webSocket)) {
return;
}
for (WebSocket item : webSocket) {
if (item.getSession() == null || !item.getSession().isOpen()) {
continue;
}
synchronized (item.getSession()) {
if (item.getSession().isOpen()) {
try {
item.getSession().getBasicRemote().sendText(message);
} catch (Exception e) {
webSocket.remove(item);
}
}
}
}
}
public synchronized void sendMessageByUserId(List<String> userIds, String message) {
List<String> userList = userIds.stream().filter(userSocketMap.keySet()::contains).collect(Collectors.toList());
if(CollectionUtils.isEmpty(userList)){
return;
}
for (String userId : userList) {
sendMessage(userId, message);
}
}
public boolean exitUser(String userId) {
return userSocketMap.containsKey(userId);
}
public CopyOnWriteArraySet<WebSocket> getUserSocketSet(String userId) {
return userSocketMap.get(userId);
}
private void initUserInfo(String userId) {
CopyOnWriteArraySet<WebSocket> webSocketTestSet = new CopyOnWriteArraySet<>();
webSocketTestSet.add(this);
userSocketMap.put(userId, webSocketTestSet);
}
}
|