主要依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
粘包半包解决
粘包半包问题:这里使用LengthFieldBasedFrameDecoder作为解决方案。在发送消息前,先约定用定长字节表示接下来数据的长度。LengthFieldBasedFrameDecoder()构造方法参数:
- maxFrameLength:数据帧最大长度
- lengthFieldOffset:长度字段偏移量
- lengthFieldLength:长度字段长度(字节)
- lengthAdjustment:长度修正值(会加到长度字段的值上),例如长度字段之后还要再跳过几个字节才是正文内容
- initialBytesToStrip:从帧首去除字节数
序列化(二进制序列化)
java序列化
注意事项: 序列化对象需要实现Serializable接口(java.io.Serializable) 代码:
/**
 * Codec that converts between {@link Message} objects and length-prefixed
 * frames using JDK native serialization.
 *
 * <p>Wire format: a 4-byte length (written with {@code ByteBuf.writeInt})
 * followed by exactly that many bytes of serialized object data.
 *
 * <p>{@link #decode} expects one complete frame per call, so a frame decoder
 * such as {@code LengthFieldBasedFrameDecoder} must sit in front of this codec.
 */
public class MessageCodecJava extends MessageToMessageCodec<ByteBuf, Message> {

    /** Size in bytes of the length prefix written before each payload. */
    private static final int LENGTH_FIELD_BYTES = Integer.BYTES;

    /**
     * Serializes {@code msg} and emits one buffer containing the length prefix
     * and the serialized bytes.
     *
     * @param ctx     context used to allocate the outgoing buffer
     * @param msg     message to serialize; must be {@code Serializable}
     * @param outList receives the single encoded buffer
     * @throws Exception if serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        // Serialize BEFORE allocating the ByteBuf: if writeObject throws, no
        // buffer has been allocated yet, so nothing can leak.
        byte[] bytes;
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(msg);
            oos.flush();
            bytes = bos.toByteArray();
        }

        ByteBuf out = ctx.alloc().buffer(LENGTH_FIELD_BYTES + bytes.length);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
        outList.add(out);
    }

    /**
     * Reads the length prefix and deserializes the payload that follows.
     *
     * @param ctx context of the channel the frame arrived on
     * @param in  one complete frame, starting at the length prefix
     * @param out receives the decoded {@link Message}
     * @throws IllegalStateException if the length prefix is negative or larger
     *                               than the bytes actually available
     * @throws Exception             if deserialization fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int length = in.readInt();
        // Validate before allocating so a corrupt prefix cannot trigger a huge
        // or negative-sized array allocation.
        if (length < 0 || length > in.readableBytes()) {
            throw new IllegalStateException(
                    "Invalid payload length " + length + ", readable bytes: " + in.readableBytes());
        }
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // NOTE(security): JDK native deserialization of data from the network is
        // unsafe for untrusted peers; restrict with an ObjectInputFilter or use
        // a data-only format before using this outside a demo.
        Message message;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            message = (Message) ois.readObject();
        }
        System.out.println(message);
        out.add(message);
    }
}
fastjson序列化
注意事项: 不需要实现Serializable接口(java.io.Serializable) 代码:
/**
 * Codec that converts between {@link Message} objects and length-prefixed
 * UTF-8 JSON frames using fastjson.
 *
 * <p>Wire format: a 4-byte length (written with {@code ByteBuf.writeInt})
 * followed by that many bytes of JSON. The JSON is produced with
 * {@code SerializerFeature.WriteClassName}, so it carries the concrete class
 * name of the message.
 *
 * <p>{@link #decode} expects one complete frame per call, so a frame decoder
 * such as {@code LengthFieldBasedFrameDecoder} must sit in front of this codec.
 */
public class MessageCodecJson extends MessageToMessageCodec<ByteBuf, Message> {

    /** Size in bytes of the length prefix written before each payload. */
    private static final int LENGTH_FIELD_BYTES = Integer.BYTES;

