Netty异步NIO框架(二)前端后端聊天 私聊及群聊
基于上篇文章扩展
1. 引入Netty依赖
<!--后端采用springboot项目,netty只需引入这一个依赖 -->
<!--netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
2. 创建netty服务器
package com.cnpc.modules.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
 * Starts the Netty WebSocket server once the Spring application context has been refreshed.
 *
 * <p>The server binds to port {@value #PORT} and hands every accepted connection to the injected
 * {@link WebSocketChannelInit}. Start-up does not block the calling thread: the event loop groups
 * keep the server alive and are released when the server channel closes or when binding fails.
 */
@Component
public class NettyWebSocketServer implements ApplicationListener<ContextRefreshedEvent> {

    /** TCP port the WebSocket server listens on. */
    private static final int PORT = 9090;

    @Autowired
    private WebSocketChannelInit webSocketChannelInit;

    /** Accepts incoming connections; a single thread is enough for one listening socket. */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    /** Performs the I/O of accepted connections. */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /** Set on the first call to {@link #init()} so that later refresh events do not bind again. */
    private boolean started;

    /**
     * Boots the server when the application context is refreshed.
     *
     * @param contextRefreshedEvent the refresh event published by Spring (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        this.init();
    }

    /**
     * Binds the server socket and returns as soon as the bind has completed.
     *
     * <p>Failures are reported on standard error and the event loop groups are shut down; no
     * exception is propagated to the caller. Calling this method again after the first call is a
     * no-op.
     */
    public void init() {
        if (started) {
            // ContextRefreshedEvent can be published more than once (e.g. by child contexts);
            // a second bind on the same port would fail.
            return;
        }
        started = true;
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(webSocketChannelInit);
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            System.out.println("--Netty服务端启动成功---");
            // Do not wait on closeFuture() here: that would block the thread delivering the
            // Spring event and the application would never finish starting. Release the
            // threads asynchronously once the server channel is closed instead.
            channelFuture.channel().closeFuture().addListener(future -> shutdown());
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            shutdown();
        }
    }

    /** Releases the threads of both event loop groups. */
    private void shutdown() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
3. 创建通道初始化对象
package com.cnpc.modules.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Builds the handler pipeline of every accepted connection: HTTP codec, chunked writes, HTTP
 * message aggregation, WebSocket handshake/protocol handling and finally the application's
 * {@link WebSocketHandler}.
 */
@Component
public class WebSocketChannelInit extends ChannelInitializer<Channel> {

    /** Path the WebSocket endpoint is served on. */
    private static final String WEBSOCKET_PATH = "/ws";

    /** Maximum size in bytes of an aggregated HTTP message (the handshake request). */
    private static final int MAX_HTTP_CONTENT_LENGTH = 8000;

    /** Maximum payload size in bytes of a single WebSocket frame. */
    private static final int MAX_FRAME_SIZE = 65536;

    @Autowired
    private WebSocketHandler webSocketHandler;

    /**
     * Installs the handlers on a newly accepted channel.
     *
     * @param channel the channel being initialised
     * @throws Exception if a handler cannot be added
     */
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
        // Arguments: path, sub-protocols (none), allowExtensions, maxFrameSize,
        // allowMaskMismatch, checkStartsWith. checkStartsWith=true makes the handler accept
        // request URIs that merely start with the path, e.g. "/ws?username=foo".
        pipeline.addLast(new WebSocketServerProtocolHandler(
                WEBSOCKET_PATH, null, true, MAX_FRAME_SIZE, false, true));
        pipeline.addLast(webSocketHandler);
    }
}
4. 创建自定义处理类
package com.cnpc.modules.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.stereotype.Component;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Shared inbound handler implementing a small chat protocol over WebSocket text frames.
 *
 * <p>A client identifies itself with the {@code username} query parameter of the handshake URI.
 * Each text frame is split on {@code ':'}; the second field names the recipient(s). If that field
 * contains {@code "all"} the frame is broadcast to every connected channel, otherwise it is
 * forwarded to every registered user whose name occurs in the field.
 *
 * <p>The handler is stateless per connection (all state lives in the static registries) and is
 * therefore marked {@link ChannelHandler.Sharable}.
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Query parameter of the handshake URI that carries the user name. */
    private static final String USERNAME_PARAM = "username";

    /** Recipient keyword that requests a broadcast. */
    private static final String BROADCAST_TARGET = "all";

    /** Every open channel; Netty removes a channel from the group automatically when it closes. */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** User name -> channel of that user. */
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(16);

    /** Channel id (long text form) -> user name; used to clean up {@link #channelMap} on disconnect. */
    public static ConcurrentHashMap<String, String> bindMap = new ConcurrentHashMap<>(16);

    /**
     * Legacy channel list kept for backward compatibility. This handler never adds to it
     * ({@link #channels} is the source of truth) but still removes disconnected channels from it.
     */
    public static List<Channel> channelList = new CopyOnWriteArrayList<>();

    /**
     * Records a newly connected channel in {@link #channels}.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = channel.remoteAddress();
        channels.add(channel);
        System.out.println("有新的连接." + socketAddress);
    }

    /**
     * Registers the user once the WebSocket handshake has completed, then forwards the event.
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the user event; only {@code HandshakeComplete} is acted upon
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("触发事件");
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete complete =
                    (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String uri = complete.requestUri();
            System.out.println("uri: " + uri);
            System.out.println("握手成功");
            String username = extractUsername(uri);
            if (username != null) {
                Channel channel = ctx.channel();
                channelMap.put(username, channel);
                bindMap.put(channel.id().asLongText(), username);
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Routes an incoming text frame to its recipient(s).
     *
     * @param ctx context of the sending channel
     * @param textWebSocketFrame the received frame; released by the superclass after this method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("通道: " + ctx.channel().remoteAddress() + "发送了消息");
        String msg = textWebSocketFrame.text();
        System.out.println("msg:" + msg);
        String[] params = msg.split(":");
        if (params.length < 2) {
            // No recipient field: nothing can be routed, and indexing params[1] would throw.
            System.out.println("ignored message without recipient field: " + msg);
            return;
        }
        String target = params[1];
        if (target.contains(BROADCAST_TARGET)) {
            // The broadcast already reaches every user, so no individual delivery afterwards.
            channels.writeAndFlush(new TextWebSocketFrame(msg));
            return;
        }
        // Substring matching is kept from the original protocol (the recipient field may list
        // several names). NOTE(review): a user whose name is a substring of another user's name
        // also receives that user's messages; switch to exact matching if names can overlap.
        for (Map.Entry<String, Channel> entry : channelMap.entrySet()) {
            String username = entry.getKey();
            if (!username.isEmpty() && target.contains(username)) {
                entry.getValue().writeAndFlush(new TextWebSocketFrame(msg));
            }
        }
    }

    /**
     * Removes a disconnected channel from all registries.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = channel.remoteAddress();
        System.out.println("通道:" + socketAddress + "已下线");
        unregister(channel);
        ctx.close();
    }

    /**
     * Reports the error, unregisters the channel and closes it.
     *
     * @param ctx context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        unregister(ctx.channel());
        ctx.close();
    }

    /** Returns the first non-empty {@code username} query parameter of the URI, or null if absent. */
    private static String extractUsername(String uri) {
        if (uri == null) {
            return null;
        }
        List<String> values = new QueryStringDecoder(uri).parameters().get(USERNAME_PARAM);
        if (values == null || values.isEmpty()) {
            return null;
        }
        String username = values.get(0);
        return username.isEmpty() ? null : username;
    }

    /** Drops every registry entry that refers to the given channel; safe to call repeatedly. */
    private static void unregister(Channel channel) {
        channelList.remove(channel);
        String username = bindMap.remove(channel.id().asLongText());
        if (username != null) {
            // Two-argument remove: leave the mapping alone if the user has already reconnected
            // on a different channel.
            channelMap.remove(username, channel);
        }
    }
}
5. 创建常量类
package com.cnpc.modules.netty;
/**
 * Constants shared by the WebSocket classes. Not instantiable.
 */
public final class WebSocketConstant {

    /** Name of the HTTP header that carries the WebSocket sub-protocol(s) during the handshake. */
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

    /** Prevents instantiation of this constants holder. */
    private WebSocketConstant() {
    }
}
6. 前端js
// Placeholder endpoint; websocket() overwrites it with the real URL including the user name.
var wsServer = 'ws://ip/';
// Number of reconnect attempts still allowed.
var limitConnect = 3;
// Number of reconnect attempts made so far.
var timeConnect = 0;
// Name entered by the user, cached so that reconnect attempts do not prompt again.
var wsUsername = null;

/**
 * Opens the chat WebSocket. Asks for the user name on the first call, sends it to the
 * server as the "username" query parameter, and schedules a reconnect when the socket closes.
 */
function websocket() {
    if (wsUsername === null) {
        wsUsername = prompt("请输入您的名字", "");
    }
    if (!wsUsername) {
        // Prompt cancelled or left empty: the server could not route messages to this
        // client, so do not connect. Reset so that the next call asks again.
        wsUsername = null;
        return;
    }
    // Encode the name so that spaces, '&', '#', non-ASCII characters etc. survive in the URL.
    wsServer = 'ws://127.0.0.1:9090/ws?username=' + encodeURIComponent(wsUsername);
    var socket = new WebSocket(wsServer);
    socket.onopen = function() {
    };
    socket.onmessage = function(res) {
        console.log('接收到消息:', res);
    };
    socket.onclose = function() {
        reconnect();
    };
}

/**
 * Retries the connection after two seconds, at most limitConnect times in total.
 */
function reconnect() {
    if (limitConnect > 0) {
        limitConnect--;
        timeConnect++;
        console.log("第" + timeConnect + "次重连");
        setTimeout(function() {
            websocket();
        }, 2000);
    } else {
        console.log("TCP连接已超时");
    }
}
7. 以上就可以使用websocket的方式进行聊天了
8. 遇到的问题
-
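websocket路径传参问题(通道初始化对象 WebSocketChannelInit 类):前端连接地址带有查询参数(如 /ws?username=xxx),需要把 WebSocketServerProtocolHandler 构造方法的最后一个参数 checkStartsWith 设为 true,让它按前缀匹配 /ws,否则带参数的地址无法完成握手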
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true));
-
websocket路径参数获取问题(自定义处理器 WebSocketHandler 类):在握手完成事件 HandshakeComplete 中通过 requestUri() 取得请求地址,再从中解析出 username 参数
/**
 * Excerpt: reads the {@code username} query parameter from the handshake URI and registers the
 * channel under that name once the WebSocket handshake has completed.
 *
 * @param ctx context of the channel the event was fired on
 * @param evt the user event; only {@code HandshakeComplete} is acted upon
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("触发事件");
    if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
        WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
        // The handshake URI still contains the query string, e.g. "/ws?username=foo".
        String uri = complete.requestUri();
        System.out.println("uri: " + uri);
        System.out.println("握手成功");
        List<String> values = new QueryStringDecoder(uri).parameters().get("username");
        String paramValue = (values == null || values.isEmpty()) ? null : values.get(0);
        // ConcurrentHashMap rejects null keys, and an empty name cannot be addressed.
        if (paramValue != null && !paramValue.isEmpty()) {
            channelMap.put(paramValue, ctx.channel());
        }
    }
    super.userEventTriggered(ctx, evt);
}
-
启动失败问题(Netty服务器 NettyWebSocketServer 类):init() 中的 channelFuture.channel().closeFuture().sync() 会一直阻塞 Spring 的启动线程,导致应用无法完成启动;去掉该阻塞调用即可
/**
 * Binds the WebSocket server to port 9090 and returns once the bind has completed, without
 * blocking the caller until the server channel closes.
 *
 * <p>The event loop groups are released when binding fails and, asynchronously, when the
 * server channel is closed.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
 */
public void init() throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    boolean bound = false;
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup);
        serverBootstrap.channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(webSocketChannelInit);
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        System.out.println("--Netty服务端启动成功---");
        // Free the threads once the server channel is closed, instead of blocking on it here.
        channelFuture.channel().closeFuture().addListener(future -> {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        });
        bound = true;
    } finally {
        if (!bound) {
            // Bind failed or was interrupted: do not leave the event loop threads running.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
-
js websocket传入header参数:浏览器的 WebSocket 不能自定义请求头,构造方法的第二个参数是子协议列表,会放在 Sec-WebSocket-Protocol 请求头中发送给服务端 var ws = new WebSocket("地址", ['header参数信息']);
|