最近学习了使用netty进行网络通信,对数据帧进行封装打包等,记录一下遇到的问题。网上很官方的描述就不再描述啦。其次,这个笔记是根据近期实现的项目而做的,因此比较偏向于与实物之间的通信协议问题。
一、Netty是什么
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架。
二、Netty作为服务器端的整体架构
出去一个监听器去调用服务之外,主要分为以下4个部分: ①NettyServer (or Client )进行服务的启动,在这里可以定义自己是服务端还是客户端,同时定义channel通道进行使用; ②Decoder 使用①中的channel通道拿到收取的报文,对接受到的数据帧进行解码拿到自己想要的信息; ③Encoder 同②所述,将想要发送给远程客户端的消息体封装成报文并放入channel通道进行发送; ④Handler 这里主要进行一些逻辑处理,例如通道刚刚激活后,要进行什么事务以及读通道中的消息;
1、 NettyServer
首先,来一段作为服务器端的代码(由于是新手,几乎每一句都去搜了是什么意思,写在了注释里,方便以后复习查看): 一些参数查看了这个博客:链接: link.
private static ServerBootstrap serverBootstrap;
public static void initNetty(String ip, Integer port,Integer IDLETIME_READER,Integer IDLETIME_WRITER,Integer IDLETIME_ALL) throws InterruptedException {
serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 2048)
.option(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(512, 1024, 65536))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("idleStateHandler", new IdleStateHandler(IDLETIME_READER, IDLETIME_WRITER, IDLETIME_ALL, TimeUnit.SECONDS))
.addLast("decoder", new ServerDecoder())
.addLast("channelHandler", new ServerHandler())
.addLast("encoder", new ServerEncoder());
logger.info("success to initHandler!");
}
});
try {
ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
logger.info("[Netty] - Server bootstrap bind to addr(" + ip + ":" + port + ")");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("Server start got exception!", e.getMessage());
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
2、Handler
后来,看了一些博客发现,Handler分为入站和出站,进行对比后发现这个项目中仅仅使用了入站Handler,例如下图,可以定义以下一些方法:
- channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
- channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
- channelActive,ChannelHandlerContext的Channel已激活
- channelInactive,ChannelHanderContxt的Channel结束生命周期
- channelRead,从当前Channel的对端读取消息
- channelReadComplete,消息读取完成后执行
- userEventTriggered,一个用户事件被处罚
- channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
- exceptionCaught,重写父类ChannelHandler的方法,处理异常
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
logger.info("into ServerHandler channelActive()........");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx,msg);
logger.info("received message from client :");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress();
logger.info("链路检测超时,关闭链接:"+((IdleStateEvent) evt).state().toString() + " - " + addr.getAddress().getHostAddress() + ":" + addr.getPort());
ctx.channel().close();
}
}
3、Decoder
public class ServerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
logger.info("into ServerDecoder decode()......");
byte[] msg = ByteBufUtil.getBytes(byteBuf);
System.out.println("Length: " + msg.length + " - msg: " + ByteUtil.bytesToHexString(msg, 0, msg.length));
byte[] fullMessage = null;
int bufLength = byteBuf.readableBytes();
if (bufLength < 8) {
return;
}
byteBuf.markReaderIndex();
byte[] head = new byte[1];
byteBuf.readBytes(head);
String startIdent = ByteUtil.bytesToHexString(head, 0, 1);
assert startIdent != null;
if (startIdent.equals("xx")) {
byte[] len_bs = new byte[2];
byteBuf.readBytes(len_bs);
int len = ByteBigUtil.getInt(len_bs);
int fullLength = (len + 8);
if (bufLength < fullLength) {
byteBuf.resetReaderIndex();
return;
}
byteBuf.resetReaderIndex();
fullMessage = new byte[fullLength];
byteBuf.readBytes(fullMessage);
} else {
byteBuf.resetReaderIndex();
byteBuf.readByte();
return;
}
if (fullMessage != null) {
String info = ByteUtil.bytesToHexString(fullMessage, 0, fullMessage.length);
String ip = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getAddress().getHostAddress();
Integer port = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getPort();
logger.info("Received... (" + ip + ":" + port + ") data: " + info);
Class<?> clazz = Class.forName(StaticUtil.protobuf_package + ByteUtil.bytesToHexString(fullMessage, 1, 2));
Constructor c = clazz.getConstructor(byte[].class, String.class, Integer.class,Channel.class);
Channel channel = channelHandlerContext.channel();
Object o = c.newInstance(fullMessage, ip, port,channel);
list.add(o);
}
}
}
4、Encoder
public class ServerEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
String ip = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getAddress().getHostAddress();
Integer port = ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getPort();
MessageBody msgBody = (MessageBody) o;
Method method_writeToBytes = MessageBody.class.getMethod("xxxxwrite");
byte[] sendBs = (byte[]) method_writeToBytes.invoke(msgBody);
byteBuf.writeBytes(sendBs);
channelHandlerContext.writeAndFlush(byteBuf);
byteBuf.retain();
System.out.println("encode ended......");
}
}
二、网络调试助手的使用
关于网络调试助手的下载可以去搜索一些其他的博客,我来说一下因为第一次使用网络调试助手遇到的问题,比较小白: 1、首先确定你自己写的是服务器端还是客户端,去调整这个助手,如果你的代码是实现服务器端,就调整为TCP Client;相反,客户端的话就调整为TCP Server。
2、下边的ip和端口号,根据NettyServer传入的ip和port参数进行调整,按理说应该在监听器中去写,但是这里单拿出一个作为测试连接的例子,可以直接写在NettyServer中:
public static void main(String[] args)throws Exception {
initNetty("127.0.0.1",8081,50,0,0);
}
参照以上的例子,就将网络调试助手设置成一下样子: 然后启动你的服务器,再点击连接就可以啦
3、明确传输报文的格式,明确是ASCII码还是16进制传输,这里我就遇到了很大的坑,当时解码怎么都解不出来,好不容易解出来只能一个字节数组存一位数字,后来才发现没有设置传输的形式,勾选的ASCII码,废了很大的劲完全写完整个项目后才发现这个助手可以调传输数据的方式,于是又废了一晚上大改整个项目… 这里还有一些其他的功能可以自己去了解一下。我也是一个在校大学生小白,因此有很多地方可能说的并不准确而且有问题,希望大家帮忙指正!
|