IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊 -> 正文阅读

[网络协议]Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊


基于上篇文章扩展

1. 引入Netty依赖

<!--后端采用springboot项目,netty只需引入这一个依赖 -->
<!--netty依赖 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

2. 创建netty服务器

package com.cnpc.modules.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * @author wuzhenyong
 * ClassName:NettyWebSocketServer.java
 * date:2022-05-05 8:48
 * Description: Netty服务器
 */
@Component
public class NettyWebSocketServer implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private WebSocketChannelInit webSocketChannelInit;

    /**
     * 容器初始化完成后调用
     *
     * @param contextRefreshedEvent
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        // 启动netty服务器
        this.init();
    }

    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    public void init() {
        try {
            //1.创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //2.设置线程组
            serverBootstrap.group(bossGroup, workerGroup);
            //3.设置参数
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(webSocketChannelInit);
            //4.启动  绑定端口不能和服务端口一致
            ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
            System.out.println("--Netty服务端启动成功---");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3. 创建通道初始化对象

package com.cnpc.modules.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author wuzhenyong
 * ClassName:WebSocketChannelInit.java
 * date:2022-05-05 8:53
 * Description: 通道初始化对象
 */
@Component
public class WebSocketChannelInit extends ChannelInitializer {
    @Autowired
    private WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //对http协议的支持.
        pipeline.addLast(new HttpServerCodec());
        // 对大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //post请求分三部分. request line / request header / message body
        // HttpObjectAggregator将多个信息转化成单一的request或者response对象
        pipeline.addLast(new HttpObjectAggregator(8000));
        // 将http协议升级为ws协议. websocket的支持
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true));
        // 自定义处理handler
        pipeline.addLast(webSocketHandler);
    }
}

4. 创建自定义处理类

package com.cnpc.modules.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.stereotype.Component;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author wuzhenyong
 * ClassName:WebSocketHandler.java
 * date:2022-05-05 8:54
 * Description: 自定义处理类
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
	/**
     * 管理channel的组,可以理解为channel的池 —— 客服使用
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * 用户通道管理
     */
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(16);
    /**
     * 通道绑定管理
     */
    public static ConcurrentHashMap<String, String> bindMap = new ConcurrentHashMap<>(16);
    public static List<Channel> channelList = new ArrayList<>();

    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // channel.read();
        //当有新的客户端连接的时候, 将通道放入集合
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        // 放入通道组
        channels.add(channel);
        System.out.println("有新的连接." + socketAddress);
    }

    /**
     * 用户事件触发 token校验
     *
     * @param ctx ctx
     * @param evt evt
     * @throws Exception 异常
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("触发事件");
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            HttpHeaders httpHeaders = complete.requestHeaders();
            // 自行处理鉴权问题
            System.out.println("uri: " + uri);
            System.out.println("握手成功");
            channelMap.put(paramValue, ctx.channel());
        }

        super.userEventTriggered(ctx, evt);
    }

    /**
     * 收到消息事件
     *
     * @param ctx 通道处理程序上下文
     * @param textWebSocketFrame    文本框架网络套接字
     * @throws Exception 异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        // 按照自己公司的逻辑进行处理消息
        System.out.println("通道: " + ctx.channel().remoteAddress() + "发送了消息");
        String msg = textWebSocketFrame.text();
        System.out.println("msg:" + msg);
        String[] params = msg.split(":");
        if (params[1].contains("yangmingquan")) {
            // 私发
            channelMap.get("yangmingquan").writeAndFlush(new TextWebSocketFrame(msg));
        }
        if (params[1].contains("wuzhenyong")) {
            // 私发
            channelMap.get("wuzhenyong").writeAndFlush(new TextWebSocketFrame(msg));
        }
        if (params[1].contains("all")) {
            // 群发
            channels.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }

    /**
     * 通道未就绪--channel下线
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        System.out.println("通道:" + socketAddress + "已下线");
        //当有客户端断开连接的时候,就移除对应的通道
        channelList.remove(channel);
        ctx.close();
    }

    /**
     * 异常处理事件
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        Channel channel = ctx.channel();
        //移除集合
        channelList.remove(channel);
        ctx.close();
    }
}

5. 创建常量类

package com.cnpc.modules.netty;

/**
 * @author wuzhenyong
 * date:2022-05-05 8:35
 * Description:
 */
public final class WebSocketConstant {
    /**
     * websocket head参数key
     */
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
}

6. 前端js

//1.创建websocket客户端
    var wsServer = 'ws://ip/';
    var limitConnect = 3;  // 断线重连次数
    var timeConnect = 0;

    function websocket() {
        //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
        var username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
    
        //建立WebSocket通讯
        //注意:如果你要兼容ie8+,建议你采用 socket.io 的版本。下面是以原生WS为例
        wsServer = 'ws://127.0.0.1:9090/ws?username=' + username;
        var socket = new WebSocket(wsServer);

        //连接成功时触发
        socket.onopen = function() {

        };

        //收到的消息事件 按自己需求处理
        socket.onmessage = function(res) {
            //res为接受到的值,如 {"emit": "messageName", "data": {}}
            //emit即为发出的事件名,用于区分不同的消息
            console.log('接收到消息:', res)
        } ;
        socket.onclose = function() {
            reconnect();
        };

        // 另外还有onclose、onerror,分别是在链接关闭和出错时触发
    }

    // 重连
    function reconnect() {
        // lockReconnect加锁,防止onclose、onerror两次重连
        if (limitConnect > 0) {
            limitConnect--;
            timeConnect++;
            console.log("第" + timeConnect + "次重连");
            // 进行重连
            setTimeout(function() {
                websocket();
            },2000);

        } else {
            console.log("TCP连接已超时");
        }
    }

7. 以上就可以使用websocket的方式进行聊天了

8. 遇到的问题

  • websocket路径传参问题 通道初始化对象 WebSocketChannelInit类

    // 将http协议升级为ws协议. websocket的支持
    pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, false, true));
    // 最后一个参数设置为true则连接websocket可以进行路径传参,否则传参连接不成功
    
  • websocket路径参数获取问题 自定义处理器 WebSocketHandler类

    // 重写userEventTriggered方法  用户连接触发事件
    
    /**
     * 用户事件触发 token校验
     *
     * @param ctx ctx
     * @param evt evt
     * @throws Exception 异常
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("触发事件");
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            // 获取header信息
            HttpHeaders httpHeaders = complete.requestHeaders();
            // 自行处理鉴权问题
            System.out.println("uri: " + uri);
            System.out.println("握手成功");
            channelMap.put(paramValue, ctx.channel());
        }
    
        super.userEventTriggered(ctx, evt);
    }
    
  • 启动失败问题 Netty服务器 NettyWebSocketServer类

    //init方法更换为一下代码,删除异常捕获关闭事件
    public void init() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
    
        //1.创建服务端启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //2.设置线程组
        serverBootstrap.group(bossGroup, workerGroup);
        //3.设置参数
        serverBootstrap.channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.DEBUG))
            .childHandler(webSocketChannelInit);
        //4.启动  绑定端口不能和服务端口一致
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        System.out.println("--Netty服务端启动成功---");
    }
    
  • js websocket传入header参数

    var ws = new WebSocket("地址", ['header参数信息']);
    
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 13:06:47  更:2022-05-09 13:08:18 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 1:33:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码