IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 《Netty权威指南》(三)WebSocket协议开发 -> 正文阅读

[网络协议]《Netty权威指南》(三)WebSocket协议开发

3.2 WebSocket协议开发

3.2.1 Http协议的弊端

  • ( 1)HTTP协议为半双工协议。半双工协议指数据可以在客户端和服务端两个方向上传输,但是不能同时传输。它意味着在同一时刻,只有一个方向上的数据传送,。
  • (2)HTTP消息冗长而繁琐。HTTP 消息包含消息头、消息体、换行符等,通常情况下采用文本方式传输,相比于其他的二进制通信协议,冗长而繁琐。
  • (3)针对服务器推送的黑客攻击。例如长时间轮询。
  • 现在,很多网站为了实现消息推送,所用的技术都是轮询。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客户端浏览器。这种传统的模式具有很明显的缺点,即浏览器需要不断地向服务器发出请求,然而 HTTP request 的 header是非常冗长的,里面包含的可用数据比例可能非常低,这会占用很多的带宽和服务器资源。

3.2.2 WebSocket基本知识

  • webSocket是 HTMLS开始提供的一种浏览器与服务器间进行全双工通信的网络技术,WebSocket通信协议于2011年被IETF定为标准RFC6455,WebSocket API被 W3C定为标准。
  • 在 WebSocket API中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道,两者就可以直接互相传送数据了。WebSocket基于TCP双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息,相比于HTTP的半双工协议,性能得到很大提升。

image.png

3.2.3 基本过程

image.png

  • 建立WebSocket连接时,需要通过客户端或者浏览器发出握手请求。请求格式如下:
GET /ws HTTP/1.1
Host: localhost:8002
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36 Edg/99.0.1150.30
Upgrade: websocket
Origin: http://localhost:8001
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5
Sec-WebSocket-Key: spyQjkK99Jt/y+JGqq0ljA==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Content-Length: 0
  • 为了建立一个 WebSocket连接,客户端浏览器首先要向服务器发起一个HTTP请求,这个请求和通常的HTTP请求不同,包含了一些附加头信息,其中附加头信息Upgrade:WebSocket表明这是一个申请协议升级的HTTP请求。服务器端解析这些附加的头信息,然后生成应答信息返回给客户端,客户端和服务器端的WebSocket连接就建立起来了,双方可以通过这个连接通道自由地传递信息,并且这个连接会持续存在直到客户端或者服务器端的某一方主动关闭连接。
  • 请求消息中的Sec-WebSocket-Key是随机的,服务器端会用这些数据来构造出一个SHA-1的信息摘要,把Sec-WebSocket-Key加上一个魔幻字符串258EAFA5-E914-47DA-95CA-CSABODC85B11。使用SHA-1加密,然后进行BASE-64编码,将结果做为Sec-WebSocket-Accept头的值,返回给客户端。

image.png

3.2.4 WebSocket开发聊天室

效果图:
image.png
规定前后端数据交换格式:定义消息格式 [指令][时间戳][username][头像][消息内容]
image.png
定义消息实体类

package com.shu.Protocol;

import org.msgpack.annotation.Message;


/**
 * @Author shu
 * @Date: 2022/03/03/ 20:05
 * @Description 自定义消息实体类 自
 * 定义消息格式 [指令][时间戳][username][头像][消息内容]
 **/
@Message
public class IMMessage{
	private String addr;		//IP地址及端口
	private String cmd;		//命令类型[LOGIN]或者[SYSTEM]或者[LOGOUT]
	private long time;		//命令发送时间
	private int online;		//当前在线人数
	private String sender;  //发送人
	private String headPic;
	private String receiver;	//接收人
	private String content;	//消息内容

	public IMMessage(){}

	public IMMessage(String cmd,long time,int online,String content){
		this.cmd = cmd;
		this.time = time;
		this.online = online;
		this.content = content;
	}

