IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Netty+Protobuf+WebSocket/Socket后台服务端的对channel的处理 -> 正文阅读

[网络协议]Netty+Protobuf+WebSocket/Socket后台服务端的对channel的处理

package com.lsbc.game.server.handler;

import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import com.lsbc.game.pojo.constant.ParatrooperConstant;
import com.lsbc.game.protobuf.RequestParamMsg;
import com.lsbc.hornet.auth.SpringContextUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 统一端口的处理器
 * <p>
 * 使用同一个端口去处理TCP/HTTP协议的请求,因为HTTP的底层协议也是TCP,因此可以在此处理器内部可以通过解析部分数据
 * 来判断请求是TCP请求还是HTTP请求,然使用动态的pipeline切换
 */
@Slf4j
public class PortUnificationServerHandler extends ByteToMessageDecoder {

    private NettyServerHandler nettyServerHandler= (NettyServerHandler)SpringContextUtils.getBean("nettyServerHandler");

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Will use the first five bytes to detect a protocol.
        if (byteBuf.readableBytes() < 5) {
            return;
        }
        final int magic1 = byteBuf.getUnsignedByte(byteBuf.readerIndex());
        final int magic2 = byteBuf.getUnsignedByte(byteBuf.readerIndex() + 1);

        // 判断是不是HTTP请求
        if (isHttp(magic1, magic2)) {
            log.info("this is a http msg");
            switchToHttp(channelHandlerContext);
        } else {
            log.info("this is a socket msg");
            switchToTcp(channelHandlerContext);
        }

    }

    /**
     * 跳转到http处理
     *
     * @param ctx
     */
    private void switchToHttp(ChannelHandlerContext ctx) {
        ChannelPipeline pipeline = ctx.pipeline();
        // HTTP请求的解码和编码
        pipeline.addLast(new HttpServerCodec());
        // 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse,
        // 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent
        pipeline.addLast(new HttpObjectAggregator(65536));
        // 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; 增加之后就不用考虑这个问题了
        pipeline.addLast(new ChunkedWriteHandler());
        // WebSocket数据压缩
        pipeline.addLast(new WebSocketServerCompressionHandler());
        //设置路由;协议包长度限制 ;里面加入的WebSocketServerProtocolHandshakeHandler对路由下FullHttpRequest解码成WebSocketFrame
        pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true));
        // 协议包解码
        //websocket消息帧处理看下面代码(这里需要把前台的消息分类,判断传过来的是websocket哪个帧,如果为二进制帧往下传值,让protobuf解码)
        pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
            @Override
            protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
                //文本帧处理(收到的消息广播到前台客户端)
                if (frame instanceof TextWebSocketFrame) {
                    log.info("文本帧消息:" + ((TextWebSocketFrame) frame).text());
                }
                //二进制帧处理,将帧的内容往下传
                else if (frame instanceof BinaryWebSocketFrame) {
                    BinaryWebSocketFrame binaryWebSocketFrame = (BinaryWebSocketFrame) frame;
                    byte[] by = new byte[frame.content().readableBytes()];
                    binaryWebSocketFrame.content().readBytes(by);
                    ByteBuf bytebuf = Unpooled.buffer();
                    bytebuf.writeBytes(by);
                    out.add(bytebuf);
                } else if (frame instanceof PingWebSocketFrame) {
                    log.info("其它帧消息" + frame.toString());
                }
            }
        });
        // 协议包编码
        pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                ByteBuf result = null;
                if (msg instanceof MessageLite) {
                    result = Unpooled.wrappedBuffer(((MessageLite) msg).toByteArray());
                }
                if (msg instanceof MessageLite.Builder) {
                    result = Unpooled.wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
                }
                // ==== 上面代码片段是拷贝自TCP ProtobufEncoder 源码 ====
                // 然后下面再转成websocket二进制流,因为客户端不能直接解析protobuf编码生成的
                WebSocketFrame frame = new BinaryWebSocketFrame(result);
                out.add(frame);
            }
        });
        //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
        pipeline.addLast(new IdleStateHandler(ParatrooperConstant.NettyConstant.SERVER_READER_IDLE_TIME, ParatrooperConstant.NettyConstant.SERVER_WRITER_IDLE_TIME, ParatrooperConstant.NettyConstant.SERVER_ALL_IDLE_TIME, TimeUnit.SECONDS));
        // 解码和编码,应和客户端一致
        // 传输的协议 Protobuf
        /**
         * 前端用websocket通信,此处的ProtobufVarint32FrameDecoder和ProtobufVarint32LengthFieldPrepender对分包、粘包的处理反而影响数据的转换
         * 估计原因应是前面把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse导致
         * 如果不用websocket加上分包、粘包的处理就可以
         */
//        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufDecoder(RequestParamMsg.RequestParam.getDefaultInstance()));
//        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(nettyServerHandler);
        pipeline.remove(this);
    }


    /**
     * 判断请求是否是HTTP请求
     *
     * @param magic1 报文第一个字节
     * @param magic2 报文第二个字节
     * @return
     */
    private boolean isHttp(int magic1, int magic2) {
        return magic1 == 'G' && magic2 == 'E' || // GET
                magic1 == 'P' && magic2 == 'O' || // POST
                magic1 == 'P' && magic2 == 'U' || // PUT
                magic1 == 'H' && magic2 == 'E' || // HEAD
                magic1 == 'O' && magic2 == 'P' || // OPTIONS
                magic1 == 'P' && magic2 == 'A' || // PATCH
                magic1 == 'D' && magic2 == 'E' || // DELETE
                magic1 == 'T' && magic2 == 'R' || // TRACE
                magic1 == 'C' && magic2 == 'O';   // CONNECT
    }

    /**
     * 跳转到http处理
     *
     * @param ctx
     */
    private void switchToTcp(ChannelHandlerContext ctx) {
        ChannelPipeline pipeline = ctx.pipeline();
        //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
        pipeline.addLast(new IdleStateHandler(ParatrooperConstant.NettyConstant.SERVER_READER_IDLE_TIME, ParatrooperConstant.NettyConstant.SERVER_WRITER_IDLE_TIME, ParatrooperConstant.NettyConstant.SERVER_ALL_IDLE_TIME, TimeUnit.SECONDS));
        // 解码和编码,应和客户端一致
        // 传输的协议 Protobuf
        /**
         * 前端用websocket通信,此处的ProtobufVarint32FrameDecoder和ProtobufVarint32LengthFieldPrepender对分包、粘包的处理反而影响数据的转换
         * 估计原因应是前面把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse导致
         * 如果不用websocket加上分包、粘包的处理就可以
         */
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufDecoder(RequestParamMsg.RequestParam.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(nettyServerHandler);
        pipeline.remove(this);
    }
}

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-07-10 14:49:19  更:2021-07-10 14:49:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 17:53:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码