    /**
     * Serializes {@code msg} to JSON and emits one buffer containing the
     * length prefix and the JSON bytes.
     *
     * @param ctx  context used to allocate the outgoing buffer
     * @param msg  message to serialize
     * @param list receives the single encoded buffer
     * @throws Exception if JSON serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {
        // Serialize BEFORE allocating the ByteBuf: if fastjson throws, no
        // buffer has been allocated yet, so nothing can leak.
        String json = JSON.toJSONString(msg, SerializerFeature.WriteClassName);
        byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);

        ByteBuf out = ctx.alloc().buffer(LENGTH_FIELD_BYTES + jsonBytes.length);
        out.writeInt(jsonBytes.length);
        out.writeBytes(jsonBytes);
        list.add(out);
    }

    /**
     * Reads the length prefix and parses the JSON payload that follows.
     *
     * @param ctx  context of the channel the frame arrived on
     * @param in   one complete frame, starting at the length prefix
     * @param list receives the decoded {@link Message}
     * @throws IllegalStateException if the length prefix is negative or larger
     *                               than the bytes actually available
     * @throws Exception             if JSON parsing fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {
        int length = in.readInt();
        // Validate before allocating so a corrupt prefix cannot trigger a huge
        // or negative-sized array allocation.
        if (length < 0 || length > in.readableBytes()) {
            throw new IllegalStateException(
                    "Invalid payload length " + length + ", readable bytes: " + in.readableBytes());
        }
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // NOTE(review): the payload names its own class via "@type"; keep
        // fastjson patched and autoType restricted when peers are untrusted.
        Message msg = JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Message.class);
        System.out.println(msg);
        list.add(msg);
    }
}
粘包半包序列化解决举例:
客户端代码:
package com.haust.blog;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Entry point of the demo client: connects to {@code localhost:7000} and
 * blocks until the connection is closed.
 */
public class MyClient {

    /**
     * Starts the client and waits for the channel to close.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());

            // Block until connected, then until the channel is closed.
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
服务器端代码:
package com.haust.blog;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Entry point of the demo server: binds port 7000 and blocks until the
 * server channel is closed.
 */
public class MyServer {

    /**
     * Starts the server and waits for the server channel to close.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());

            // Block until bound, then until the server channel is closed.
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
POJO类 为提高编解码的扩展性,我们单独定义一个父类,然后让其他POJO类继承此父类
package com.haust.blog.pojo;
import java.io.Serializable;
/**
 * Common base type for every message exchanged in this example.
 *
 * <p>It declares no state of its own; it implements {@link Serializable} so
 * that subclasses are serializable without declaring the interface themselves.
 */
public class Message
        implements Serializable {
    // Intentionally empty: subclasses carry the payload fields.
}
package com.haust.blog.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Sample message carrying an id and a name.
 *
 * <p>Constructors, accessors, {@code equals}/{@code hashCode} and
 * {@code toString} are generated by the Lombok annotations below.
 */
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Data
public class JsonBeanMessage extends Message {

    /** Numeric identifier of the message. */
    private int id;

    /** Name carried by the message. */
    private String name;
}
package com.haust.blog.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Sample message carrying an id, a name and a sex field.
 *
 * <p>Constructors, accessors, {@code equals}/{@code hashCode} and
 * {@code toString} are generated by the Lombok annotations below.
 */
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Data
public class BinaryBeanMessage extends Message {

    /** Numeric identifier of the message. */
    private int id;

    /** Name carried by the message. */
    private String name;

