Websocket
提供多种websocket实现方案,包括集群模式的解决,附详细代码,轻松掌握websocket。
总结
- Websocket介绍
- Websocket使用场景
- 基于SpringBoot实现Websocket
- 基于Netty实现Websocket
- 基于tio实现Websocket(推荐)
- 实现websocket(集群版)
- demo代码
Websocket介绍
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
Websocket使用场景
- 数字大屏实时更新,典型的如:航班信息,股票基金报价,体育实况等
- 消息提醒
- 社交订阅
- 多人聊天
- web页面日志实时查看
- web页面中模拟shell交互
- 等等
基于SpringBoot实现WS
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
添加Webscoket配置
@Component
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
Webscoket通信代码
@Component
@ServerEndpoint("/websocket/{id}")
@Slf4j
public class WebSocketServer {
@OnOpen
public void onOpen(@PathParam(value = "id") String id, Session session) {
log.info("客户端" + id + "连接建立.");
WsSessionManager.add(id, session);
try {
sendMessage(id, "客户端" + id + "连接建立.");
} catch (IOException e) {
log.error("WebSocket IO异常");
}
}
@OnClose
public void onClose(@PathParam(value = "id") String id, Session session) {
log.info("有一连接关闭:{}", id);
WsSessionManager.remove(id);
}
@OnMessage
public void onMessage(@PathParam(value = "id") String id, String message) {
log.info("来自客户端的消息:" + message);
String[] messages = message.split("[|]");
try {
if (messages.length > 1) {
sendToUser(messages[0], messages[1], id);
} else {
sendToAll(messages[0]);
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
@OnError
public void onError(Session session, Throwable e) {
log.error("WebSocket发生错误:{}", e.getMessage(), e);
}
private void sendMessage(String id, String message) throws IOException {
Session session = WsSessionManager.get(id);
session.getBasicRemote().sendText(message);
}
private void sendToUser(String message, String sendClientId, String myId) throws IOException {
if (sendClientId == null || WsSessionManager.get(sendClientId) == null) {
sendMessage(myId, "当前客户端不在线");
} else {
sendMessage(sendClientId, message);
}
}
private void sendToAll(String message) throws IOException {
for (String key : WsSessionManager.SESSION_POOL.keySet()) {
WsSessionManager.get(key).getBasicRemote().sendText(message);
}
}
}
Session管理管理工具类
@Slf4j
public class WsSessionManager {
public static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();
public static void add(String key, Session session) {
SESSION_POOL.put(key, session);
}
public static Session remove(String key) {
return SESSION_POOL.remove(key);
}
public static void removeAndClose(String key) {
Session session = remove(key);
if (session != null) {
try {
session.close();
} catch (IOException e) {
log.error("删除并同步关闭连接异常:{}", e.getMessage(), e);
}
}
}
public static Session get(String key) {
return SESSION_POOL.get(key);
}
}
测试
这里推荐一个在线的测试工具:http://coolaf.com/zh/tool/chattest
你发送的信息 2022-09-04 14:05:41
你好|4
你发送的信息 2022-09-04 14:05:57
你好|4
websocket连接已断开!!!
连接成功,现在你可以发送信息啦!!!
服务端回应 2022-09-04 14:09:23
客户端1连接建立.
你发送的信息 2022-09-04 14:09:27
你好|4
服务端回应 2022-09-04 14:09:31
当前客户端不在线
你发送的信息 2022-09-04 14:09:42
你好
服务端回应 2022-09-04 14:09:42
你好
你发送的信息 2022-09-04 14:09:49
11
基于Netty实现WS
引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Netty配置
@Component
@Slf4j
public class NettyServer {
private int port = 8090;
private EventLoopGroup mainGroup;
private EventLoopGroup subGroup;
private ServerBootstrap server;
private ChannelFuture future;
public NettyServer() {
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.option(ChannelOption.SO_BACKLOG, 1024);
server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class).localAddress(this.port).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("收到新连接:" + ch.localAddress());
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpObjectAggregator(8192));
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
ch.pipeline().addLast(new MyWebSocketHandler());
}
});
}
public void start() {
this.future = server.bind(this.port);
log.info("netty server 启动完毕,启动端口为:" + this.port);
}
}
处理器
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channelGroup;
static {
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("与客户端建立连接,通道开启!");
channelGroup.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("与客户端断开连接,通道关闭!");
channelGroup.remove(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg){
System.out.println("服务器收到的数据:" + msg.text());
sendAllMessage();
}
private void sendMessage(ChannelHandlerContext ctx) {
String message = "你好,"+ctx.channel().localAddress()+" 给固定的人发消息";
ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
}
private void sendAllMessage(){
String message = "我是服务器,这里发送的是群消息";
channelGroup.writeAndFlush( new TextWebSocketFrame(message));
}
}
启动类
@SpringBootApplication
public class Main implements CommandLineRunner {
@Autowired
private NettyServer nettyServer;
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
@Override
public void run(String... args) throws Exception {
this.nettyServer.start();
}
}
测试
这里推荐一个在线的测试工具:http://coolaf.com/zh/tool/chattest 输入地址:ws://localhost:8090/ws 就能愉快的测试了
基于tio实现WS(推荐)
tio是什么?
https://www.tiocloud.com/doc/tio/85 他的优势在于API设计易懂,尽量避免引入自创概念——最大限度降低学习成本。
引入依赖
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-spring-boot-starter</artifactId>
<version>3.6.0.v20200315-RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
编写配置
@Component
public class MyWebSocketMsgHandler implements IWsMsgHandler {
@Override
public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
return httpResponse;
}
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
System.out.println("onAfterHandshaked 握手成功");
}
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
System.out.println("onBytes 接收到bytes消息");
return null;
}
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
System.out.println("onClose");
return null;
}
@Override
public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
System.out.println("onText 接收到文本消息:"+s);
return "应答消息:"+s;
}
}
主动推送
@RestController
@RequestMapping("/push")
public class PushController {
@Autowired
private TioWebSocketServerBootstrap bootstrap;
@GetMapping("/msg")
public void pushMessage(String msg){
if (StrUtil.isEmpty(msg)){
msg = "hello tio websocket spring boot starter";
}
Tio.sendToAll(bootstrap.getServerTioConfig(), WsResponse.fromText(msg,"utf-8"));
}
}
启动类
@SpringBootApplication
@EnableTioWebSocketServer
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
配置文件
tio:
websocket:
server:
port: 9876
heartbeat-timeout: 60000
测试
这里推荐一个在线的测试工具:http://coolaf.com/zh/tool/chattest 输入地址:ws://localhost:9876 就能愉快的测试了
基于tio实现websocket(集群版)
原理
引入Redis的发布订阅模式
demo代码已经发布到GitHub,需要请自取:https://github.com/shenhuan2021/websocket-cluster-demo
|