	public IMMessage(String cmd,long time,String sender){
		this.cmd = cmd;
		this.time = time;
		this.sender = sender;
	}
	public IMMessage(String cmd,long time,String sender,String content){
		this.cmd = cmd;
		this.time = time;
		this.sender = sender;
		this.content = content;
	}

	public IMMessage(String cmd,long time,String sender,String content,String headPic){
		this.cmd = cmd;
		this.time = time;
		this.sender = sender;
		this.content = content;
		this.headPic = headPic;
	}
	public String getCmd() {
		return cmd;
	}

	public void setCmd(String cmd) {
		this.cmd = cmd;
	}

	public long getTime() {
		return time;
	}
	public void setTime(long time) {
		this.time = time;
	}
	public int getOnline() {
		return online;
	}
	public void setOnline(int online) {
		this.online = online;
	}

	public String getSender() {
		return sender;
	}
	public void setSender(String sender) {
		this.sender = sender;
	}
	public String getReceiver() {
		return receiver;
	}
	public void setReceiver(String receiver) {
		this.receiver = receiver;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}

	public String getAddr() {
		return addr;
	}
	public void setAddr(String addr) {
		this.addr = addr;
	}

	public String getHeadPic() {
		return headPic;
	}

	public void setHeadPic(String headPic) {
		this.headPic = headPic;
	}


	@Override
	public String toString() {
		return "IMMessage{" +
				"addr='" + addr + '\'' +
				", cmd='" + cmd + '\'' +
				", time=" + time +
				", online=" + online +
				", sender='" + sender + '\'' +
				", headPic='" + headPic + '\'' +
				", receiver='" + receiver + '\'' +
				", content='" + content + '\'' +
				'}';
	}
}

定义命令枚举类

package com.shu.Protocol;


/**
 * @Author shu
 * @Date: 2022/03/03/ 20:05
 * @Description 自定义IM协议,Instant Messaging Protocol即时通信协议
 *  *
 **/
public enum IMP {
	/** 系统消息 */
	SYSTEM("SYSTEM"),
	/** 登录指令 */
	LOGIN("LOGIN"),
	/** 登出指令 */
	LOGOUT("LOGOUT"),
	/** 聊天消息 */
	CHAT("CHAT"),
	/** 送鲜花 */
	FLOWER("FLOWER");


	private String name;


	public static boolean isIMP(String content){
		return content.matches("^\\[(SYSTEM|LOGIN|LOGOUT|CHAT|FLOWER)\\]");
	}
	
	IMP(String name){
		this.name = name;
	}
	
	public String getName(){
		return this.name;
	}
	
	public String toString(){
		return this.name;
	}

}

消息解码器

package com.shu.Protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.msgpack.MessagePack;
import org.msgpack.MessageTypeException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * @Author shu
 * @Date: 2022/03/03/ 20:05
 * @Description 自定义IM协议的解码器
 **/
public class IMDecoder extends ByteToMessageDecoder {

	//解析IM写一下请求内容的正则
	private Pattern pattern = Pattern.compile("^\\[(.*)\\](\\s\\-\\s(.*))?");

	/**
	 * 解码
	 * @param ctx
	 * @param in
	 * @param out
	 * @throws Exception
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception {
		try{
			//先获取可读字节数
	        final int length = in.readableBytes();
	        final byte[] array = new byte[length];
	        String content = new String(array,in.readerIndex(),length);
	        //空消息不解析
	        if(!(null == content || "".equals(content.trim()))){
	        	if(!IMP.isIMP(content)){
	        		ctx.channel().pipeline().remove(this);
	        		return;
	        	}
	        }

	        in.getBytes(in.readerIndex(), array, 0, length);
	        out.add(new MessagePack().read(array,IMMessage.class));
	        in.clear();
		}catch(MessageTypeException e){
			ctx.channel().pipeline().remove(this);
		}
	}


	/**
	 * 字符串解析成自定义即时通信协议
	 * @param msg
	 * @return
	 */
	public IMMessage decode(String msg){
		if(null == msg || "".equals(msg.trim())){ return null; }
		try{
			Matcher m = pattern.matcher(msg);
			String header = "";
			String content = "";
			if(m.matches()){
				header = m.group(1);
				content = m.group(3);
			}

			String [] heards = header.split("\\]\\[");
			long time = 0;
			try{ time = Long.parseLong(heards[1]); } catch(Exception e){}
			String username = heards[2];
			//昵称最多十个字
			username = username.length() < 10 ? username : username.substring(0, 9);

			if(msg.startsWith("[" + IMP.LOGIN.getName() + "]")){
				return new IMMessage(heards[0],time,username);
			}else if(msg.startsWith("[" + IMP.CHAT.getName() + "]")){
				String headPic = heards[3];
				return new IMMessage(heards[0],time,username,content,headPic);
			}else if(msg.startsWith("[" + IMP.FLOWER.getName() + "]")){
				return new IMMessage(heards[0],time,username);
			}else{
				return null;
			}
		}catch(Exception e){
			e.printStackTrace();
			return null;
		}
	}
}