    /** Sex carried by the message, as free text. */
    private String sex;
}
客户端初始化类
package com.haust.blog;
import com.haust.blog.handler.MyClientHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder
(1024,0,4,0,0));
pipeline.addLast(new MessageCodecJson());
pipeline.addLast(new MyClientHandler());
}
}
服务器端初始化类
package com.haust.blog;
import com.haust.blog.handler.MyServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder
(1024,0,4,0,0));
pipeline.addLast(new MessageCodecJson());
pipeline.addLast(new MyServerHandler());
}
}
客户端处理类
package com.haust.blog.handler;
import com.haust.blog.pojo.BinaryBeanMessage;
import com.haust.blog.pojo.JsonBeanMessage;
import com.haust.blog.pojo.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side business handler: sends one {@link JsonBeanMessage} when the
 * connection becomes active and prints every {@link Message} received.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * Prints a message received from the server.
     *
     * @param channelHandlerContext context of the channel the message came from
     * @param message               decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
        System.out.println("客户端收到了服务器发来的数据:" + message);
    }

    /**
     * Sends the greeting message as soon as the channel is active.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        JsonBeanMessage message = new JsonBeanMessage(1, "李四");
        ctx.writeAndFlush(message);
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the full trace: getMessage() alone drops the exception type
        // and may be null.
        cause.printStackTrace(System.out);
        ctx.close();
    }
}
服务器端处理类
package com.haust.blog.handler;
import com.haust.blog.pojo.BinaryBeanMessage;
import com.haust.blog.pojo.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side business handler: sends one {@link BinaryBeanMessage} when a
 * connection becomes active and prints every {@link Message} received.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * Prints a message received from the client.
     *
     * @param channelHandlerContext context of the channel the message came from
     * @param message               decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
        System.out.println("服务器收到了客户端发来的数据:" + message);
    }

    /**
     * Sends the greeting message as soon as the channel is active.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        BinaryBeanMessage message = new BinaryBeanMessage(1, "张三", "男");
        ctx.writeAndFlush(message);
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the full trace: getMessage() alone drops the exception type
        // and may be null.
        cause.printStackTrace(System.out);
        ctx.close();
    }
}
java序列化类
package com.haust.blog;
import com.haust.blog.pojo.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
 * Codec that converts between {@link Message} objects and length-prefixed
 * frames using JDK native serialization.
 *
 * <p>Wire format: a 4-byte length (written with {@code ByteBuf.writeInt})
 * followed by exactly that many bytes of serialized object data.
 *
 * <p>The codec keeps no per-connection state. {@link #decode} expects one
 * complete frame per call, so a frame decoder such as
 * {@code LengthFieldBasedFrameDecoder} must sit in front of it.
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecJava extends MessageToMessageCodec<ByteBuf, Message> {

    /** Size in bytes of the length prefix written before each payload. */
    private static final int LENGTH_FIELD_BYTES = Integer.BYTES;

    /**
     * Serializes {@code msg} and emits one buffer containing the length prefix
     * and the serialized bytes.
     *
     * @param ctx     context used to allocate the outgoing buffer
     * @param msg     message to serialize; must be {@code Serializable}
     * @param outList receives the single encoded buffer
     * @throws Exception if serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        // Serialize BEFORE allocating the ByteBuf: if writeObject throws, no
        // buffer has been allocated yet, so nothing can leak.
        byte[] bytes;
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(msg);
            oos.flush();
            bytes = bos.toByteArray();
        }

        ByteBuf out = ctx.alloc().buffer(LENGTH_FIELD_BYTES + bytes.length);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
        outList.add(out);
    }

    /**
     * Reads the length prefix and deserializes the payload that follows.
     *
     * @param ctx context of the channel the frame arrived on
     * @param in  one complete frame, starting at the length prefix
     * @param out receives the decoded {@link Message}
     * @throws IllegalStateException if the length prefix is negative or larger
     *                               than the bytes actually available
     * @throws Exception             if deserialization fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int length = in.readInt();
        // Validate before allocating so a corrupt prefix cannot trigger a huge
        // or negative-sized array allocation.
        if (length < 0 || length > in.readableBytes()) {
            throw new IllegalStateException(
                    "Invalid payload length " + length + ", readable bytes: " + in.readableBytes());
        }
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // NOTE(security): JDK native deserialization of data from the network is
        // unsafe for untrusted peers; restrict with an ObjectInputFilter or use
        // a data-only format before using this outside a demo.
        Message message;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            message = (Message) ois.readObject();
        }
        System.out.println(message);
        out.add(message);
    }
}
fastjson序列化类
package com.haust.blog;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.haust.blog.pojo.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Codec that converts between {@link Message} objects and length-prefixed
 * UTF-8 JSON frames using fastjson.
 *
 * <p>Wire format: a 4-byte length (written with {@code ByteBuf.writeInt})
 * followed by that many bytes of JSON. The JSON is produced with
 * {@code SerializerFeature.WriteClassName}, so it carries the concrete class
 * name of the message.
 *
 * <p>The codec keeps no per-connection state. {@link #decode} expects one
 * complete frame per call, so a frame decoder such as
 * {@code LengthFieldBasedFrameDecoder} must sit in front of it.
 */
