1. Why TCP sticky packets occur
TCP is a connection-oriented, stream-oriented protocol that provides high reliability. Because TCP transmits data as a byte stream with no built-in message boundaries, the sender may merge several small writes into a single segment to improve transmission efficiency (mainly due to the Nagle algorithm). This creates a new problem: the receiver cannot tell where one original message ends and the next begins. This is the "sticky packet" problem.
Explanation (the four rows refer to the diagram above):
- Row 1: data1 and data2 arrive as two independent packets; no sticking occurs.
- Row 2: data1 and data2 arrive stuck together in one packet; sticking occurs.
- Row 3: data1 is split in half; the first half arrives as an independent packet, while the second half arrives stuck together with data2.
- Row 4: data2 is split in half, analogous to row 3; sticking occurs here as well.
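The effect of row 2 can be shown without real sockets. The toy class below (hypothetical name, for illustration only) simulates the byte stream the receiver sees: two separate writes simply concatenate, and the boundary between them is gone.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Toy simulation (no real sockets): TCP is a byte stream, so two
// separate writes can arrive as a single read, and the boundary
// between them is lost.
public class StickyPacketDemo {
    public static String bytesSeenByReceiver(String... writes) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (String w : writes) {
            byte[] b = w.getBytes(StandardCharsets.UTF_8);
            wire.write(b, 0, b.length); // each write just appends to the stream
        }
        // the receiver sees one undifferentiated run of bytes
        return new String(wire.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Feeding it "data1" and "data2" yields the single string "data1data2": without extra framing information there is no way to split it back into the original two messages.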
1.1 Reproducing the phenomenon
```java
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

class TestTCpZhan {

    public synchronized void startServer() {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                                            System.out.println("server received: " + byteBuf.toString(StandardCharsets.UTF_8));
                                            ctx.writeAndFlush(Unpooled.copiedBuffer(UUID.randomUUID().toString(), StandardCharsets.UTF_8));
                                        }

                                        @Override
                                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                            cause.printStackTrace();
                                            ctx.close();
                                        }
                                    });
                        }
                    });
            // PrintUtil is a simple logging helper from the original project
            PrintUtil.printInfo("server is started");
            ChannelFuture future = serverBootstrap.bind(7777).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public void startClient() {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                                            System.out.println("client received: " + byteBuf.toString(StandardCharsets.UTF_8));
                                        }

                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            // Ten small writes in a row: TCP may merge them into fewer segments.
                                            for (int i = 0; i < 10; i++) {
                                                ctx.writeAndFlush(Unpooled.copiedBuffer("hello:server" + i, StandardCharsets.UTF_8));
                                            }
                                        }

                                        @Override
                                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                            cause.printStackTrace();
                                            ctx.close();
                                        }
                                    });
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(7777)).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
```
- Result
According to the logic above, each of the ten sends should produce one response. But in an actual run the ten writes arrived at the server as only two reads, and the client received only two responses: multiple messages were merged together. Sticky packets occurred.
2. Solution approach
- Use a custom protocol together with an encoder/decoder (codec).
- Fix how many bytes each read consumes, e.g. via a length prefix, so the receiver can split the stream correctly and sticking is eliminated.
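The second idea is usually implemented as a length prefix: every message is written as a 4-byte length followed by the payload. A minimal plain-Java sketch of the idea (hypothetical class name, no Netty dependency):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Length-prefixed framing: each message is written as a 4-byte length
// followed by the payload, so the receiver always knows where one
// message ends and the next begins, even when TCP merges several writes.
public class LengthPrefixFraming {

    // Encode a payload as [4-byte length][payload].
    public static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Split a byte stream (possibly several frames merged by TCP)
    // back into the original payloads.
    public static List<byte[]> deframe(byte[] stream) {
        List<byte[]> messages = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 4) {
            int length = buf.getInt();
            if (buf.remaining() < length) break; // incomplete frame: wait for more data
            byte[] payload = new byte[length];
            buf.get(payload);
            messages.add(payload);
        }
        return messages;
    }
}
```

Even if `frame("data1")` and `frame("data2")` arrive stuck together in one read, `deframe` recovers the two original messages, because the length header marks each boundary. The Netty codec in the next section applies exactly this scheme.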
3. Solving it with Netty
```java
// Custom message protocol: a 4-byte length header followed by the payload.
class MyMessageProtocol {
    private int length;      // number of bytes in content
    private byte[] content;

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    // Used by the log statements in the handlers below; without it,
    // printing a message would only show the object's identity hash.
    @Override
    public String toString() {
        return "MyMessageProtocol{length=" + length + ", content="
                + new String(content, java.nio.charset.StandardCharsets.UTF_8) + "}";
    }
}
```
Note: the code below contains both the client and the server; I have put them in one class. The codec is added on the server side, and comments are included in the code.
```java
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageCodec;

class NettyResolveTcpZhan {

    public synchronized void startServer() {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    // Codec: translates between MyMessageProtocol and the
                                    // on-the-wire format [4-byte length][content].
                                    .addLast(new ByteToMessageCodec<MyMessageProtocol>() {
                                        @Override
                                        protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
                                            PrintUtil.printInfo("ByteToMessageCodec#encode: " + msg);
                                            out.writeInt(msg.getLength());
                                            out.writeBytes(msg.getContent());
                                        }

                                        @Override
                                        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
                                            // Wait until the 4-byte length header has fully arrived.
                                            if (in.readableBytes() < 4) {
                                                return;
                                            }
                                            in.markReaderIndex();
                                            int length = in.readInt();
                                            // Half a message arrived (rows 3/4 above): reset the
                                            // reader index and wait for the rest instead of
                                            // reading an incomplete payload.
                                            if (in.readableBytes() < length) {
                                                in.resetReaderIndex();
                                                return;
                                            }
                                            PrintUtil.printInfo("ByteToMessageCodec#decode: " + length);
                                            byte[] content = new byte[length];
                                            in.readBytes(content);
                                            MyMessageProtocol myMessageProtocol = new MyMessageProtocol();
                                            myMessageProtocol.setLength(length);
                                            myMessageProtocol.setContent(content);
                                            out.add(myMessageProtocol);
                                        }
                                    })
                                    .addLast(new SimpleChannelInboundHandler<MyMessageProtocol>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
                                            System.out.println("server received: " + msg);
                                            byte[] responseBytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
                                            MyMessageProtocol response = new MyMessageProtocol();
                                            response.setContent(responseBytes);
                                            response.setLength(responseBytes.length);
                                            // The codec above encodes this into [length][content].
                                            ctx.writeAndFlush(response);
                                        }

                                        @Override
                                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                            cause.printStackTrace();
                                            ctx.close();
                                        }
                                    });
                        }
                    });
            PrintUtil.printInfo("server is started");
            ChannelFuture future = serverBootstrap.bind(7777).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public void startClient() {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                                            // The server's encoder also writes a length header, so read
                                            // it first. (A robust client would install the same codec;
                                            // this simple read assumes one complete frame per read.)
                                            int length = byteBuf.readInt();
                                            byte[] content = new byte[length];
                                            byteBuf.readBytes(content);
                                            System.out.println("client received: " + new String(content, StandardCharsets.UTF_8));
                                        }

                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            for (int i = 0; i < 10; i++) {
                                                byte[] messageBytes = ("hello:server" + i).getBytes(StandardCharsets.UTF_8);
                                                // Write each message in the agreed format: [length][content].
                                                ByteBuf buffer = Unpooled.buffer();
                                                buffer.writeInt(messageBytes.length);
                                                buffer.writeBytes(messageBytes);
                                                ctx.writeAndFlush(buffer);
                                            }
                                        }

                                        @Override
                                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                            cause.printStackTrace();
                                            ctx.close();
                                        }
                                    });
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(7777)).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
```
The overall solution is to define a custom protocol that tells Netty how long each message is; the rest of the work happens in the encoder/decoder.
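Netty also ships ready-made framing handlers, LengthFieldBasedFrameDecoder and LengthFieldPrepender, which implement exactly this length-prefix scheme, so in practice the hand-written codec above can often be replaced by them. The heart of any such decoder is the same accumulate-and-emit loop; a plain-Java sketch of that loop (hypothetical class name, no Netty dependency):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the accumulate-and-emit loop at the heart of a length-prefix
// decoder such as Netty's LengthFieldBasedFrameDecoder: bytes arrive in
// arbitrary chunks, and a frame is emitted only once its 4-byte length
// header and full payload have both arrived.
public class FrameAccumulator {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feed one received chunk; returns every frame completed by this chunk.
    public List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (buf.remaining() < length) {
                buf.reset();            // header seen, payload incomplete: wait
                break;
            }
            byte[] frame = new byte[length];
            buf.get(frame);
            frames.add(frame);
        }
        // Keep only the unconsumed tail for the next call.
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return frames;
    }
}
```

Note how a frame split across two reads (rows 3/4 of the diagram) produces nothing on the first call and the complete message on the second, which mirrors the markReaderIndex/resetReaderIndex logic in the codec above.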
- Result
As the run output shows, the earlier problem no longer occurs: each message is received intact, one response per send. The TCP sticky-packet problem is solved.
Supplement
The counterpart of sticky packets is "unpacking": splitting the stuck-together bytes back into individual messages. The approach above uses a custom protocol to handle sticky packets, which is precisely correct unpacking.