导入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
package com.zm.webscoket.config;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MyChannelHandler {
public MyChannelHandler() {
}
public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();
public static void addChannel(String apiToken, Channel channel) {
channelGroup.add(channel);
if (null != apiToken) {
ChannelMap.put(apiToken, channel.id());
}
}
public static void updateChannel(String apiToken, Channel channel) {
Channel chan = channelGroup.find(channel.id());
if (null == chan) {
addChannel(apiToken, channel);
} else {
ChannelMap.put(apiToken, channel.id());
}
}
public static void removeChannel(Channel channel) {
channelGroup.remove(channel);
channel.close();
Collection<ChannelId> values = ChannelMap.values();
values.remove(channel.id());
}
public static Channel findChannel(String apiToken) {
ChannelId chanId = ChannelMap.get(apiToken);
if (null == chanId) {
return null;
}
return channelGroup.find(ChannelMap.get(apiToken));
}
public static void sendToAll(String message) {
channelGroup.writeAndFlush(new TextWebSocketFrame(message));
}
private void SendAllExceptMy(String apiToken, String msg) {
Channel myChannel = channelGroup.find(ChannelMap.get(apiToken));
if(null != myChannel){
for(Channel channel:channelGroup){
if(!channel.id().asLongText().equals(myChannel.id().asLongText())){
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
}
public static void sendToSimple(String apiToken, String message) {
channelGroup.find(ChannelMap.get(apiToken)).writeAndFlush(new TextWebSocketFrame(message));
}
}
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id() + "与客户端建立连接,通道开启!");
MyChannelHandler.channelGroup.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id() + "与客户端断开连接,通道关闭!");
MyChannelHandler.channelGroup.remove(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
ChannelId id = channel.id();
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
Map paramMap = getUrlParams(uri);
System.out.println("接收到的参数是:" + paramMap);
if (uri.contains("?")) {
String newUri = uri.substring(0, uri.indexOf("?"));
System.out.println(newUri);
request.setUri(newUri);
}
}
if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
System.out.println(new Date() + "客户端收到服务器数据:" + frame.text());
MyChannelHandler.sendToAll(frame.text());
}
super.channelRead(ctx, msg);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("异常发生了...");
cause.printStackTrace();
ctx.close();
}
private static Map getUrlParams(String url) {
Map<String, String> map = new HashMap<>();
url = url.replace("?", ";");
if (!url.contains(";")) {
return map;
}
if (url.split(";").length > 0) {
String[] arr = url.split(";")[1].split("&");
for (String s : arr) {
String key = s.split("=")[0];
String value = s.split("=")[1];
map.put(key, value);
}
return map;
} else {
return map;
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Date;
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj){
if (obj instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)obj;
if (event.state()== IdleState.READER_IDLE){
System.out.println(ctx.channel().id() +"客户端读超时" + new Date());
MyChannelHandler.removeChannel(ctx.channel());
}else if (event.state()== IdleState.WRITER_IDLE){
System.out.println(ctx.channel().id() +"客户端写超时" + new Date());
}else if (event.state()==IdleState.ALL_IDLE){
System.out.println(ctx.channel().id() +"客户端所有操作超时");
}
}
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Date;
@Component
public class NettyServer {
@Value("${server.port:8080}")
private Integer port;
@PostConstruct
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
EventLoopGroup bossGroup = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024);
sb.option(ChannelOption.TCP_NODELAY,true);
sb.group(group, bossGroup)
.channel(NioServerSocketChannel.class)
.localAddress(this.port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch){
System.out.println("收到新连接"+ new Date());
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpObjectAggregator(8192));
ch.pipeline().addLast(new IdleStateHandler(40,0,0));
ch.pipeline().addLast(new HeartBeatHandler());
ch.pipeline().addLast(new MyWebSocketHandler());
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
}
});
ChannelFuture cf = sb.bind().sync();
cf.channel().closeFuture().sync();
if (cf.isSuccess()) {
System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
}
} finally {
System.out.println("释放线程池资源");
group.shutdownGracefully().sync();
bossGroup.shutdownGracefully().sync();
}
}
}
测试地址
链接: 在线websocket测试网站. 输入 ws://127.0.0.1:8080/ws 连接即可
|