消息编码器

package com.shu.Protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
import org.msgpack.MessagePack;

/**
 * @Author shu
 * @Date: 2022/03/03/ 20:05
 * @Description 自定义IM协议的编码器
 **/
@Slf4j
public class IMEncoder extends MessageToByteEncoder<IMMessage> {

	/**
	 *
	 * 编码
	 * @param ctx
	 * @param msg
	 * @param out
	 * @throws Exception
	 */
	@Override
	protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out)
			throws Exception {
		out.writeBytes(new MessagePack().write(msg));
	}

	public String encode(IMMessage msg){
		if(null == msg){ return ""; }
		String prex = "[" + msg.getCmd() + "]" + "[" + msg.getTime() + "]";
		if(IMP.LOGIN.getName().equals(msg.getCmd()) ||
		   IMP.FLOWER.getName().equals(msg.getCmd())){
			prex += ("[" + msg.getSender() + "]");
		}else if(IMP.CHAT.getName().equals(msg.getCmd()) ){
			prex += ("[" + msg.getSender() + "]["+msg.getHeadPic()+"]");
		}
		else if(IMP.SYSTEM.getName().equals(msg.getCmd())){
			prex += ("[" + msg.getOnline() + "]");
		}
		if(!(null == msg.getContent() || "".equals(msg.getContent()))){
			prex += (" - " + msg.getContent());
		}
		log.info("编码消息"+prex);
		return prex;
	}
}

消息处理中心

package com.shu.Process;
import com.alibaba.fastjson.JSONObject;
import com.shu.Protocol.IMDecoder;
import com.shu.Protocol.IMEncoder;
import com.shu.Protocol.IMMessage;
import com.shu.Protocol.IMP;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;


/**
 * @Author shu
 * @Date: 2022/03/03/ 20:05
 * @Description 消息处理
 **/
@Slf4j
public class MsgProcessor {

    // ChannelGroup是netty提供用于管理web于服务器建立的通道channel的,其本质是一个高度封装的set集合,
    // 在服务器广播消息时,可以直接通过它的writeAndFlush将消息发送给集合中的所有通道中去。
    private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private IMDecoder decoder = new IMDecoder();
    private IMEncoder encoder = new IMEncoder();

    //channel自定义属性
    private final AttributeKey<String> USERNAME = AttributeKey.valueOf("username");
    private final AttributeKey<String> HEAD_PIC = AttributeKey.valueOf("headPic");
    private final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");
    private final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");