@ChannelHandler.Sharable
public class MessageCodecJson extends MessageToMessageCodec<ByteBuf, Message> {

    /** Size in bytes of the length prefix written before each payload. */
    private static final int LENGTH_FIELD_BYTES = Integer.BYTES;

    /**
     * Serializes {@code msg} to JSON and emits one buffer containing the
     * length prefix and the JSON bytes.
     *
     * @param ctx  context used to allocate the outgoing buffer
     * @param msg  message to serialize
     * @param list receives the single encoded buffer
     * @throws Exception if JSON serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {
        // Serialize BEFORE allocating the ByteBuf: if fastjson throws, no
        // buffer has been allocated yet, so nothing can leak.
        String json = JSON.toJSONString(msg, SerializerFeature.WriteClassName);
        byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);

        ByteBuf out = ctx.alloc().buffer(LENGTH_FIELD_BYTES + jsonBytes.length);
        out.writeInt(jsonBytes.length);
        out.writeBytes(jsonBytes);
        list.add(out);
    }

    /**
     * Reads the length prefix and parses the JSON payload that follows.
     *
     * @param ctx  context of the channel the frame arrived on
     * @param in   one complete frame, starting at the length prefix
     * @param list receives the decoded {@link Message}
     * @throws IllegalStateException if the length prefix is negative or larger
     *                               than the bytes actually available
     * @throws Exception             if JSON parsing fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {
        int length = in.readInt();
        // Validate before allocating so a corrupt prefix cannot trigger a huge
        // or negative-sized array allocation.
        if (length < 0 || length > in.readableBytes()) {
            throw new IllegalStateException(
                    "Invalid payload length " + length + ", readable bytes: " + in.readableBytes());
        }
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // NOTE(review): the payload names its own class via "@type"; keep
        // fastjson patched and autoType restricted when peers are untrusted.
        Message msg = JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Message.class);
        System.out.println(msg);
        list.add(msg);
    }
}
包结构: 结果:
总结:
- 如何解决粘包半包?思路:自定义协议,在对内容进行编码时,将内容的长度也作为信息发送过去,使解码时可以确定信息边界正确解码,从而避免粘包半包。
- 在已经确定好编解码规则(自定义协议)的情况下,为何还会需要LengthFieldBasedFrameDecoder类解决粘包半包问题?当发送数据过大时,导致一组消息多次发送(一个完整的消息,经过发送多次才发送结束),若直接按照解码规则进行解码就会导致半包问题出现,若我们使用LengthFieldBasedFrameDecoder在解码前进行预处理,LengthFieldBasedFrameDecoder会等到接收到完整的消息之后才会传给自定义decoder进行解码,从而避免了半包问题等。
代码举例:
package com.haust.blog;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.haust.blog.pojo.JsonBeanMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import java.nio.charset.StandardCharsets;
/**
 * Half-packet demonstration on an {@code EmbeddedChannel}: one encoded frame
 * is fed to the inbound side in two pieces to show that the frame decoder
 * holds the first piece back until the rest has arrived.
 */
public class test {

