最近项目中需要对接上游系统,同步客户信息的变更,采用TCP协议通信,Netty提供了很好对于NIO处理的解决方案,因而采用Netty接收变更数据。
引入相关依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version>
</dependency>
增加相关配置:
socket:
port: 9000
netty:
bossThreadNum: 4
workThreadNum: 16
创建服务端接收变跟数据
@Value("${socket.port}")
private int port;
@Value("${socket.netty.bossThreadNum}")
private int bossThreadNum;
@Value("${socket.netty.workThreadNum}")
private int workThreadNum;
@PostConstruct
public void init() throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadNum);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(workThreadNum);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new CustomerServerHandler(dispatch));
}
})
.option(ChannelOption.SO_BACKLOG, DEFAULT_SO_BACKLOG)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
serverBootstrap.bind(new InetSocketAddress(port)).sync();
logger.info("Socket Server started, bind port: " + port);
}
完成具体处理的业务逻辑,自测正常,也符合预期结果能接收和响应变更数据,但是到了线上测试环境,发现收到的报文不完全,只有一部分,显然是出现了粘包拆包。
回过头看一下定义的报文规则:8位长度头(不足8位数字补0)+ 具体的报文内容,显然这里并不能直接使用String的编解码器,使用String的编解码器报文长度超过1024就被截断了。回头看看报文格式,提供了长度头,想到Netty提供的4种用来处理粘包拆包的方案中,刚好就有长度解码,可以考虑通过LengthFieldBasedFrameDecoder解码器自定义长度就可以解决粘包拆包的问题。
说干就干,修改编解码器:
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(4194304, 0, 8, 0, 8));
pipeline.addLast(new LengthFieldPrepender(8, true));
pipeline.addLast(new CustomerServerHandler(dispatch));
但是修改后连报文都解析不出来了,Netty一直报TooLongFrameException,一开始以为是接收报文给的长度不够,调整了长度大小后还是一样出现问题。 后面发现是自己忽略了一个点,Netty提供的LengthFieldBasedFrameDecoder解码器确实是解决了自定义长度TCP粘包拆包的问题,但是这个长度解码器,是以数值形式来解析的,而实际上这个场景中的报文长度是字符串,所以要先把长度以字符串形式截取8位下来,转成整形再具体接收数据。Netty是没有提供字符串长度解码器,所以要自定义一个解码器。
自定义字符串长度解码器:
public class MsgDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(MsgDecoder.class);
private static final int HEAD_LENGTH = 8;
@Override
protected void decode(ChannelHandlerContext context, ByteBuf byteBuf, List<Object> list) throws Exception {
if (byteBuf.readableBytes() < HEAD_LENGTH) {
return;
}
byteBuf.markReaderIndex();
byte[] headBytes = new byte[HEAD_LENGTH];
byteBuf.readBytes(headBytes);
String headStr = new String(headBytes, StandardCharsets.UTF_8);
logger.info("find head: {}", headStr);
int contentLength = Integer.parseInt(headStr);
if (byteBuf.readableBytes() < contentLength) {
byteBuf.resetReaderIndex();
return;
}
byte[] contentBytes = new byte[contentLength];
byteBuf.readBytes(contentBytes);
String contentStr = new String(contentBytes, StandardCharsets.UTF_8);
logger.info("esb receive msg: {}", contentStr);
list.add(contentStr);
}
}
将自定义的解码器加到管道中。
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MsgDecoder(macEnabled));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new EsbServerHandler(dispatch));
之后便能正常接收到完整的报文了。
通过实现了字符串长度解码器,解析报文头部中的长度,进而读取整个报文内容,解决了粘包拆包的问题。从另一个方面也表明,对于一些问题,还是要知其然再去处理,不然容易给自己挖坑。
|