消息协议
消息协议的概念听起来非常的高大上,但是消息协议到底指代的是什么?
消息协议是指通讯双方传输的数据(消息)是如何表达描述的。
如 HTTP 协议,浏览器在打开一个网页是,首先和服务端建立连接,然后发送请求(请求中主要包括一些请求头、请求类型、请求URL、请求报文等),服务端接收到请求后,首先会对当前请求通过既定规则进行解析,然后响应返回响应的数据流给客户端,这些既定规则也就是消息协议。
自定义消息协议
那么自定义消息一般包括哪些内容呢?如:
- 版本号,
- 消息类型, 请求/响应, GET、POST、DELETE
- 消息长度
- 消息的正文
- 序列化算法
- …
如何自定义一个消息协议???
协议定义
statusCode | sessionId | reqType | contentLength | content
上面我们自定义了一个协议,其中 statusCode 表示状态代码,sessionId、reqType、contentLength就是请求头信息,content为消息内容。 下面通过Netty框架来完成一个自定义消息协议的定义,以及客户端、服务端使用当前协议进行数据交换的过程。
通过Netty实现自定义消息协议
1. 工程目录
首先创建一个Maven工程,其中 netty-msg-agreement 表示自定义消息协议,netty-msg-client 表示客户端,netty-msg-server 表示服务端,client、server模块依赖与agreement模块。
netty-msg-protocol pom.xml 依赖:添加netty-all依赖以及lombox依赖。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.14</version>
<scope>provided</scope>
</dependency>
2. netty-msg-agreement
其中MessageRecord代表消息记录,客户端服务端通过该对象进行消息传递。消息传递过程中,客户端服务端通过MessageRecordDecoder(解码器)、MessageRecordEncoder(编码器)进行消息编码和解码。
主要代码如下:
2.1 MessageRecord
/**
* 消息记录
* 状态代码 | 请求头(会话id | 请求方式 | 请求体长度) | 消息内容
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageRecord {
// 状态代码:4个字节
private int statusCode;
// 消息请求头
private Header header;
// 消息内容
private Object body;
}
2.1 Header
/**
* 请求头:自定义设计
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Header {
// 会话id:8个字节
private long sessionId;
// 请求方式:1个字节
private byte reqType;
// 请求体长度:4个字节
private int contentLength;
}
2.3 MessageRecordEncoder-编码器
/**
* 编码器
*/
public class MessageRecordEncoder extends MessageToByteEncoder<MessageRecord> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord messageRecord, ByteBuf byteBuf) throws Exception {
System.out.println(">>>>>>>>>>>消息编码 start>>>>>>>>>>>");
// 状态行
byteBuf.writeInt(messageRecord.getStatusCode());
// 请求头
Header header = messageRecord.getHeader();
byteBuf.writeLong(header.getSessionId());
byteBuf.writeByte(header.getReqType());
Object body = messageRecord.getBody();
if (body != null){// 消息内容不为空
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(outputStream);
out.writeObject(body);
byte[] bytes = outputStream.toByteArray();
// 消息长度
byteBuf.writeInt(bytes.length);
// 消息内容
byteBuf.writeBytes(bytes);
}else {// 消息内容为空
byteBuf.writeInt(0);
}
// 写入并且刷新
channelHandlerContext.writeAndFlush(messageRecord);
System.out.println(">>>>>>>>>>>消息编码 end>>>>>>>>>>>");
}
}
2.4 MessageRecordDecoder-解码器
/**
* 解码器
*/
public class MessageRecordDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out) throws Exception {
System.out.println(">>>>>>>>>>>消息解码 start>>>>>>>>>>>");
// 通过byteBuf获取数据
int statusCode = byteBuf.readInt();// 获取4个字节
Header header = new Header();
header.setSessionId(byteBuf.readLong());// 获取8个字节
header.setReqType(byteBuf.readByte());
header.setContentLength(byteBuf.readInt());// 获取4个字节
if (header.getContentLength() > 0){// 消息长度大于0
MessageRecord messageRecord = new MessageRecord();
messageRecord.setStatusCode(statusCode);
messageRecord.setHeader(header);
// 获取消息体
byte[] bytes = new byte[header.getContentLength()];
byteBuf.readBytes(bytes);// 读取消息体内容到 bytes
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);// java自带反序列化工具
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
messageRecord.setBody(objectInputStream.readObject());
System.out.println("收到消息内容为:" + messageRecord);
// 注意:需要将消息传输对象添加到 `List<Object> out` 中,如果不添加,服务端接收处理不到消息内容
out.add(messageRecord);
}else {
System.out.println("消息内容为空,不解析");
}
System.out.println(">>>>>>>>>>>消息解码 end>>>>>>>>>>>");
}
}
2.5 枚举
/**
* 请求方式 枚举
*/
public enum RequestTypeEnums {
GET((byte) 1),
POST((byte) 2),
DELETE((byte) 3),
;
private byte reqType;
RequestTypeEnums(byte reqType) {
this.reqType = reqType;
}
public byte getReqType() {
return this.reqType;
}
}
/**
* 状态行 枚举
*/
public enum StatusCodeEnums {
SUCCESS(0, "成功"),
FAIL(-1, "失败"),
EXCEPTION(-2, "异常"),
;
private int statusCode;
private String desc;
StatusCodeEnums(int statusCode, String desc) {
this.statusCode = statusCode;
this.desc = desc;
}
public int getStatusCode() {
return statusCode;
}
public String getDesc() {
return this.desc;
}
}
3. netty-msg-server
其中,ProtocolServer 为服务启动类,等待客户端连接。ServerFinalHeaders 为服务端消息处理类。
在netty-msg-server pom.xml 中添加 netty-msg-agreement 依赖。
<dependency>
<groupId>org.example</groupId>
<artifactId>netty-msg-agreement</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
3.1 ProtocolServer
/**
* 服务端
*/
public class ProtocolServer {
public static void main(String[] args) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024,
13, // statusCode + sessionId + reqType
4, // 请求体长度
0,
0))
.addLast(new MessageRecordDecoder())
.addLast(new MessageRecordEncoder())
.addLast(new ServerFinalHeaders());
}
});
try {
int port = 8080;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println(">>>>>>>>>>ProtocolServer start success>>>>>>>>>>" + port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
3.2 ServerFinalHeaders
public class ServerFinalHeaders extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRecord messageRecord = (MessageRecord) msg;
System.out.println("Server 收到消息内容为:" + messageRecord);
// 把消息写回客户端
messageRecord.setBody("server data:" + messageRecord.getBody());
ctx.channel().writeAndFlush(messageRecord);
super.channelRead(ctx, msg);
}
}
4. netty-msg-client
其中,ProtocolClient 为客户端启动类,通过该类与服务端ProtocolServer建立连接,并传递消息。ClientFinalHeaders 为客户端消息处理类。
在netty-msg-server pom.xml 中添加 netty-msg-agreement 依赖。
<dependency>
<groupId>org.example</groupId>
<artifactId>netty-msg-agreement</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
4.1 ProtocolClient
/**
* 客户端
*/
public class ProtocolClient {
public static void main(String[] args) {
EventLoopGroup work = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024,
13, // statusCode + sessionId + reqType
4, // 请求体长度
0,
0))
.addLast(new MessageRecordDecoder())
.addLast(new MessageRecordEncoder())
.addLast(new ClientFinalHeaders());
}
});
try {
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
Channel channel = future.channel();
for (int i = 0; i < 5; i++) {
MessageRecord msg = new MessageRecord();
msg.setStatusCode(StatusCodeEnums.SUCCESS.getStatusCode());
Header header = new Header();
header.setSessionId(System.currentTimeMillis());
header.setReqType(RequestTypeEnums.POST.getReqType());
msg.setHeader(header);
String body = String.format("第%s条请求数据:%s", i + 1, UUID.randomUUID().toString());
msg.setBody(body);
channel.writeAndFlush(msg);
}
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
}
}
4.2 ClientFinalHeaders
public class ClientFinalHeaders extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRecord messageRecord = (MessageRecord) msg;
System.out.println("Client 收到消息内容为:" + messageRecord);
super.channelRead(ctx, msg);
}
}
|