IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Netty+websocket实现消息推送和统计在线人数 -> 正文阅读

[网络协议]Netty+websocket实现消息推送和统计在线人数

1、 定义变量

定义一个channel组,管理所有的channel,再定义一个map,管理用户与channel的对应关系

public class ChannelConfigs {
    /**
     * 定义一个channel组,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放用户与Chanel的对应信息,用于给指定用户发送消息
     */
    private static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();

    /**
     * 获取channel组
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 获取用户channel map
     */
    public static ConcurrentHashMap<String, Channel> getUserChannelMap() {
        return userChannelMap;
    }
}

2、人数实体类

public class UserInfo {

    private static AtomicInteger uidGener = new AtomicInteger(1000000);

    //用户id
    private int userId;

    //用户名
    private String userName;

    //客户端地址
    private String addr;

    //客户端通道
    private Channel channel;
    .....
    getter()和setter方法省略
 }

3、消息推送初始化

public class PushMsgServer{
    private static final Logger logger = LoggerFactory.getLogger(PushMsgServer.class);

    @Override
    public void init(String host, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-codec", new HttpServerCodec());
                            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            // ch.pipeline().addLast(new WebSocketServerProtocolHandler("ws://localhost:8082/ws", "websocket"));
                            ch.pipeline().addLast(new IdleStateHandler(60, 60, 60, TimeUnit.SECONDS));
                            ch.pipeline().addLast("handler", new PushMsgServerHandler());
                        }
                    });

            Channel ch = b.bind(8082).sync().channel();
            if (ch.isOpen()) {
                logger.info("推送消息服务器启动成功...");
            }
            ch.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    @Override
    public void startServer(String host, int port) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                init(host, port);
            }
        });
        t.start();
    }


    public static void main() {
        try {
            new PushMsgServer().startServer("49.232.66.48", 8082);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4、消息推送Handler

public class PushMsgServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger logger = LoggerFactory.getLogger(PushMsgServerHandler.class);
    //统计用户变量
    private static AtomicInteger userCount = new AtomicInteger(0);
    private WebSocketServerHandshaker handshaker;


    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 传统的HTTP接入
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        }
        // WebSocket接入
        else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //添加连接
        userCount.incrementAndGet();
        logger.info("当前在线人数为{}:", userCount.intValue());
        // 返回应答消息
        //ctx.channel().write(new TextWebSocketFrame( "这里是服务端:{}" + new Date()));
        // 添加到channelGroup 通道组
        ChannelConfigs.getChannelGroup().add(ctx.channel());
    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //断开连接
        logger.info("客户端断开连接:");
        userCount.decrementAndGet();
        logger.info("当前在线人数为{}:", userCount.intValue());
        ChannelConfigs.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }


    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //判断该事件是否为超时事件
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String type = "";
            //判断具体事件  具体项目可以根据不同情况进行不同处理
            switch (event.state()) {
                case READER_IDLE:
                    type = "读空闲";
                    break;
                case WRITER_IDLE:
                    type = "写空闲";
                    break;
                default:
                    type = "读写空闲";
            }
            logger.info("{}超时事件={}\n", ctx.channel().remoteAddress().toString(), type);
            ctx.channel().close();
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {

        // 如果HTTP解码失败,返回HHTP异常
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // 构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8082/websocket", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        // 判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }
        String reqest = ((TextWebSocketFrame) frame).text();
        ctx.channel().write(new TextWebSocketFrame(reqest + "这里是服务端:{}" + new Date()));
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
       /* AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();*/
        ChannelConfigs.getUserChannelMap().remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 删除通道
        ChannelConfigs.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // 返回应答给客户端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),
                    CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }

        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

4、消息推送Service

public interface PushService {
    /**
     * 推送给指定用户
     */
    void pushMsgToOne(String userId, String msg);

    /**
     * 推送给所有用户
     */
    void pushMsgToAll(String msg);
}

5、消息推送实现类

public class PushServiceImpl implements PushService {
    @Override
    public void pushMsgToOne(String userId, String msg) {
        ConcurrentHashMap<String, Channel> userChannelMap = ChannelConfigs.getUserChannelMap();
        Channel channel = userChannelMap.get(userId);
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    @Override
    public void pushMsgToAll(String msg) {
        ChannelConfigs.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

6、前端测试

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8">
  Netty WebSocket 时间服务器
</head>
<br>
<body>
<br>
<script type="text/javascript">
  var socket;
  if (!window.WebSocket) {
    window.WebSocket = window.MozWebSocket;
  }
  if (window.WebSocket) {
    socket = new WebSocket("ws://localhost:8082/websocket");
    socket.onmessage = function (event) {
      var ta = document.getElementById('responseText');
      ta.value = "";
      ta.value = event.data
    };
    socket.onopen = function (event) {
      var ta = document.getElementById('responseText');
      ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
    };
    socket.onclose = function (event) {
      var ta = document.getElementById('responseText');
      ta.value = "";
      ta.value = "WebSocket 关闭!";
    };
  }
  else {
    alert("抱歉,您的浏览器不支持WebSocket协议!");
  }

  function send(message) {
    if (!window.WebSocket) {
      return;
    }
    if (socket.readyState == WebSocket.OPEN) {
      socket.send(message);
    }
    else {
      alert("WebSocket连接没有建立成功!");
    }
  }
</script>
<form onsubmit="return false;">
  <input type="text" name="message" value="Netty最佳实践"/>
  <br><br>
  <input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)"/>
  <hr color="blue"/>
  <h3>服务端返回的应答消息</h3>
  <textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>

7、测试结果

开启三个客户端:这里加入了心跳机制,当超过一分钟没有写操作时,断开连接
在这里插入图片描述

发送消息:
在这里插入图片描述
OVER!!!

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:59:52  更:2022-03-10 23:00:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 19:30:48-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码