spring-boot&cloud Websocket 使用
1 WebSocket介绍: WebSocket是HTML5新增的协议,它的目的是在浏览器和服务器之间建立一个不受限的双向通信的通道,比如说,服务器可以在任意时刻发送消息给浏览器。 为什么传统的HTTP协议不能做到WebSocket实现的功能?这是因为HTTP协议是一个请求-响应协议,请求必须先由浏览器发给服务器,服务器才能响应这个请求,再把数据发送给浏览器。换句话说,浏览器不主动请求,服务器是没法主动发数据给浏览器的。
2 websocket springboot 项目实现: 2.1 引入websocket jar包和增加配置文件: 2.1.1 jar 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.1.2 WebSocketConfig.java 配置文件
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for the WebSocket layer.
 *
 * <p>Its only job is to expose a {@link ServerEndpointExporter} bean.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter} instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
2.2 定义websocket 连接点:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
@ServerEndpoint("/websocket/wxkf/activity")
public class FlushWebSocket {
private static AtomicInteger onlineCount = new AtomicInteger(0);
private volatile static ConcurrentHashMap<String, Session> webSocketSet = new ConcurrentHashMap<>();
public volatile static ConcurrentHashMap<String, Session> userWebSocketSet = new ConcurrentHashMap<>();
@OnOpen
public void OnOpen(Session session) {
onlineCount.incrementAndGet();
webSocketSet.put(session.getId(),session);
log.info("[WebSocket] 连接成功,当前连接数为:={}", webSocketSet.size());
}
@OnClose
public void OnClose(Session session) {
onlineCount.decrementAndGet();
webSocketSet.remove(session.getId());
log.info("[WebSocket] 退出成功,当前连接数为:={}", webSocketSet.size());
}
@OnMessage
public void onMessage(String message, Session session) {
log.debug("来自客户端的消息:{}" + message);
WebSocketReqDto activityParam = null;
try {
activityParam = JSONObject.parseObject(message, WebSocketReqDto.class);
} catch (Exception e) {
e.printStackTrace();
log.error("消息格式化出错,sessionId:" + session.getId() + ",message:" + message);
}
if (activityParam != null && !StringUtils.isEmpty(activityParam.getOpenUserId()) && !StringUtils.isEmpty(activityParam.getKfOpenId())
&& !StringUtils.isEmpty(activityParam.getCorpId())) {
String sessionKey = activityParam.getCorpId()+ FlushRedisConstant.linkStr+activityParam.getKfOpenId()+FlushRedisConstant.linkStr+ activityParam.getOpenUserId();
if (!userWebSocketSet.containsKey(sessionKey)) {
userWebSocketSet.put(sessionKey, session);
}
sendMessage(session, ResponseDTO.defaultResponse("客户端接到消息sessionId:" + session.getId() + ",message:" + message));
} else {
sendMessage(session, ResponseDTO.defaultResponse("客户端接到消息,sessionId:" + session.getId() + ",message:" + message));
}
}
@OnError
public void onError(Session session, Throwable error) {
log.debug("sessionId:{}的连接发送错误", session.getId());
}
public static boolean sendMessage(Session session, ResponseDTO responseDTO) {
String res = JSONObject.toJSONString(responseDTO, SerializerFeature.WriteDateUseDateFormat);
session.getAsyncRemote().sendText(res);
return true;
}
public static boolean sendMessage(Session session, Object data) {
ResponseDTO responseDTO = ResponseDTO.defaultResponse(data);
String res = JSONObject.toJSONString(responseDTO, SerializerFeature.WriteDateUseDateFormat);
session.getAsyncRemote().sendText(res);
return true;
}
public static boolean sendMessage(String userId, ResponseDTO responseDTO) {
String res = JSONObject.toJSONString(responseDTO, SerializerFeature.WriteDateUseDateFormat);
if (StringUtils.isEmpty(userId)){
log.error("消息发送出错,userId为空,responseDTO:", res);
return false;
}
Session session = userWebSocketSet.get(userId);
if (null == session){
log.error("消息发送出错,session为空,responseDTO:", res);
return false;
}
session.getAsyncRemote().sendText(res);
return true;
}
public static boolean sendMessage(String userId, Object data) {
ResponseDTO responseDTO = ResponseDTO.defaultResponse(data);
String res = JSONObject.toJSONString(responseDTO, SerializerFeature.WriteDateUseDateFormat);
if (StringUtils.isEmpty(userId)){
log.error("消息发送出错,userId为空,responseDTO:", res);
return false;
}
Session session = userWebSocketSet.get(userId);
if (null == session){
log.error("消息发送出错,session为空,responseDTO:", res);
return false;
}
session.getAsyncRemote().sendText(res);
return true;
}
}
2.3 公共类:
2.3.1 ResponseDTO:
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.List;
/**
 * Generic response envelope with static factories for common status codes.
 *
 * <p>Accessors are generated by Lombok; setters are chainable.
 *
 * @param <T> type of the {@code body} payload
 */