    public void process(Channel client,String msg){
        //将字符串解析为自定义格式
        IMMessage request = decoder.decode(msg);

        log.info("解码消息"+String.valueOf(request));

        if(null == request){return;}
        //获取消息发送者
        String username = request.getSender();
        //判断如果是登录动作,就往onlineUsers中加入一条数据
        if(IMP.LOGIN.getName().equals(request.getCmd())){
            client.attr(IP_ADDR).getAndSet("");
            client.attr(USERNAME).getAndSet(request.getSender());
            client.attr(HEAD_PIC).getAndSet(request.getHeadPic());
            onlineUsers.add(client);
            //像所有用户发送系统消息
            for (Channel channel : onlineUsers) {//向其他人发送消息
                if (channel != client) {
                    //自定义系统消息格式 [system][时间戳][用户数量][消息内容]
                    request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), username + " 加入聊天室!");
                }
                //向自己发送消息
                else {
                    request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), username + " 欢迎进入聊天室!");

                }
                //自定义IM协议解码
                String text = encoder.encode(request);
                //发送消息
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }

        }
        //如果是登出
        else if(IMP.LOGOUT.getName().equals(request.getCmd())){
            logout(client);
        }
        //如果是聊天信息
        else if(IMP.CHAT.getName().equals(request.getCmd())){

            for (Channel channel : onlineUsers) {//向其他人发送消息
                if (channel != client) {
                    request.setSender(username);
                }
                //向自己发送消息
                else {
                    request.setSender("MY_SELF");

                }
                //自定义IM协议解码
                String text = encoder.encode(request);
                //发送消息
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }
        }
        //如果是鲜花
        else if (IMP.FLOWER.getName().equals(request.getCmd())){
            JSONObject attrs = getAttrs(client);
            long currTime = sysTime();
            if(null != attrs){
                long lastTime = attrs.getLongValue("lastFlowerTime");
                //60秒之内不允许重复刷鲜花
                int seconds = 10;
                long sub = currTime - lastTime;
                if(sub < 1000 * seconds){
                    request.setSender("MY_SELF");
                    request.setCmd(IMP.SYSTEM.getName());
                    request.setContent("您送鲜花太频繁," + (seconds - Math.round(sub / 1000)) + "秒后再试");
                    String content = encoder.encode(request);
                    client.writeAndFlush(new TextWebSocketFrame(content));
                    return;
                }
            }

            //正常送花
            for (Channel channel : onlineUsers) {
                if (channel == client) {
                    request.setSender("MY_SELF");
                    request.setContent("你给大家送了一波鲜花雨");
                    setAttrs(client, "lastFlowerTime", currTime);
                }else{
                    request.setSender(getNickName(client));
                    request.setContent(getNickName(client) + "送来一波鲜花雨");
                }
                request.setTime(sysTime());

                String content = encoder.encode(request);
                channel.writeAndFlush(new TextWebSocketFrame(content));
            }
        }



    }


    /**
     * 获取用户昵称
     * @param client
     * @return
     */
    public String getNickName(Channel client){
        return client.attr(USERNAME).get();
    }
    /**
     * 获取用户远程IP地址
     * @param client
     * @return
     */
    public String getAddress(Channel client){
        return client.remoteAddress().toString().replaceFirst("/","");
    }

    /**
     * 获取扩展属性
     * @param client
     * @return
     */
    public JSONObject getAttrs(Channel client){
        try{
            return client.attr(ATTRS).get();
        }catch(Exception e){
            return null;
        }
    }

    /**
     * 获取扩展属性
     * @param client
     * @return
     */
    private void setAttrs(Channel client,String key,Object value){
        try{
            JSONObject json = client.attr(ATTRS).get();
            json.put(key, value);
            client.attr(ATTRS).set(json);
        }catch(Exception e){
            JSONObject json = new JSONObject();
            json.put(key, value);
            client.attr(ATTRS).set(json);
        }
    }

    
    /**
     * 登出通知
     * @param client
     * @return
     */
    public void logout(Channel client) {
        IMMessage request = new IMMessage();
        request.setSender(client.attr(USERNAME).get());
        request.setCmd(IMP.SYSTEM.getName());
        request.setOnline(onlineUsers.size());
        request.setContent(request.getSender()+" 退出聊天室!");
        //向所有用户发送系统消息
        for (Channel channel : onlineUsers) {//向其他人发送消息
            if (channel != client) {
                //自定义IM协议解码
                String text = encoder.encode(request);
                //发送消息
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }
        }
        onlineUsers.remove(client);
    }

    private long sysTime(){
        return System.currentTimeMillis();
    }

}

