IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> springboot使用netty整合WebSocket-netty(三) -> 正文阅读

[网络协议]springboot使用netty整合WebSocket-netty(三)

导入依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.36.Final</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
package com.zm.webscoket.config;

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class MyChannelHandler {

    public MyChannelHandler() {
    }

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();

    public static void addChannel(String apiToken, Channel channel) {
        channelGroup.add(channel);
        if (null != apiToken) {
            ChannelMap.put(apiToken, channel.id());
        }
    }

    public static void updateChannel(String apiToken, Channel channel) {
        Channel chan = channelGroup.find(channel.id());
        if (null == chan) {
            addChannel(apiToken, channel);
        } else {
            ChannelMap.put(apiToken, channel.id());
        }
    }

    public static void removeChannel(Channel channel) {
        channelGroup.remove(channel);
        channel.close();
        Collection<ChannelId> values = ChannelMap.values();
        values.remove(channel.id());
    }

    public static Channel findChannel(String apiToken) {
        ChannelId chanId = ChannelMap.get(apiToken);
        if (null == chanId) {
            return null;
        }
        return channelGroup.find(ChannelMap.get(apiToken));
    }

    public static void sendToAll(String message) {
        channelGroup.writeAndFlush(new TextWebSocketFrame(message));
    }

    //给每个人发送消息,除发消息人外
    private void SendAllExceptMy(String apiToken, String msg) {
        Channel myChannel = channelGroup.find(ChannelMap.get(apiToken));
        if(null != myChannel){
            for(Channel channel:channelGroup){
                if(!channel.id().asLongText().equals(myChannel.id().asLongText())){
                    channel.writeAndFlush(new TextWebSocketFrame(msg));
                }
            }
        }
    }

    public static void sendToSimple(String apiToken, String message) {
        channelGroup.find(ChannelMap.get(apiToken)).writeAndFlush(new TextWebSocketFrame(message));
    }


}

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * TextWebSocketFrame是netty用于处理websocket发来的文本对象
 */
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().id() + "与客户端建立连接,通道开启!");
        //添加到channelGroup通道组
        MyChannelHandler.channelGroup.add(ctx.channel());
    }

    /**
     * 要想实现客户端感知服务端的存活情况,需要进行双向的心跳;
     * Netty中的channelInactive()方法是通过Socket连接关闭时挥手数据包触发的,
     * 因此可以通过channelInactive()方法感知正常的下线情况,但是因为网络异常等非正常下线则无法感知;
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().id() + "与客户端断开连接,通道关闭!");
        //添加到channelGroup 通道组
        MyChannelHandler.channelGroup.remove(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        ChannelId id = channel.id();
        //首次连接是FullHttpRequest,处理参数
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            String uri = request.uri();
            Map paramMap = getUrlParams(uri);
            System.out.println("接收到的参数是:" + paramMap);
            //如果url包含参数,需要处理
            if (uri.contains("?")) {
                String newUri = uri.substring(0, uri.indexOf("?"));
                System.out.println(newUri);
                request.setUri(newUri);
            }
        }
        if (msg instanceof TextWebSocketFrame) {
            //正常的TEXT消息类型
            TextWebSocketFrame frame = (TextWebSocketFrame) msg;
            System.out.println(new Date() + "客户端收到服务器数据:" + frame.text());
            MyChannelHandler.sendToAll(frame.text());
        }
        super.channelRead(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("异常发生了...");
        cause.printStackTrace();
        ctx.close();
    }


    private static Map getUrlParams(String url) {
        Map<String, String> map = new HashMap<>();
        url = url.replace("?", ";");
        if (!url.contains(";")) {
            return map;
        }
        if (url.split(";").length > 0) {
            String[] arr = url.split(";")[1].split("&");
            for (String s : arr) {
                String key = s.split("=")[0];
                String value = s.split("=")[1];
                map.put(key, value);
            }
            return map;

        } else {
            return map;
        }
    }


}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Date;

/**
 * 检查客户端心跳机制
 * IdleStateHandler心跳检测主要是通过向线程任务队列中添加定时任务,判断channelRead()方法或write()方法是否调用空闲超时,如果超时则触发超时事件执行自定义userEventTrigger()方法;
 *
 * Netty通过IdleStateHandler实现最常见的心跳机制不是一种双向心跳的PING-PONG模式,而是客户端发送心跳数据包,服务端接收心跳但不回复,
 * 因为如果服务端同时有上千个连接,心跳的回复需要消耗大量网络资源;如果服务端一段时间内没有收到客户端的心跳数据包则认为客户端已经下线,
 * 将通道关闭避免资源的浪费;在这种心跳模式下服务端可以感知客户端的存活情况,无论是宕机的正常下线还是网络问题的非正常下线,
 * 服务端都能感知到,而客户端不能感知到服务端的非正常下线;
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj){
        if (obj instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)obj;
            if (event.state()== IdleState.READER_IDLE){
                System.out.println(ctx.channel().id() +"客户端读超时" + new Date());
                MyChannelHandler.removeChannel(ctx.channel());
            }else if (event.state()== IdleState.WRITER_IDLE){
                System.out.println(ctx.channel().id() +"客户端写超时" + new Date());
            }else if (event.state()==IdleState.ALL_IDLE){
                System.out.println(ctx.channel().id() +"客户端所有操作超时");
            }
        }
    }

}


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;

@Component
public class NettyServer {

    @Value("${server.port:8080}")
    private Integer port;

    @PostConstruct
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();// 主线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();//创建从线程组,处理主线程组分配下来的io操作
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);// 存放已完成三次握手的请求的等待队列
            //  要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
            //  如果要减少发送次数,就设置为false,会累积一定大小后再发送
            sb.option(ChannelOption.TCP_NODELAY,true);
            sb.group(group, bossGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(this.port)// 绑定监听端口
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
                        @Override
                        protected void initChannel(SocketChannel ch){
                            System.out.println("收到新连接"+ new Date());
                            //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以块的方式来写的处理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                             /*
                            说明
                                1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
                                2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
                             */
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            // 对客户端,如果在40秒内没有向服务端发送心跳,就主动断开
                            // 前三个的参数解释如下:
                            //1)readerIdleTime:为读超时时间(即服务端一定时间内未接收到客户端消息的时间,服务端一段时间内没有数据读取)
                            //2)writerIdleTime:为写超时时间(即服务端一定时间内未向客户端发送消息的时间,服务端一段时间内没有数据发送)
                            //3)allIdleTime:所有类型的超时时间(以上两种满足其中一个即可)
                            ch.pipeline().addLast(new IdleStateHandler(40,0,0));
                            ch.pipeline().addLast(new HeartBeatHandler());
                            ch.pipeline().addLast(new MyWebSocketHandler());
                             /*
                            说明
                                1. 对应websocket ,它的数据是以 帧(frame) 形式传递
                                2. 可以看到WebSocketFrame 下面有六个子类
                                3. 浏览器请求时 ws://localhost:8888/hello 表示请求的uri
                                4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
                                5. 是通过一个 状态码 101
                             */
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));

                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // 启动server 服务器异步创建绑定
            cf.channel().closeFuture().sync(); // 监听服务器关闭channel通道
            if (cf.isSuccess()) {
                System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            }
        } finally {
            System.out.println("释放线程池资源");
            group.shutdownGracefully().sync(); // 释放线程池资源
            bossGroup.shutdownGracefully().sync();
        }
    }


}

测试地址

链接: 在线websocket测试网站.
输入 ws://127.0.0.1:8080/ws 连接即可

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-16 12:04:52  更:2021-08-16 12:06:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 19:40:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码