WebSocket 与 Spring Boot 集成使用
1. 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<exclusions>
<!-- Exclude the Tomcat starter that the WebSocket starter brings in.
     NOTE(review): presumably another servlet container is supplied elsewhere in the POM; confirm. -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
2. Websocket 配置
/**
 * Registers the application's WebSocket endpoint.
 *
 * <p>{@link SocketAuthHandler} is mapped to {@code WEB_SOCKET_PATH}, {@link SocketInterceptor}
 * is attached to the handshake, and connections from every origin are accepted.
 */
@Configuration
@EnableWebSocket
public class SocketConfig implements WebSocketConfigurer {

    /** Path under which the WebSocket handler is registered. */
    private static final String WEB_SOCKET_PATH = "websocket";

    /** Interceptor invoked around the WebSocket handshake. */
    @Autowired
    private SocketInterceptor socketInterceptor;

    /** Handler that receives the connection lifecycle events and text messages. */
    @Autowired
    private SocketAuthHandler socketAuthHandler;

    /**
     * Maps the handler to its path and configures the handshake.
     *
     * @param webSocketHandlerRegistry registry supplied by Spring to collect handler mappings
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        // NOTE(review): "*" accepts handshakes from any origin; confirm this is intended
        // outside of local development.
        webSocketHandlerRegistry.addHandler(socketAuthHandler, WEB_SOCKET_PATH)
                .addInterceptors(socketInterceptor)
                .setAllowedOrigins("*");
    }
}
3. SocketSessionManager:WebSocketSession 会话管理类
import com.google.common.cache.Cache;
@Slf4j
public class SocketSessionManager {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final static Cache<String, WebSocketSession> WEB_SOCKET_SESSION_MAP = CacheBuilder.newBuilder()
.maximumSize(10240)
.expireAfterAccess(3, TimeUnit.MINUTES)
.build();
public static void add(String key, WebSocketSession session) {
WEB_SOCKET_SESSION_MAP.put(key, session);
}
public static WebSocketSession remove(String key) {
WebSocketSession session = WEB_SOCKET_SESSION_MAP.getIfPresent(key);
WEB_SOCKET_SESSION_MAP.invalidate(key);
return session;
}
public static void removeAndClose(String key) {
WebSocketSession session = remove(key);
if (session != null) {
try {
session.close();
} catch (IOException e) {
log.error("Websocket session close exception ", e);
}
}
}
public static WebSocketSession get(String key) {
return WEB_SOCKET_SESSION_MAP.getIfPresent(key);
}
public static <T> void sendMessages(String key, T data) {
String sendData = "";
if (data != null) {
try {
sendData = OBJECT_MAPPER.writeValueAsString(data);
} catch (JsonProcessingException e) {
log.error("json序列化异常,{}", e.getMessage());
return;
}
}
WebSocketSession session = WEB_SOCKET_SESSION_MAP.getIfPresent(key);
try {
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(sendData));
}
} catch (IOException e) {
log.error("消息发送异常,{}", e.getMessage());
}
}
}
4. SocketInterceptor 握手拦截器
/**
 * Handshake interceptor that extracts the client's token before the WebSocket upgrade.
 *
 * <p>The token is looked up in the {@code token} query parameter first and in the
 * {@code token} request header as a fallback.
 */
@Slf4j
@Component
public class SocketInterceptor extends HttpSessionHandshakeInterceptor {

    /** Name of both the request header and the query parameter that may carry the token. */
    private static final String TOKEN_FIELD = "token";

    @Autowired
    StringRedisTemplate redisTemplate;

    /**
     * Reads the token from the handshake request.
     *
     * <p>NOTE(review): the visible code does not call {@code super.beforeHandshake}, so the
     * parent's handling is skipped unless the elided section invokes it; confirm.
     *
     * @param serverHttpRequest  the handshake request
     * @param serverHttpResponse the handshake response
     * @param webSocketHandler   the handler the connection is destined for
     * @param map                attributes to associate with the WebSocket session
     * @return {@code true} to let the handshake proceed
     * @throws Exception if the handshake processing fails
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                                   WebSocketHandler webSocketHandler,
                                   Map<String, Object> map) throws Exception {
        String token = serverHttpRequest.getHeaders().getFirst(TOKEN_FIELD);
        String authorization = HttpUtil.decodeParamMap(serverHttpRequest.getURI().getQuery(), Charsets.UTF_8).get(TOKEN_FIELD);
        // The query parameter wins; the header is used only when the parameter is blank.
        String auth = StrUtil.blankToDefault(authorization, token);
        // Do not write the credential itself to the log; record only whether one was supplied.
        log.debug("websocket 开始握手,token present:{}", StrUtil.isNotBlank(auth));
        // The line below is the original snippet's placeholder for elided logic; kept verbatim.
        xxxxxxxxxxxxxxxxxxxx
        return true;
    }

    /**
     * Logs that the handshake has finished.
     *
     * @param serverHttpRequest  the handshake request
     * @param serverHttpResponse the handshake response
     * @param webSocketHandler   the handler the connection is destined for
     * @param e                  exception passed in by the framework; not inspected here
     */
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest,
                               ServerHttpResponse serverHttpResponse,
                               WebSocketHandler webSocketHandler,
                               Exception e) {
        log.debug("握手完成!");
    }
}
5. SocketAuthHandler 业务逻辑处理类
/**
 * Text-message WebSocket handler: answers heartbeat pings, parses incoming messages into
 * {@code ExchangeData}, and removes the PC user's session from
 * {@link SocketSessionManager} when the connection closes.
 */
