2021SC@SDUSC
通信协议从广义上区分,可以分为公有协议和私有协议。由于私有协议的灵活性,它往往会在某个公司或者组织内部使用,按需定制,也因为如此,升级起来会非常方便,灵活性好。绝大多数的私有协议传输层都基于TCP/IP,所以利用Netty的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。
在经过前面三种通信协议的分析的过程中,我渐渐发现其实以前分析的一系列demo的代码,实现其实是基本类似的,本质就是完成服务端和客户端的通信,只是对其中的一些请求等进行不同的处理。
我们下面的博客,就主要开始研究Netty私有协议栈的开发。这一部分主要从三个方面展开研究:
- Netty协议栈功能设计
- Netty协议栈开发
- 运行协议栈
目录
一. Netty协议栈功能设计
1.1 Netty协议网络拓扑图
1.2 Netty自定义功能协议栈
1.3 Netty通信交互图??
二 . Netty协议栈开发
2.1 数据结构的定义
2.2? 消息编解码器
2.3 可靠性设计
小结
一. Netty协议栈功能设计
Netty协议栈用于内部各模块之间的通信,它基于TCP/IP协议栈,是一个类HTTP协议的应用层协议栈,相比于传统的标准协议栈,它更加轻巧、灵活和实用。
1.1 Netty协议网络拓扑图
在分布式环境下,每个netty节点之间建立长连接,使用netty协议进行通信。Netty节点并没用服务端和客户端之分,谁先发起请求,谁就是客户端,另一方就是服务端。
?
1.2 Netty自定义功能协议栈
Netty协议栈承载了业务内部各模块之间的消息交互和服务调用,它的主要功能如下:
- 基于Netty的NIO通信框架,提供高性能的异步通信能力
- 提供消息的编解码能力,可以实现消息实体的序列化和反序列化
- 提供基于IP的白名单接入认证能力
- 链路的有效性校验机制
- 链路的断连重连机制
1.3 Netty通信交互图 ?
- ?Netty 协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息;
- ?Netty协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息;
- 链路建立成功之后,客户端发送业务消息;
- 链路成功之后,服务端发送心跳消息;
- 链路建立成功之后,客户端发送心跳消息;
- 链路建立成功之后,服务端发送业务消息;
- 服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。
二 . Netty协议栈开发
2.1 数据结构的定义
Netty的协议栈使用的消息的数据结构包括如下两部分:
?
代码:
Header:
public class Header {
private int crcCode = 0xadaf0105; // 唯一的通信标志
private int length; // 总消息的长度 header + body
private long sessionID; // 会话ID
private byte type; // 消息的类型
private byte priority; // 消息的优先级 0~255
private Map<String, Object> attachment = new HashMap<String, Object>(); // 附件
// ...
}
NettyMessage:
public class NettyMessage {
private Header header;
private Object body;
public final Header getHeader() {
return header;
}
public final void setHeader(Header header) {
this.header = header;
}
public final Object getBody() {
return body;
}
public final void setBody(Object body) {
this.body = body;
}
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
public String toString() {
return "NettyMessage [header=" + header + "]";
}
}
2.2? 消息编解码器
编解码主要选择Marshaller作为Java对象序列化和反序列化的工具
编码器处理流程图:
解码器处理流程图:
因为我们组有同学专门分析编解码器,因此我在这里分析主要的编解码器的处理流程。这里涉及两个核心的类:NettyMessageEncoder 和 NettyMessageDecoder。
NettyMessageEncoder:
首先判断Message是否为空:
@Override
protected void encode(ChannelHandlerContext ctx, NettyMessage message, ByteBuf sendBuf) throws Exception {
if(message == null || message.getHeader() == null){
throw new Exception("编码失败,没有数据信息!");
}
然后从Message中提取出头信息:
//Head:
Header header = message.getHeader();
sendBuf.writeInt(header.getCrcCode());//校验码
sendBuf.writeInt(header.getLength());//总长度
sendBuf.writeLong(header.getSessionID());//会话id
sendBuf.writeByte(header.getType());//消息类型
sendBuf.writeByte(header.getPriority());//优先级
对于附件信息要单独编码:
marshallingEncoder.encode(value, sendBuf)是进行核心编码的方法。
/对附件信息进行编码
//编码规则为:如果attachment的长度为0,表示没有可选附件,则将长度 编码设置为0
//如果attachment长度大于0,则需要编码,规则:
//首先对附件的个数进行编码
sendBuf.writeInt((header.getAttachment().size())); //附件大小
String key = null;
byte[] keyArray = null;
Object value = null;
//然后对key进行编码,先编码长度,然后再将它转化为byte数组之后编码内容
for (Map.Entry<String, Object> param : header.getAttachment()
.entrySet()) {
key = param.getKey();
keyArray = key.getBytes("UTF-8");
sendBuf.writeInt(keyArray.length);//key的字符编码长度
sendBuf.writeBytes(keyArray);
value = param.getValue();
marshallingEncoder.encode(value, sendBuf);
}
然后从Message中提取出Body,进行编码:
//Body:
Object body = message.getBody();
//如果不为空 说明: 有数据
if(body != null){
//使用MarshallingEncoder
this.marshallingEncoder.encode(body, sendBuf);
} else {
//如果没有数据 则进行补位 为了方便后续的 decoder操作
sendBuf.writeInt(0);
}
最后我们要获取整个数据包的总长度 也就是 header +? body 进行对 header length的设置:
这里有一个减8 的操作,最开始我也比较迷惑,后来去查文档,得到了解释:因为要把CRC和长度本身占的减掉(这部分就和我们计网学习的知识关系密切)
//总长度是在header协议的第二个标记字段中
//第一个参数是长度属性的索引位置
sendBuf.setInt(4, sendBuf.readableBytes() - 8);
NettyMessageDecoder:
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder
首先调用父类(LengthFieldBasedFrameDecoder)方法,获得frame:
ByteBuf frame = (ByteBuf)super.decode(ctx, in);
然后判断frame不为空后,创建Header,通过读取frame,获得头的信息:
if(frame == null){
return null;
}
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setCrcCode(frame.readInt()); //crcCode ----> 添加通信标记认证逻辑
header.setLength(frame.readInt()); //length
header.setSessionID(frame.readLong()); //sessionID
header.setType(frame.readByte()); //type
header.setPriority(frame.readByte()); //priority
附件信息进行解码:
其中?attch.put(key, marshallingDecoder.decode(frame))是解码的核心操作。
int size = frame.readInt();
//附件个数大于0,则需要解码操作
if (size > 0) {
Map<String, Object> attch = new HashMap<String, Object>(size);
int keySize = 0;
byte[] keyArray = null;
String key = null;
for (int i = 0; i < size; i++) {
keySize = frame.readInt();
keyArray = new byte[keySize];
frame.readBytes(keyArray);
key = new String(keyArray, "UTF-8");
attch.put(key, marshallingDecoder.decode(frame));
}
keyArray = null;
key = null;
//解码完成放入attachment
header.setAttachment(attch);
}
message.setHeader(header);
这里就获得完了完整的头信息,进行setHeader操作。 对于ByteBuf来说,读一个数据,就会少一个数据,所以读完header,剩下的就是body了:
marshallingDecoder.decode(frame)仍然是解码的核心操作。
if(frame.readableBytes() > 4) { //大于4个字节,肯定就有数据了(4个字节是内容长度的占位)
message.setBody(marshallingDecoder.decode(frame));
}
return message;
}
最后setBody之后,将Header和Body组合起来就生成了新的完整的NettyMessage。
到这里我们已经分析完了NettyMessage的编解码流程。对这个过程有了一个整体的认知。
2.3 可靠性设计
Netty协议栈可能会运行在非常恶劣的网络环境中,网络超时、闪断、对方进程僵死或者处理缓慢等情况都有可能发生。为了保证在这些极端异常场景下 Netty 协议栈仍能够正常工作或者自动恢复,需要对它的可靠性进行统一规划和设计。这里主要涉及一下几方面:
- 握手和安全认证
- 心跳机制
- 重连机制
- 消息缓存重发机制
这一部分的内容主要在下次博客进行分析。
小结
这部分主要对Netty协议栈的框架进行了分析。设计消息体的结构以及编接码器。和使用Netty协议栈的基本的交互流程,以及协议栈应该实现的功能。
|