    /**
     * Runs the demonstration; the pipeline's logging handler and the codec
     * print what happens.
     *
     * @param args unused
     * @throws Exception if the pipeline fails
     */
    public static void main(String[] args) throws Exception {
        LoggingHandler logging = new LoggingHandler();
        LengthFieldBasedFrameDecoder frameDecoder =
                new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 0);
        EmbeddedChannel channel = new EmbeddedChannel(logging, frameDecoder, new MessageCodecJson());

        // Outbound direction: run the message through the encoder once.
        JsonBeanMessage json = new JsonBeanMessage(1, "张三");
        channel.writeOutbound(json);

        // Build the same frame by hand: 4-byte length followed by the JSON bytes.
        String text = JSON.toJSONString(json, SerializerFeature.WriteClassName);
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        ByteBuf frame = ByteBufAllocator.DEFAULT.buffer();
        frame.writeInt(payload.length);
        frame.writeBytes(payload);

        // Cut the frame after 50 bytes to simulate a half packet.
        final int splitAt = 50;
        ByteBuf head = frame.slice(0, splitAt);
        ByteBuf tail = frame.slice(splitAt, frame.readableBytes() - splitAt);

        // slice() does not bump the reference count; retain once more,
        // presumably so the shared buffer is still alive when the second
        // piece is written — TODO confirm against Netty's release rules.
        head.retain();
        channel.writeInbound(head);
        channel.writeInbound(tail);
    }
}
结果:传过来74B信息,第一次只传过来50B信息,此时LengthFieldBasedFrameDecoder进行拦截(若此时直接将50B信息传给解码器就会出现半包),直到另外24B信息到来(此时信息已经完整),LengthFieldBasedFrameDecoder才将完整消息传给decoder(自定义)进行解码操作
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 74B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 46 7b 22 40 74 79 70 65 22 3a 22 63 6f |...F{"@type":"co|
|00000010| 6d 2e 68 61 75 73 74 2e 62 6c 6f 67 2e 70 6f 6a |m.haust.blog.poj|
|00000020| 6f 2e 4a 73 6f 6e 42 65 61 6e 4d 65 73 73 61 67 |o.JsonBeanMessag|
|00000030| 65 22 2c 22 69 64 22 3a 31 2c 22 6e 61 6d 65 22 |e","id":1,"name"|
|00000040| 3a 22 e5 bc a0 e4 b8 89 22 7d |:"......"} |
+--------+-------------------------------------------------+----------------+
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 50B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 46 7b 22 40 74 79 70 65 22 3a 22 63 6f |...F{"@type":"co|
|00000010| 6d 2e 68 61 75 73 74 2e 62 6c 6f 67 2e 70 6f 6a |m.haust.blog.poj|
|00000020| 6f 2e 4a 73 6f 6e 42 65 61 6e 4d 65 73 73 61 67 |o.JsonBeanMessag|
|00000030| 65 22 |e" |
+--------+-------------------------------------------------+----------------+
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 24B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2c 22 69 64 22 3a 31 2c 22 6e 61 6d 65 22 3a 22 |,"id":1,"name":"|
|00000010| e5 bc a0 e4 b8 89 22 7d |......"} |
+--------+-------------------------------------------------+----------------+
JsonBeanMessage(id=1, name=张三)
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
Process finished with exit code 0
-
自定义编解码器若要添加@ChannelHandler.Sharable注解,需要继承MessageToMessageCodec<ByteBuf, Message>(ByteToMessageCodec及其子类不允许标注@Sharable)。其中泛型参数Message表示:当出站消息是Message及其子类时,才会进入此handler进行编码处理 -
为什么需要定义父类Message,然后其他POJO类继承?这样提高了可扩展性:如果父类Message实现了序列化接口,子类就无需再显式地实现序列化接口。其次,在编解码(handler)处理时指定了泛型,那么泛型类型及其子类均可以进入handler进行处理。
|