server启动器

package com.shu.Server;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;


/**
 * @Author shu
 * @Date: 2022/03/03/ 20:05
 * @Description 服务端启动
 **/
@Component
public class CharServer {
    private static final Logger logger = LoggerFactory.getLogger(CharServer.class);
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup work = new NioEventLoopGroup();
    ChannelFuture future = null;


    @PreDestroy
    public void stop(){
        if(future!=null){
            future.channel().close().addListener(ChannelFutureListener.CLOSE);
            future.awaitUninterruptibly();
            boss.shutdownGracefully();
            work.shutdownGracefully();
            future=null;
            logger.info(" 服务关闭 ");
        }
    }


    public void start(){
        logger.info("nettyServer 正在启动");
        int port = 8002;
        serverBootstrap.group(boss,work)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,1024)
                .childHandler(new ChatNettyServerInitializer());
        logger.info("netty服务器在["+port+"]端口启动监听");
        try{
            future = serverBootstrap.bind(port).sync();
            if(future.isSuccess()){
                logger.info("nettyServer 完成启动 ");
            }
            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        }catch (Exception e){
            logger.info("[出现异常释放资源,{%s}]",e);
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
package com.shu.Server;

import com.shu.Handle.WebSocketHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * @Author shu
 * @Date: 2022/03/03/ 20:27
 * @Description  初始化
 **/
public class ChatNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 增加日志信息
        socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
        // 心跳监测
        socketChannel.pipeline().addLast(new IdleStateHandler(0,0,5, TimeUnit.MINUTES));
        // Http消息编码解码
        socketChannel.pipeline().addLast(new HttpServerCodec());
        // 消息组装
        socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
        // WebSocket通信支持
        socketChannel.pipeline().addLast(new ChunkedWriteHandler());
        // 自定义处理逻辑
        socketChannel.pipeline().addLast(new WebSocketHandler());
    }
}

自定义处理器

package com.shu.Handle;

import com.shu.Process.MsgProcessor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;


