1. 前言
前面分析了Consumer是如何发起远程服务调用的,最终DubboInvoker会利用ExchangeClient客户端发送网络请求。Dubbo会将网络请求封装为Request对象发送,但是网络传输的总是字节序列,Request对象必须经过编码才能被发送。同理,服务端在接收到客户端的请求后,也必须先解码才能得到Request对象,Response亦是如此。 ?
Dubbo网络通讯协议分为两部分,分别是Header和Body,Header部分采用Codec编解码,Body部分使用序列化。本篇文章会分析Dubbo对于消息的编解码和序列化的细节。
2. 编解码
Dubbo默认使用Netty作为网络传输层框架,因此我们也以Netty为例,分别从客户端编码和服务端解码两个视角去分析。
2.1 Encoder
消息的编码相对于解码来说要简单的多,因为不用考虑TCP粘包/拆包的问题。要想知道发送的Request对象经历了什么,我们首先要从NettyClient说起。 ?
使用Netty,作为开发者而言,最重要的就是设计ChannelHandler,编排ChannelHandlerPipeline。一般来说,入站的数据需要先解码,出站的数据最终需要编码,因此会在Pipeline的头部设置编解码处理器,Dubbo也正是这么处理的。
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
Dubbo在Pipeline的头部放了编码器InternalEncoder,它依赖于ExchangeCodec,所以我们直接看ExchangeCodec#encodeRequest() 。 编码分为两部分,根据协议设置Header,再根据序列化策略将Request里的Data序列化后写入Body。
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
byte[] header = new byte[HEADER_LENGTH];
Bytes.short2bytes(MAGIC, header);
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
Bytes.long2bytes(req.getId(), header, 4);
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header);
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
关于Dubbo网络通讯协议的部分,请查阅之前的文章。
这里需要注意一点,Header后4个字节记录的是BodyLength,服务端会基于此解决TCP粘包拆包的问题。
2.2 Decoder
解码比编码要复杂的多,因为要考虑TCP粘包拆包的场景。 ?
和编码一样,针对服务端的解码,我们要从NettyServer说起,下面是Dubbo Pipeline的设置。
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
可以发现,和客户端几乎如出一辙,唯一的不同是最后的Handler,一个用来处理客户端逻辑,一个用来处理服务端逻辑。 ?
Dubbo在Pipeline的头部放了解码器InternalDecoder,它继承自Netty提供的ByteToMessageDecoder抽象类。它依赖DubboCountCodec,本身不处理解码逻辑,就是个简单的循环,以便读取多条消息。
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
do {
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
}
}
DubboCountCodec也是个装饰者,它本身也不处理解码逻辑,它在DubboCodec的基础上增加了解码多条消息的能力,而且会把解码的消息数写入attachments。
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
Object obj = codec.decode(channel, buffer);
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
最终会调用ExchangeCodec#decode() 开始解码单条消息,首先会尝试读取Header,但是由于TCP拆包的问题,读取到的Header可能并不完整,后面会做判断。
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
接下来会对接收到的数据做判断,是否满足16字节,如果不满足,说明连最基本的Header都不完整,此时会返回NEED_MORE_INPUT ,表示暂不处理,等待对端发送更多的数据。
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
如果Header读取完毕,则开始解析BodyLength,判断Body是否读取完整,如果不完整照样没法处理,需要等待对端发送更多的数据。
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
到这里,说明一条完整的消息已经接收到了,可以调用decodeBody() 方法进行解码了。解码的方式也不复杂,客户端会按照协议格式写入数据,服务端按照相同的格式读取出来即可,最终的到Request对象。 这里有个点需要注意,Dubbo对于消息Body的反序列化是可以设置工作线程的,默认是在业务线程上进行,也可以通过参数decode.in.io 设置在IO线程上进行。
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
解码出来的Request后面会交给DecodeHandler处理,方法是received() ,如果前面IO线程没有反序列化,这里会利用业务线程反序列化,最终交给Handler处理。
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
如果是来自Consumer的RPC调用请求,解码后的结果就是RpcInvocation,最终会交给DubboProtocol里的ExchangeHandler处理,调用本地Invoker,响应结果,流程结束。
3. 序列化
Dubbo支持多种序列化策略,例如Java本身的序列化、hessian2、Kryo等,序列化策略也是通过SPI加载的,可以非常方便的更换。 Serialization是Dubbo对序列化的抽象接口,默认的序列化方案是Hessian2,我们以它为例分析。 序列化和反序列化其实就是把Java对象和字节序列相互转换的一个过程,Dubbo将这个过程也抽象成了两个接口,分别是ObjectOutput和ObjectInput,前者用于序列化,后者用于反序列化。
public class Hessian2Serialization implements Serialization {
@Override
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
return new Hessian2ObjectOutput(out);
}
@Override
public ObjectInput deserialize(URL url, InputStream is) throws IOException {
return new Hessian2ObjectInput(is);
}
}
Hessian2序列化,底层依赖Hessian2Output,它是hessian框架提供的序列化类,感兴趣的朋友可以去了解一下,不在本文的讨论范围之内。
4. 总结
网络传输的总是字节序列,无论是请求还是响应,发送方要编码,接收方要解码。Dubbo默认使用Netty作为网络传输层框架,实现消息编解码的方式是在Pipeline的头部设置编解码器,如此一来,对于出站数据,最终会经过Encoder编码,对于入站数据,首先要经过Decoder解码。 解码相较于编码要更加的复杂,因为它要处理TCP粘包/拆包的问题,Dubbo的解决方案是将BodyLength写入Header后4个字节,接受方首先要保证读取到一个完整的Header,然后提取出BodyLength,以此判断接收到的消息是否完整。如果不完整,会返回NEED_MORE_INPUT 代表需要等待对端传输更多的数据。
|