一、黏包和半包现象例子
服务器端代码:
package com.test.netty.c6;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接建立时会执行该方法
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开时会执行该方法
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("服务器端启动...");
channelFuture.sync();
log.debug("服务器端启动成功...");
//channelFuture.channel().close().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
//boss.shutdownGracefully();
//worker.shutdownGracefully();
}
}
}
客户端代码:
package com.test.netty.c6;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloClinet {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("发送数据...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ByteBuf byteBuf = ch.alloc().buffer();
byteBuf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(byteBuf);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
worker.shutdownGracefully();
}
}
}
黏包
上面的代码可以看到,客户端期望发送10次,每次发送16个字节。但是运行结果如下:
?客户端一次性接受到了160字节,这就是黏包现象。
半包
当设置了服务器端缓存小于16个字节: serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
运行结果:
?服务器端会多次接收,但是每次接收的大小都不是16个字节,这就是半包现象
二、出现的原理分析
黏包
发送abc def ,接收到abcdef
- 应用层设置的ByteBuf太大了(Netty默认1024)
- 滑动窗口,假设发送256字节数据,由于滑动窗口足够大,返回的不及时,一次发送的数据(256字节)会缓存到滑动窗口中,多次发送就会形成黏包
- Nagle算法,太小的数据发送一次影响效率,所以保存到缓冲区,缓冲区达到一定大小后在一起发送
半包
发送abcdef,接收到abc def
- 应用层的ByteBuf太小了
- 滑动窗口,滑动窗口的缓冲区小于发送的数据,导致发送的数据被分割了
- MSS限制(网卡),当发送数据超过MSS限制后,会将数据分割
发送黏包和半包的本质就是因为TCP是流式协议,消息无边界
三、解决方案
1、短连接
每次客户端发送数据后,都断开连接,这样服务器端就知道这次发送已经完成,这样就不会出现黏包现象,但是一次性发送数据量过大的话,就会出现半包现象
代码改进:
每次发送直接关闭channel即可。
2、定长解码器
其实就是客户端和服务器端约定一个数据大小长度,服务器端每次接收到固定长度的大小就知道是一次发送,但是这样会消耗带宽,例如发送一个字节的数据也需要占位符去占位到约定大小。
这种方式服务器端使用FixedLengthFrameDecoder 对接收到的数据进行固定长度的解码。
服务器端:
客户端:
?运行结果:
3、行解码器
第三种解决方案就是,客户端和服务器端约定一个关键字,遇到这个关键字就知道是一条数据。但是这种方式效率比较低,因为需要检查每个字符是不是关键字。
解码器:
- LineBasedFrameDecoder(int maxLength)? 使用"\n",分隔数据
- DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) 可以自定义分隔符
服务器改造:
客户端改造:
?
运行结果:
4、长度字段解码器
这种方式就是把消息体的长度,也当做数据进行传递,这样就告诉了服务器端消息体的长度,服务器可以根据长度去解析消息体,但是因为整个消息中包含了数据的长度内容,所以需要约定数据长度站多少字节数,同时在数据长度前后可能存在其他信息(例如版本号等等),所以就算不像前几个那样约定,也要约定怎么解析整个数据体,但是相对前几个效率已经有大大的提升了,并且消息整体携带的信息也有很大的提升。
LengthFieldBasedFrameDecoder的参数,一共5个:
- maxFrameLength 整个数据的最大长度,包含了全部信息
- lengthFieldOffset “长度”的偏移量(用于确定“长度”的起始位置)
- lengthFieldLength? “长度”所占用的字节数
- lengthAdjustment? “长度”标识和真正消息体的偏移量(用于确定“消息体”的起始位置)
- initialBytesToStrip? 读取数据的起始位置
图解:
集中情况的解析:
1、
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
没有其它数据,只有消息体和长度,其中长度占2个字节。
2、
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
没有其它数据,只有消息体和长度,其中长度占2个字节,但是读取从消息体开始读取,也就是2个字节之后开始读取(initialBytesToStrip =2)
3、
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
在“长度”前面,存在其它内容,长度是2字节(lengthFieldOffset =2),“长度”字段是3个字节,所以一共是17个字节
4、
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
在“长度”前面和后面都有一个字节的其他数据(lengthFieldOffset =1,lengthAdjustment?=1),从第三个字节开始读取(initialBytesToStrip=3),所以读到了“长度”后面的1个字节的其他消息
实例:
package com.test.netty.c7;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.nio.charset.StandardCharsets;
public class LengthFieldBasedFrameDecoderTest {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 1, 4, 1, 0),
new LoggingHandler(LogLevel.DEBUG)
);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
send(buf, "hello");
channel.writeInbound(buf);
buf.clear();
send(buf, "world");
channel.writeInbound(buf);
}
private static void send(ByteBuf buf, String msg){
int length = msg.length();
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
//长度前面的标识 1个字节
buf.writeByte(0xCA);
//长度
buf.writeInt(length);
//长度后面的标识 1个字节
buf.writeByte(0xFE);
//写消息体
buf.writeBytes(bytes);
}
}
运行结果:
?总结,在实际使用过程中,一定是最后一种?LengthFieldBasedFrameDecoder 运用更加广泛,不仅能动态的读取到消息体,同时也能携带更多的信息,但是还是需要约定相关内容,这个是必然的,我们平时运用的各种协议也都是基于协议的约定进行使用的。
|