@ChannelHandler.Sharable
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handShaker;
    private ChannelHandlerContext ctx;
    private MsgProcessor process = new MsgProcessor();

    /**
     * 收到消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("收到消息:"+msg);
        // 传统的HTTP接入
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        }
        // WebSocket接入
        else if (msg instanceof WebSocketFrame) {
            handleWebSocket(ctx, (WebSocketFrame) msg);
        }
    }


    /**
     * 处理Http请求,完成WebSocket握手
     * 注意:WebSocket连接第一次请求使用的是Http
     * @param ctx
     * @param request
     * @throws Exception
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // 如果HTTP解码失败,返回HHTP异常
        if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // 正常WebSocket的Http连接请求,构造握手响应返回
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
        handShaker = wsFactory.newHandshaker(request);
        if (handShaker == null) { // 无法处理的websocket版本
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            // 向客户端发送websocket握手,完成握手
            handShaker.handshake(ctx.channel(), request);
            // 记录管道处理上下文,便于服务器推送数据到客户端
            this.ctx = ctx;
            log.info("websocket 建立成功!");
        }
    }

    /**
     * Http返回
     * @param ctx
     * @param request
     * @param response
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
        // 返回应答给客户端
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(response, response.content().readableBytes());
        }

        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }



    /**
     * 处理Socket请求
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handleWebSocket(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            // 结束握手操作
            handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 当前只支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
        }
        String msg =((TextWebSocketFrame)frame).text();
        // 处理来自客户端的WebSocket请求
        log.info("来自客服端信息:"+msg);
        //保存当前用户
        process.process(ctx.channel(),msg);
    }

    /**
     * 退出
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        process.logout(ctx.channel());
    }



    /**
     * 如果5分钟没有读请求,则向客户端发送心跳
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (IdleState.READER_IDLE.equals((event.state()))) {
                ctx.writeAndFlush("heartbeat").addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}

启动类

package com.shu;

import com.shu.Server.CharServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;


@SpringBootApplication(exclude={DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
public class NettyChatApplication {
    public static void main(String[] args) {
        SpringApplication.run(NettyChatApplication.class, args);
        new CharServer().start();
    }
}

观察结果

  • 启动服务完成服务器器的启动

image.png

  • 发送WebSocket请求建立握手请求

image.png

  • 服务器发送附加头信息Upgrade:WebSocket与Sec-WebSocket-Key返回前端

image.png

  • 前端发送数据,后端收到数据解码

image.png

  • 服务关闭消息

image.png

3.2.5 过程分析

  • 第一次握手请求消息由 HTTP协议承载,所以它是一个HTTP消息,执行 handlclIttpRequcst方法来处理WcbSocket握手请求。
		// 传统的HTTP接入
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        }
        // WebSocket接入
        else if (msg instanceof WebSocketFrame) {
            handleWebSocket(ctx, (WebSocketFrame) msg);
        }
  • 首先对握手请求消息进行判断,如果消息头中没有包含Upgrade字段或者它的值不是 websocket,则返回HTTP 400响应。
		// 如果HTTP解码失败,返回HHTP异常
        if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
  • 握手请求简单校验通过之后,开始构造握手工厂,创建握手处理WebSocketServerHandshaker,通过它构造握手响应消息返回给客户端。
        // 正常WebSocket的Http连接请求,构造握手响应返回
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
        handShaker = wsFactory.newHandshaker(request);
        if (handShaker == null) { // 无法处理的websocket版本
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            // 向客户端发送websocket握手,完成握手
            handShaker.handshake(ctx.channel(), request);
            // 记录管道处理上下文,便于服务器推送数据到客户端
            this.ctx = ctx;
            log.info("websocket 建立成功!");
        }
  • 此时表名WebSocket连接已建立,将进入WebSocket处理逻辑
/**
     * 处理Socket请求
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handleWebSocket(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            // 结束握手操作
            handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 当前只支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
        }
        String msg =((TextWebSocketFrame)frame).text();
        // 处理来自客户端的WebSocket请求
        log.info("来自客服端信息:"+msg);
        //保存当前用户
        process.process(ctx.channel(),msg);
    }
  • 利用消息中心对消息进行编解码处理,并返回给客服端
package com.shu.Process;
import com.alibaba.fastjson.JSONObject;
import com.shu.Protocol.IMDecoder;
import com.shu.Protocol.IMEncoder;
import com.shu.Protocol.IMMessage;
import com.shu.Protocol.IMP;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;


/**
 * @Author shu
 * @Date: 2022/03/03/ 20:05
 * @Description 消息处理
 **/
@Slf4j
public class MsgProcessor {

    // ChannelGroup是netty提供用于管理web于服务器建立的通道channel的,其本质是一个高度封装的set集合,
    // 在服务器广播消息时,可以直接通过它的writeAndFlush将消息发送给集合中的所有通道中去。
    private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private IMDecoder decoder = new IMDecoder();
    private IMEncoder encoder = new IMEncoder();

    //channel自定义属性
    private final AttributeKey<String> USERNAME = AttributeKey.valueOf("username");
    private final AttributeKey<String> HEAD_PIC = AttributeKey.valueOf("headPic");
    private final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");
    private final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");

