背景
项目里边有个模块首页会显示一些汇总信息,有部分信息需要进行实时更新,因此想到使用WebSocket进行长连接,进入后便链接后台,然后后台根据需求(定时、数据有更新)给前台反馈数据,从而达到一次链接,一直通信的功能。避免了前端轮询调用带来的资源消耗。
WebSocket:
本人是第一次接触websocket,之前对类似的处理方式能想到的就是前端隔一段时间调用一次接口来获取数据。然后项目内的人就推荐让我用websocket去写,这样就不用前端一直调用接口了,直接初始化链接,然后一直保持通信,后台根据需求给前端返回数据。
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
websocket处理类
@ServerEndpoint(value = "/webSocket/projectHome", subprotocols = "")
@Slf4j
@Component
public class WebSocketServer {
@Autowired
private static StatisticsDataService service;
private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
private Session session;
private static final AtomicInteger count = new AtomicInteger();
private String sid = "";
@OnOpen
public void onOpen(Session session) {
Authentication authentication = (Authentication) session.getUserPrincipal();
SecurityUtils.setAuthentication(authentication);
String username = SecurityUtils.getUsername();
this.session = session;
for (WebSocketServer webSocket : webSocketSet) {
if (webSocket.sid.equals(username)) {
webSocketSet.remove(webSocket);
count.getAndDecrement();
}
}
count.getAndIncrement();
webSocketSet.add(this);
this.sid = username;
}
@OnClose
public void onClose() {
webSocketSet.remove(this);
count.getAndDecrement();
}
@OnMessage
public void onMessage(String message, Session session) {
Authentication authentication = (Authentication) session.getUserPrincipal();
log.info("收到来自" + sid + "的信息:" + message);
service.refresh(sid, authentication);
}
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
private void sendMessage(String type, Object data) throws IOException {
Map<String, Object> result = new HashMap<>();
result.put("type", type);
result.put("data", data);
this.session.getAsyncRemote().sendText(ObjectMapperBuilder.toJSONString(result));
}
public static void sendInfo(String type, Object data, @PathParam("sid") String sid) {
for (WebSocketServer item : webSocketSet) {
try {
if (sid == null) {
item.sendMessage(type, data);
} else if (item.sid.equals(sid)) {
item.sendMessage(type, data);
}
} catch (IOException ignored) {
}
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WebSocketServer that = (WebSocketServer) o;
return Objects.equals(session, that.session);
}
public static boolean isConn(String sid) {
for (WebSocketServer item : webSocketSet) {
if (item.sid.equals(sid)) {
return true;
}
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(session);
}
@Autowired
public void setRepository(StatisticsDataService service) {
WebSocketServer.service = service;
}
开发出现的问题
- 业务代码处理
- 注入的service类为null
- 上下文认证
问题处理
1.业务代码的调用: 一开始想到的是用定时任务隔一段时间跑一次,如下:
@Scheduled(fixedDelay = 5000L)
private void refresh(){
}
但是业务需求是将数据与上一次发送的数据做对比,如果有更新才给客户端发送数据, 而且需要判断用户是否还在线,这样定时任务就不适用了,无法关联用户。 然后转换到在websocket中OnMessage内调用业务方法,如下:
@OnMessage
public void onMessage(String message, Session session) {
Authentication authentication = (Authentication) session.getUserPrincipal();
log.info("收到来自" + sid + "的信息:" + message);
service.refresh(sid, authentication);
}
2.这样调用引发了第二个问题,就是注入的service在调用方法时一直报空指针,原理应该是websocket不受spring管控,所以在websocket中是拿不到spring中注入的对象的。然后百度一波,发现在websocket中需要自己设定一个,如下:
@Autowired
private static StatisticsDataService service;
@Autowired
public void setRepository(StatisticsDataService service) {
WebSocketServer.service = service;
}
3.这样就解决了注入对象空指针异常的问题了。本以为可以愉快的跑起来了,但是还没咧开嘴第3个问题就出来了。因为是微服务,所以有的接口是需要通过feign接口来调用的,然后业务处理的话是使用另外一个新线程来跑的,这就造成上下文中没有了用户的信息,导致获取用户权限时失败。。。 解决方法: 1.前端在建立链接时将认证token以参数的方式传入后台 例:ws://localhost:8080/webSocket/projectHome?Authorization=abcdefghi*** 2.后台Spring-Security进行拦截过滤,从url中获取token,根据token获取用户信息,然后将用户注入到上下文中。这块代码是别人写的,因为涉及到搭建框架方式,所以不大清楚处理方式,不过逻辑是这样的。 3.在业务处理线程中将用户信息再次注入。代码如下:
websocket内在收到客户端信息后进行业务处理调用
@OnMessage
public void onMessage(String message, Session session) {
Authentication authentication = (Authentication) session.getUserPrincipal();
log.info("收到来自" + sid + "的信息:" + message);
service.refresh(sid, authentication);
}
service中
private void refresh(String userId, Authentication authentication) {
ThreadPoolExecutorUtil.getPoll().execute(() -> {
SecurityUtils.setAuthentication(authentication);
int num = 0;
while (WebSocketServer.isConn(userId)) {
int newNum = 1;
if (num != newNum) {
num = newNum;
WebSocketServer.sendInfo("num", newNum, userId);
}
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
这样就能实现长连接实时通信了,而且上下文中也能获取到用户信息,进行用户鉴权了。
后来项目内大佬有写了一种定时任务来替换创建线程执行业务代码,代码如下:
private void refresh(String userId, Authentication authentication) {
handler.start(5000L, task -> {
SecurityUtils.setAuthentication(authentication);
int num = 0;
if (WebSocketServer.isConn(userId)) {
int newNum = 1;
if (num != newNum) {
num = newNum;
WebSocketServer.sendInfo("num", newNum, userId);
}
return true;
}
return false;
});
}
handler内:
public void start(long delay, Function<Timeout, Boolean> function) {
timer.newTimeout(t -> {
Boolean result = function.apply(t);
if (result) {
timer.newTimeout(t.task(), delay, TimeUnit.MILLISECONDS);
}
}, delay, TimeUnit.MILLISECONDS);
}
到此WebSocket使用开发完毕,第一次用,有些地方说的不对的还请大佬指出,一起学习。
|