@Data
@Accessors(chain = true)
public class ResponseDTO<T> {
    // NOTE(review): this class does not implement Serializable in the visible source,
    // so this constant has no effect on Java serialization — confirm whether that is intended.
    private static final long serialVersionUID = 3918877423924837166L;

    /** Status code; 200 unless overridden. */
    private int code = 200;
    /** Response payload. */
    private T body;
    private String msg;
    /** Defaults to {@code true}. */
    private boolean redirect = true;
    private List<ResponseMessage> messages;
    /** Starts as an empty list rather than {@code null}. */
    private List<String> message = new ArrayList<>();
    private int errNum;
    private String errorMsg;
    /** Defaults to {@code true}; several error factories set it to {@code false}. */
    private boolean success = true;

    /** Creates a response that differs from the defaults only by its code. */
    private static ResponseDTO ofCode(int code) {
        return new ResponseDTO().setCode(code);
    }

    /** Creates a response whose code is taken from the given status constant. */
    private static ResponseDTO ofStatus(BizHttpStatus status) {
        return ofCode(status.getStatus());
    }

    /** @return a response carrying the HTTP_STATUS_400 code */
    public static ResponseDTO response400() {
        return ofStatus(BizHttpStatus.HTTP_STATUS_400);
    }

    /** @return a response carrying the HTTP_STATUS_401 code */
    public static ResponseDTO response401() {
        return ofStatus(BizHttpStatus.HTTP_STATUS_401);
    }

    /** @return a response carrying the HTTP_STATUS_403 code */
    public static ResponseDTO response403() {
        return ofStatus(BizHttpStatus.HTTP_STATUS_403);
    }

    /** @return a response carrying the HTTP_STATUS_404 code */
    public static ResponseDTO response404() {
        return ofStatus(BizHttpStatus.HTTP_STATUS_404);
    }

    /** @return a response carrying the HTTP_STATUS_403 code and the given messages */
    public static ResponseDTO response403(List<ResponseMessage> messages) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_403).setMessages(messages);
    }

    /** @return a response carrying the HTTP_STATUS_405 code and the given messages */
    public static ResponseDTO response405(List<ResponseMessage> messages) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_405).setMessages(messages);
    }

    /** @return a response carrying the HTTP_STATUS_406 code and the given messages */
    public static ResponseDTO response406(List<ResponseMessage> messages) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_406).setMessages(messages);
    }

    /** @return a response carrying the HTTP_STATUS_500 code */
    public static ResponseDTO response500() {
        return ofStatus(BizHttpStatus.HTTP_STATUS_500);
    }

    /** @return an unsuccessful response with the HTTP_STATUS_404 code and the given msg */
    public static ResponseDTO responseParam404(String msg) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_404).setSuccess(false).setMsg(msg);
    }

    /** @return an unsuccessful response with the HTTP_STATUS_500 code and the given msg */
    public static ResponseDTO responseData500(String msg) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_500).setSuccess(false).setMsg(msg);
    }

    /** @return an unsuccessful response with the HTTP_STATUS_500 code, msg and errorMsg */
    public static ResponseDTO responseData500(String msg, String errorMsg) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_500)
                .setSuccess(false)
                .setMsg(msg)
                .setErrorMsg(errorMsg);
    }

    /** @return a response with the HTTP_STATUS_200 code and {@code t} as its body */
    public static <T> ResponseDTO defaultResponse(T t) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_200).setBody(t);
    }

    /** @return a response with the given raw status code and messages */
    public static ResponseDTO defaultErrorResponse(int httpStatus, List<ResponseMessage> singleMessage) {
        return ofCode(httpStatus).setMessages(singleMessage);
    }

    /**
     * NOTE(review): uses HTTP_STATUS_200 and leaves {@code success} at its default
     * despite the "error" name — confirm this is intended.
     *
     * @return a response with the HTTP_STATUS_200 code and the given messages
     */
    public static ResponseDTO defaultErrorResponse(List<ResponseMessage> singleMessage) {
        return ofStatus(BizHttpStatus.HTTP_STATUS_200).setMessages(singleMessage);
    }

    /** @return an unsuccessful response with the given status and messages */
    public static ResponseDTO defaultErrorResponse(BizHttpStatus bizHttpStatus, List<ResponseMessage> singleMessage) {
        ResponseDTO failed = new ResponseDTO().setSuccess(false);
        return failed.setCode(bizHttpStatus.getStatus()).setMessages(singleMessage);
    }

    /** @return a response with the given status and messages; {@code success} is left at its default */
    public static ResponseDTO errorResponse(BizHttpStatus bizHttpStatus, List<ResponseMessage> singleMessage) {
        return ofStatus(bizHttpStatus).setMessages(singleMessage);
    }

    /** @return an unsuccessful response built from the enum's error code and error message */
    public static ResponseDTO errorResponse(BizErrorEnum bizErrorEnum) {
        return ofCode(bizErrorEnum.getErrorCode())
                .setMsg(bizErrorEnum.getErrorMessage())
                .setSuccess(false);
    }

    /** @return an unsuccessful response built from the enum plus the given message list */
    public static ResponseDTO errorResponse(BizErrorEnum bizErrorEnum, List<String> message) {
        return ofCode(bizErrorEnum.getErrorCode())
                .setMsg(bizErrorEnum.getErrorMessage())
                .setMessage(message)
                .setSuccess(false);
    }
}
2.3.2 FlushRedisConstant:
/**
 * Holder for shared string constants. Not meant to be instantiated or extended.
 */