    public void process(Channel client,String msg){
        //将字符串解析为自定义格式
        IMMessage request = decoder.decode(msg);

        log.info("解码消息"+String.valueOf(request));

        if(null == request){return;}
        //获取消息发送者
        String username = request.getSender();
        //判断如果是登录动作,就往onlineUsers中加入一条数据
        if(IMP.LOGIN.getName().equals(request.getCmd())){
            client.attr(IP_ADDR).getAndSet("");
            client.attr(USERNAME).getAndSet(request.getSender());
            client.attr(HEAD_PIC).getAndSet(request.getHeadPic());
            onlineUsers.add(client);
            //像所有用户发送系统消息
            for (Channel channel : onlineUsers) {//向其他人发送消息
                if (channel != client) {
                    //自定义系统消息格式 [system][时间戳][用户数量][消息内容]
                    request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), username + " 加入聊天室!");
                }
                //向自己发送消息
                else {
                    request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), username + " 欢迎进入聊天室!");

                }
                //自定义IM协议解码
                String text = encoder.encode(request);
                //发送消息
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }

        }
        //如果是登出
        else if(IMP.LOGOUT.getName().equals(request.getCmd())){
            logout(client);
        }
        //如果是聊天信息
        else if(IMP.CHAT.getName().equals(request.getCmd())){

            for (Channel channel : onlineUsers) {//向其他人发送消息
                if (channel != client) {
                    request.setSender(username);
                }
                //向自己发送消息
                else {
                    request.setSender("MY_SELF");

                }
                //自定义IM协议解码
                String text = encoder.encode(request);
                //发送消息
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }
        }
        //如果是鲜花
        else if (IMP.FLOWER.getName().equals(request.getCmd())){
            JSONObject attrs = getAttrs(client);
            long currTime = sysTime();
            if(null != attrs){
                long lastTime = attrs.getLongValue("lastFlowerTime");
                //60秒之内不允许重复刷鲜花
                int seconds = 10;
                long sub = currTime - lastTime;
                if(sub < 1000 * seconds){
                    request.setSender("MY_SELF");
                    request.setCmd(IMP.SYSTEM.getName());
                    request.setContent("您送鲜花太频繁," + (seconds - Math.round(sub / 1000)) + "秒后再试");
                    String content = encoder.encode(request);
                    client.writeAndFlush(new TextWebSocketFrame(content));
                    return;
                }
            }

            //正常送花
            for (Channel channel : onlineUsers) {
                if (channel == client) {
                    request.setSender("MY_SELF");
                    request.setContent("你给大家送了一波鲜花雨");
                    setAttrs(client, "lastFlowerTime", currTime);
                }else{
                    request.setSender(getNickName(client));
                    request.setContent(getNickName(client) + "送来一波鲜花雨");
                }
                request.setTime(sysTime());

                String content = encoder.encode(request);
                channel.writeAndFlush(new TextWebSocketFrame(content));
            }
        }



    }


    /**
     * 获取用户昵称
     * @param client
     * @return
     */
    public String getNickName(Channel client){
        return client.attr(USERNAME).get();
    }
    /**
     * 获取用户远程IP地址
     * @param client
     * @return
     */
    public String getAddress(Channel client){
        return client.remoteAddress().toString().replaceFirst("/","");
    }

    /**
     * 获取扩展属性
     * @param client
     * @return
     */
    public JSONObject getAttrs(Channel client){
        try{
            return client.attr(ATTRS).get();
        }catch(Exception e){
            return null;
        }
    }

    /**
     * 获取扩展属性
     * @param client
     * @return
     */
    private void setAttrs(Channel client,String key,Object value){
        try{
            JSONObject json = client.attr(ATTRS).get();
            json.put(key, value);
            client.attr(ATTRS).set(json);
        }catch(Exception e){
            JSONObject json = new JSONObject();
            json.put(key, value);
            client.attr(ATTRS).set(json);
        }
    }

    
    /**
     * 登出通知
     * @param client
     * @return
     */
    public void logout(Channel client) {
        IMMessage request = new IMMessage();
        request.setSender(client.attr(USERNAME).get());
        request.setCmd(IMP.SYSTEM.getName());
        request.setOnline(onlineUsers.size());
        request.setContent(request.getSender()+" 退出聊天室!");
        //向所有用户发送系统消息
        for (Channel channel : onlineUsers) {//向其他人发送消息
            if (channel != client) {
                //自定义IM协议解码
                String text = encoder.encode(request);
                //发送消息
                channel.writeAndFlush(new TextWebSocketFrame(text));
            }
        }
        onlineUsers.remove(client);
    }

    private long sysTime(){
        return System.currentTimeMillis();
    }

}

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:59:52  更:2022-03-10 23:01:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 7:19:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码