在之前浏览器还不支持WebSocket的时候,Web开发者大多使用轮询接口的方式来实现近实时的数据更新。这种单方向通信的方式,由于服务器是被动接受查询,只能实现近实时的消息更新,且轮询的频率很难准确确定,如果频率高势必会增加服务器的负担;如果频率低,服务器端的消息可能很有很长的延迟才能达到客户端。 如今,web3.0时代的到来,几乎所有的浏览器和Web服务器均支持了WebSocket。WebSocket的产生正式为了解决客户端与Web服务器之间单向通信的问题。WebSocket实现了浏览器(客户端)与Web服务器的双向通信,建立连接之后任何一方都可以主动发送消息到对方。
Spring Boot提供了WebSocket的自动化配置,按照下面的步骤即可快速的将WebSocket整合到项目中。 本文使用的环境如下:
- Spring Boot:2.6.6
- JDK:11.0.2
1.引入依赖Starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 编写WebSocket消息处理器
首先介绍一下WebSocketHandler接口,此接口用于处理连接的生命周期事件和消息处理:
package org.springframework.web.socket;
public interface WebSocketHandler {
void afterConnectionEstablished(WebSocketSession session) throws Exception;
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
boolean supportsPartialMessages();
}
实现自己的Handler:
import com.github.cloudgyb.websocket.config.WebSocketEndpoint;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("连接已建立,会话ID:" + session.getId() + ",客户端地址:" + session.getRemoteAddress());
sessions.put(session.getId(),session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println("接受到消息:" + message.getPayload());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("消息传输出错!");
exception.printStackTrace();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
System.out.println("连接被关闭,会话ID:" + session.getId() + ",客户端地址:" + session.getRemoteAddress());
sessions.remove(session.getId());
}
public void pushMsg(String msg) {
final Collection<WebSocketSession> webSocketSessions = sessions.values();
final TextMessage textMessage = new TextMessage(msg);
webSocketSessions.forEach(s -> {
try {
s.sendMessage(textMessage);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
上边实现了主要的4个方法,分别用于处理连接建立、文本消息处理、处理传输错误和处理连接关闭。另外使用了ConcurrentHashMap实现了会话的管理,将建立连接时的会话保存了起来,用于我们集体推送消息到客户端。
3. WebSocket配置类
上边的Handler还不能处理WebSocket消息,因为还没启用WebSocket,下面的类用于启用WebSocket和注册端点处理器。
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.handler.TextWebSocketHandler;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final MyWebSocketHandler myWebSocketHandler;
public WebSocketConfig(MyWebSocketHandler myWebSocketHandler){
this.myWebSocketHandler = myWebSocketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myWebSocketHandler, "/ws");
}
}
上边注册了我们自定义的Handler,这样客户端就可以使用ws://localhost:8080/ws进行连接了。
4.测试消息发送
启动项目,我们使用Postman来进行测试: 服务端输出:
5. 服务端消息推送
接下来我来实现并演示一下,服务器消息推送到客户端:
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class MsgSource implements ApplicationRunner {
private final MyWebSocketHandler myWebSocketHandler;
public MsgSource(MyWebSocketHandler myWebSocketHandler) {
this.myWebSocketHandler = myWebSocketHandler;
}
@Override
public void run(ApplicationArguments args) {
new Thread(() -> {
while (true) {
myWebSocketHandler.pushMsg("这是一个消息" + new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
使用Postman测试,打开多个标签建立多个连接,同时都能收到服务端推送的消息: 到此为止,Spring Boot整合WebSocket实现客户端与服务器之间的双向通信已经完成了。
下面是一些代码设计上的优化,如果你只是了解如何整合,你可以忽略。
6. 代码设计优化
6.1 不合理分析
上面的WebSocketConfig类的registerWebSocketHandlers方法用于注册Handler,我直接将MyWebSocketHandler注入到了该类中。 仔细想想这样设计不合理:
- 一个配置类依赖了某个功能的具体实现?
- 当我们有多个Handler时,需要修改该配置类增加Handler的实现依赖,以注册Handler。不符合开闭原则!
- Handler的实现类与WebSocketConfig严重耦合。
6.2 优化
我们可以自定义一个注解WebSocketEndpoint 来标识一个类是一个Handler。
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface WebSocketEndpoint {
String value();
}
修改MyWebSocketHandler实现:
@Component
@WebSocketEndpoint("/ws")
public class MyWebSocketHandler extends TextWebSocketHandler {
}
我们标注了@WebSocketEndpoint注解,指定了端点路径。
修改WebSocketConfig 配置类:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final ObjectProvider<TextWebSocketHandler> webSocketHandlers;
public WebSocketConfig(ObjectProvider<TextWebSocketHandler> webSocketHandlers) {
this.webSocketHandlers = webSocketHandlers;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
webSocketHandlers.forEach(textWebSocketHandler -> {
final WebSocketEndpoint annotation = textWebSocketHandler.getClass()
.getAnnotation(WebSocketEndpoint.class);
if (annotation != null) {
final String endpoint = annotation.value();
registry.addHandler(textWebSocketHandler, endpoint);
}
});
}
}
这里我们借助了Spring提供的ObjectProvider方便的拿到所有的实现了TextWebSocketHandler的Handler Bean,这样我们就可以不用关心到底有多少个Handler,达到了一次性注册的目的。并且增加一个handler该配置了也无需修改了! 我们遵守了面向对象设计里面的开闭原则。
仔细看看,其实MyWebSocketHandler 类的设计也有不可理的地方,它本质上是一个消息处理器,但是我们的设计让它同时维护了session,这点是不是不符合单一职责原则呢?
7. 源码地址
为了方便大家参考,我将该文对应的源码放大了Github上:https://github.com/cloudgyb/websocket-spring-boot.
|