前言
上一篇我们讲解了解码器的相关知识,其中也提到了编码器的定义。
Netty 源码分析系列(十二)Netty 解码器
编码器就是用来把出站数据从一种格式转换到另外一种格式,因此它实现了ChannelOutboundHandler ,类似于解码器,Netty 也提供了一组类来帮助开发者快速上手编码器,当然,这些类提供的是与解码器相反的方法,如下所示:
- 编码从消息到字节(
MessageToByteEncoder )。 - 编码从消息到消息(
MessageToMessageEncoder )。
MessageToByteEncoder 抽象类
在上一篇文章中,我们知道了如何使用ByteToMessageDecoder 来将字节转换成消息,现在可以使用MessageToByteEncoder 实现相反的效果。
MessageToByteEncoder 核心代码如下:
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean preferDirect;
protected MessageToByteEncoder() {
this(true);
}
protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
this(outboundMessageType, true);
}
protected MessageToByteEncoder(boolean preferDirect) {
this.matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}
protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
this.matcher = TypeParameterMatcher.get(outboundMessageType);
this.preferDirect = preferDirect;
}
public boolean acceptOutboundMessage(Object msg) throws Exception {
return this.matcher.match(msg);
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (this.acceptOutboundMessage(msg)) {
I cast = msg;
buf = this.allocateBuffer(ctx, msg, this.preferDirect);
try {
this.encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(msg);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException var17) {
throw var17;
} catch (Throwable var18) {
throw new EncoderException(var18);
} finally {
if (buf != null) {
buf.release();
}
}
}
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg, boolean preferDirect) throws Exception {
return preferDirect ? ctx.alloc().ioBuffer() : ctx.alloc().heapBuffer();
}
protected abstract void encode(ChannelHandlerContext var1, I var2, ByteBuf var3) throws Exception;
protected boolean isPreferDirect() {
return this.preferDirect;
}
}
在MessageToByteEncoder 抽象类中,唯一要关注的是encode 方法,该方法是开发者需要实现的唯一抽象方法。它与出站消息一起调用,将消息编码为ByteBuf ,然后,将ByteBuf 转发到ChannelPipeline 中的下一个ChannelOutboundHandler 。
以下是MessageToByteEncoder 的使用示例:
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
out.writeShort(msg);
}
}
上述示例中,ShortToByteEncoder 收到 Short 消息,编码它们,并把它们写入ByteBuf 。然后,将ByteBuf 转发到ChannelPipeline 中的下一个ChannelOutboundHandler ,每个 Short 将占有 ByteBuf 的两个字节。
实现ShortToByteEncoder 主要分为以下两步:
- 实现继承自
MessageToByteEncoder 。 - 写 Short 到 ByteBuf。
上述的例子处理流程图如下:
Netty 也提供了很多MessageToByteEncoder 类的子类来帮助开发者实现自己的编码器,例如:
MessageToMessageEncoder 抽象类
MessageToMessageEncoder 抽象类用于将出站数据从一种消息转换为另一种消息。
核心源码如下:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
protected MessageToMessageEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
}
protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
matcher = TypeParameterMatcher.get(outboundMessageType);
}
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
if (out != null) {
try {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
} finally {
out.recycle();
}
}
}
}
private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
final ChannelPromise voidPromise = ctx.voidPromise();
for (int i = 0; i < out.size(); i++) {
ctx.write(out.getUnsafe(i), voidPromise);
}
}
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < out.size(); i++) {
combiner.add(ctx.write(out.getUnsafe(i)));
}
combiner.finish(promise);
}
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
同MessageToByteEncoder 抽象类一样,MessageToMessageEncoder 唯一要关注的也是encode 方法,该方法是开发者需要实现的唯一抽象方法。对于使用write() 编写的每条消息,都会调用该消息,以将消息编码为一个或多个新的出站消息,然后将编码后的消息转发。
下面是使用 MessageToMessageEncoder 的一个例子:
public class IntegerToStringEncoder extends MessageToMessageEncoder <Integer> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
out.add(String.ValueOf(msg));
}
}
上述示例将 Integer 消息编码为 String 消息,主要分为两步:
- 实现继承自
MessageToMessageEncoder 。 - 将Integer 转为 String,并添加到 MessageBuf。
上述例子的处理流程如下图所示:
总结
通过上述文章的讲解,我们对于编码器和解码器应该都有了一定的认识,其实针对编码和解码,Netty 还提供了第三种方式,那就是编解码器。下节我们就来讲解一下。
结尾
我是一个正在被打击还在努力前进的码农。如果文章对你有帮助,记得点赞、关注哟,谢谢!
|