@Slf4j
@Component
public class SocketAuthHandler extends TextWebSocketHandler {

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Called once the connection is open. The visible code only reads the mobile user
     * attribute from the session; it does not use it or register the session anywhere.
     *
     * @param session the newly established session
     * @throws Exception if processing fails
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        UserDto moblieUser = (UserDto) session.getAttributes().get(SystemTool.MOBLIE_USER);
    }

    /**
     * Handles one incoming text message.
     *
     * <p>A {@code ping} payload (case-insensitive) from a session carrying a PC user is
     * answered with {@code pong}. Any other payload is parsed as JSON into
     * {@code ExchangeData}; a payload that cannot be parsed is logged and dropped.
     *
     * @param session the session the message arrived on
     * @param message the received text message
     * @throws Exception if processing fails
     */
    @Override
    protected void handleTextMessage(WebSocketSession session,
                                     TextMessage message) throws Exception {
        UserDto pcUser = (UserDto) session.getAttributes().get(SystemTool.PC_USER);
        if (pcUser != null && "ping".equalsIgnoreCase(message.getPayload())) {
            // Return value intentionally ignored: the lookup counts as an access, which keeps
            // the entry from idling out of SocketSessionManager's expire-after-access cache.
            SocketSessionManager.get(pcSessionKey(pcUser));
            session.sendMessage(new TextMessage("pong"));
            return;
        }
        ExchangeData exchangeData;
        try {
            exchangeData = JSONObject.parseObject(message.getPayload(), ExchangeData.class);
        } catch (Exception e) {
            // Trailing throwable keeps the stack trace in the log, not just the message.
            log.error("序列化失败:{}", e.getMessage(), e);
            return;
        }
        // The line below is the original snippet's placeholder for elided logic; kept verbatim.
        xxxxxxxxxxxxxxxxxxxxxxx
    }

    /**
     * Removes and closes the PC user's registered session once the connection is closed.
     *
     * @param session the closed session
     * @param status  the close status reported by the framework
     * @throws Exception if processing fails
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        UserDto pcUser = (UserDto) session.getAttributes().get(SystemTool.PC_USER);
        if (Objects.nonNull(pcUser)) {
            log.debug("pc用户 [{}] 断开连接", pcUser.getId() + "_" + pcUser.getTenantId());
            // Use the same PC_USER-prefixed key as the heartbeat branch; the previous
            // MOBLIE_USER prefix did not match the key used for this user's session.
            SocketSessionManager.removeAndClose(pcSessionKey(pcUser));
        }
    }

    /**
     * Logs a transport-level failure together with its cause.
     *
     * @param session   the session the error occurred on
     * @param exception the transport error
     * @throws Exception if processing fails
     */
    @Override
    public void handleTransportError(WebSocketSession session,
                                     Throwable exception) throws Exception {
        log.error("socket session{}", session, exception);
    }

    /** Builds the SocketSessionManager key for a PC user: prefix + id + "_" + tenant id. */
    private static String pcSessionKey(UserDto pcUser) {
        return SystemTool.PC_USER + pcUser.getId() + "_" + pcUser.getTenantId();
    }
}
6. 前端开发
// Browser-side client: opens the WebSocket, logs lifecycle events, and sends one greeting
// message once the connection is open.

// Pages served over https may not open plain ws:// sockets, so follow the page's protocol.
var wsScheme = location.protocol === 'https:' ? 'wss://' : 'ws://';
// The path matches the handler path registered on the server ("websocket").
// NOTE(review): prepend the context path if the application is not served from "/".
var ws = new WebSocket(wsScheme + location.host + '/websocket');
var inputText = 'Hello, WebSocket.';
window.chatWs = ws;

ws.addEventListener('open', function (event) {
  console.log('websocket connected.');
  // send() throws InvalidStateError while the socket is still CONNECTING, so the first
  // message is sent from the open handler, not straight after construction.
  window.chatWs.send(JSON.stringify({text: inputText}));
});

ws.addEventListener('message', function (event) {
  console.log('message: ' + event.data);
  var msgs;
  try {
    msgs = JSON.parse(event.data);
  } catch (e) {
    // Not every frame is JSON: the server answers "ping" with the plain text "pong".
    return;
  }
});

ws.addEventListener('close', function () {
  console.log('websocket closed.');
});
|