public final class FlushRedisConstant {

    /** Separator placed between the parts of a composite key. */
    public static final String linkStr = "#thisislinkstr#";

    /** Constants holder: no instances. */
    private FlushRedisConstant() {
    }
}
2.3.3 WebSocketReqDto:
import lombok.Data;
import java.io.Serializable;
/**
 * Payload a client sends over the WebSocket; parsed from the JSON text of a message.
 *
 * <p>Accessors are generated by Lombok.
 */
@Data
public class WebSocketReqDto implements Serializable {

    /** Third part of the session key built by the endpoint. */
    private String openUserId;

    /** Second part of the session key built by the endpoint. */
    private String kfOpenId;

    /** First part of the session key built by the endpoint. */
    private String corpId;

    // NOTE(review): not read anywhere in the visible source — confirm its purpose.
    private String uuid;
}
2.3.4 在resource 下新建static 文件夹 放入测试的index.html 页面
<!DOCTYPE HTML>
<html>
<head>
    <meta charset="UTF-8">
    <title>My WebSocket</title>
</head>
<body>
<input id="text" type="text" />
<button onclick="send()">Send</button>
<button onclick="closeWebSocket()">Close</button>
<div id="message"></div>
<!-- Manual test page: opens a WebSocket to the endpoint and prints every event. -->
<script type="text/javascript">
    var websocket = null;

    if ('WebSocket' in window) {
        // Build the URL from the page's own origin instead of a hard-coded
        // "localhost:port" placeholder, which is not a valid WebSocket URL.
        var scheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
        websocket = new WebSocket(scheme + window.location.host + '/websocket/wxkf/activity');

        // Handlers are attached only when the socket exists; otherwise these
        // assignments would throw on null.
        websocket.onerror = function() {
            setMessageInnerHTML("error");
        };
        websocket.onopen = function(event) {
            console.log("连接成功建立");
        };
        websocket.onmessage = function(event) {
            setMessageInnerHTML(event.data);
        };
        websocket.onclose = function() {
            setMessageInnerHTML("close");
        };
    } else {
        alert('Not support websocket')
    }

    // Close the connection when the page is left.
    window.onbeforeunload = function() {
        closeWebSocket();
    };

    // Appends one line to the message area. The text is inserted as a text node,
    // not as HTML: the server echoes client input back, so innerHTML would let
    // that input inject markup.
    function setMessageInnerHTML(innerHTML) {
        var box = document.getElementById('message');
        box.appendChild(document.createTextNode(innerHTML));
        box.appendChild(document.createElement('br'));
    }

    function closeWebSocket() {
        if (websocket) {
            websocket.close();
        }
    }

    function send() {
        var message = document.getElementById('text').value;
        // send() throws while the socket is still connecting, so refuse early.
        if (!websocket || websocket.readyState !== WebSocket.OPEN) {
            setMessageInnerHTML("websocket is not open");
            return;
        }
        websocket.send(message);
    }
</script>
</body>
</html>
2.3.5 通过http://localhost:端口/index.html 进行测试
3 websocket spring-cloud+gateway 实现和boot 项目相同: 3.1 出现返回值非200 情况:
检查是否被全局的filter 做了拦截 (1)在spring-cloud gateway 中添加监控 Pom.xml 增加actuator的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
在spring-cloud gateway resources文件夹下的配置文件bootstrap.yml中放开对gateway 的监控(如果resources文件夹下没有bootstrap.yml 则手动添加bootstrap.yml)
management:
  endpoints:
    web:
      exposure:
        include:
          - gateway
http://ip:端口/actuator/gateway/globalfilters 检查全局filter,检查自定义的一些filter是否对该请求进行了拦截;
如果发现某个filter 对websocket 请求进行了拦截,则可以加入以下代码:
// Let WebSocket handshake requests bypass this filter.
// The Upgrade header value is matched case-insensitively (RFC 6455), so
// "WebSocket" or "WEBSOCKET" must be treated the same as "websocket".
String upgrade = exchange.getRequest().getHeaders().getUpgrade();
if ("websocket".equalsIgnoreCase(upgrade)) {
    return chain.filter(exchange);
}
|