本篇文章,自定义一个数据协议,通过Python语言,使用这个自定义的数据协议,将数据发送给Netty接收端. 之所以使用两种不同的语言,也在说明,数据之间的传输与语言无关.只要发送端和接收端彼此遵守相同的协议即可. 关于协议,无处不在,比如与网络相关的HTTP协议, 比如向Redis发送命令使用的RESP协议,比如Dubbo消费者和提供者之间的数据传输,比如RocketMQ消费者与服务端之间的消息传输,比如JVM中使用jstack命令获取堆栈信息时所使用的协议,等等. 它们之间必然会有一套相关的协议,用于数据传输. 一切皆协议,世间协议再多,常见的协议也无外乎那么几个,在Netty中已经默认提供了相关常见协议的解码器.
FixedLengthFrameDecoder
LengthFieldBasedFrameDecoder
LineBasedFrameDecoder
DelimiterBasedFrameDecoder
比如HTTP协议,在它的请求头里面就使用Content-Length指明了请求正文的长度. 其实Dubbo,RocketMQ,Redis等它们也是类似思想实现的,这样的协议很常见很实用.
于是乎,Netty就默认实现了一种这样的协议解码器,即LengthFieldBasedFrameDecoder解码器.在之前的文章中有介绍它,这里就不介绍了.
而我们本篇自定义的协议也是和它类似的,如下所示 整个协议是由请求头(head)和请求体(body)两个部分组成, 请求头(head)用于存放请求体(body)的长度,请求体(body)是真正存放数据.
接下来就是通过代码演示环节
首先看下Python端(作为客户端,用于发送数据)
from socket import *
import struct
import json
class Book(object):
def __init__(self, addr, year):
self.addr = addr
self.year = year
def addr(self):
return self.addr
def keys(self):
return ['addr', 'year']
def __getitem__(self, item):
return getattr(self, item)
if __name__ == '__main__':
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8082))
book = Book('成都', 2021)
json = json.dumps(dict(book))
body = bytes(json, 'utf-8')
body_len = len(body)
head = struct.pack('>I', body_len)
client.sendall(head + body)
再看一下Netty接收端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
channelPipeline.addLast(new MyDecoder());
channelPipeline.addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 8082).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyDecoder extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
String json = new String(bytes, "UTF-8");
MyModel v = JSON.parseObject(json, MyModel.class);
ctx.fireChannelRead(v);
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ServerHandler extends SimpleChannelInboundHandler<MyModel> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyModel msg) throws Exception {
System.out.println(msg.getAddr());
}
}
以上Netty代码的功能,如下图所示
先运行Netty服务端,再运行Python客户端,在控制台就可以看到Netty服务端接收到的数据.
简单录制了一个视频
个人站点 语雀
公众号
|