ChannelHandlerContext
? ChannelHandlerContext 代表了一个 ChannelHandler 和 ChannelPipeline 之间的关系,Netty会把 ChannelHandler 包装进 ChannelHandlerContext 的实例 DefaultChannelHandlerContext, 然后把 ChannelHandlerContext 作为元素来组成链表。所以 ChannelHandler 中一定有获得ChannelHandler 和 ChannelPipeline 相关的方法。
? ChannelHandlerContext主要功能是管理在同一ChannelPipeline中各个ChannelHandler的交互,所以同时ChannelHandlerContext也有很多触发事件传播相关的方法。实际上ctx也只是一个接口,里面除了获取ChannelHandler和ChannelPipeline的方法,还有fire开头的方法,表示向后传递事件。而我们要看具体实现就得找到子类,他主要有三个子类需要看:
AbstractChannelHandlerContext
? 上面的三个子类都是继承这个AbstractChannelHandlerContext抽象类,这个抽象类中的方法几乎都与网络通信相关,而且都是一个套路,比如fireChannelRead方法,这是个inbound类型的方法,当调用 ctx.fireChannelRead 方法时,首先找到链表中下一个inbound 类型的 handler,然后进行线程本地性判断,是则直接执行,不是则投入线程的队列异步执行。
HeadContext 和 TailContext
? 前面在DefaultChannelPipeline的构造函数中
? HeadContext和TailContext是defaultChannelPipeline是在创建的时候被加入Pipeline的,而且我们加入的业务handler是加入到head和tail之间,也就是HeadContext–>业务Context–>TailContext。
HeadContext
? 查看源码可以发现,HeadContext既是ChannelHandlerContext,又是ChannelHandler,而且是同时处理inbound类型和outbound类型的ChannelHandler。
? 在具体的方法上,outbound 类型事件的方法都是直接调用 unsafe 中的相关方法,而inbound 类型事件的方法则会触发事件在 ChannelPipeline 中的传播(也就是调用pipeline的firexxxx方法)。
TailContext
? 可以看见,TailContext既是ChannelHandlerContext,又是ChannelHandler,而且是处理inbound类型ChannelHandler。
? 在TailContext其实大部分的方法都是空实现,我们需要关注的是资源释放这个点,我们在前面有说过,如果在自定义的业务Handler中重写了方法对数据进行了处理,要么就手动释放资源,要么就向后传递。手动释放就没啥好说的,调用ReferenceCountUtil.release方法就可以了,但是继续向后传递,数据是如何得到释放的呢?因为Tail是最后一个handlerContext,所以肯定是在这个对象里面完成操作的,就拿ChannelRead方法来看,它里面是调用了onUnhandledInboundMessage(msg)方法,那就继续进入到这个方法:
? 可以发现在这个onUnhandledInboundMessage方法里面调用了ReferenceCountUtil.release(msg)来释放资源,也就证实了之前说的结论:要么手动释放,要么向后传递,netty在最后会进行资源的释放工作。
ChannelHandler
? ChannelHandler类似于Servlet的Filter过滤器,负责对I/O事件或者IO操作进行拦截和处理,它可以选择性的拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递。
? 一般我们使用Netty主要也是基于ChannelHandler进行业务逻辑定制,例如消息编解码、打印日志、解决半包问题等。
? 在ChannelHandler中有个常用的注解:@Sharable,加上此注解可以使多个Pipeline共同使用一个ChannelHandler。
? 对于出站和入站两种Handler他们各有一个类关系图:
ChannelHandlerAdapter
? 对于大多数的 ChannelHandler 会选择性地拦截和处理某个或者某些事件,其他的事件会忽略,由下一个 ChannelHandler 进行拦截和处理。这就会导致一个问题:用户 ChannelHandler 必须要实现 ChannelHandler 的所有接口,包括它不关心的那些事件处理接口,这会导致用户代码的冗余和臃肿,代码的可维护性也会变差。
? 为了解决这个问题,Netty 提供了 ChannelHandlerAdapter 基类,它的实现要么就是事件透传,要么就是空实现,如果用户 ChannelHandler 关心某个事件,只需要覆盖ChannelHandlerAdapter 对应的方法即可,对于不关心的,可以直接继承使用父类的方法,这样子类的代码就会非常简洁和清晰。
ChannelHandler 子类
? 相对于Channel和ByteBuf等其它的组件,ChannelHandler的类继承关系稍微简单些,但是它的子类非常多。由于ChannelHandler是Netty框架和用户代码的主要扩展和定制点,所以它的子类种类繁多,功能各异。所以我们挑两个比较常用的子类----ByteToMessageDecoder和 MessageToByteEncoder来看看里面关键的方法。
ByteToMessageDecoder
? ByteToMessageDecoder 用于将 ByteBuf 转换成 Message,这个message可以是我们自定义的POJO类型,转换后继续在ChannelPipeline中往后面的入站Handler传递,ByteToMessageDecoder 继承自ChannelInboundHandlerAdapter,需要开发者实现的是 decode 方法。decode(ChannelHandlerContext ctx, ByteBuf in, List out) 方法里,in 是用来读取的数据,out 是我们转换后的对象列表。这个抽象类中的关键方法我们主要看channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
boolean var10 = false;
try {
var10 = true;
ByteBuf data = (ByteBuf)msg;
this.first = this.cumulation == null;
if (this.first) {
this.cumulation = data;
} else {
this.cumulation = this.cumulator.cumulate(ctx.alloc(), this.cumulation, data);
}
this.callDecode(ctx, this.cumulation, out);
var10 = false;
} catch (DecoderException var11) {
throw var11;
} catch (Exception var12) {
throw new DecoderException(var12);
} finally {
if (var10) {
if (this.cumulation != null && !this.cumulation.isReadable()) {
this.numReads = 0;
this.cumulation.release();
this.cumulation = null;
} else if (++this.numReads >= this.discardAfterReads) {
this.numReads = 0;
this.discardSomeReadBytes();
}
int size = out.size();
this.decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
}
if (this.cumulation != null && !this.cumulation.isReadable()) {
this.numReads = 0;
this.cumulation.release();
this.cumulation = null;
} else if (++this.numReads >= this.discardAfterReads) {
this.numReads = 0;
this.discardSomeReadBytes();
}
int size = out.size();
this.decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
} else {
ctx.fireChannelRead(msg);
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while(true) {
if (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
return;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
this.decodeRemovalReentryProtection(ctx, in, out);
if (!ctx.isRemoved()) {
if (outSize == out.size()) {
if (oldInputLength != in.readableBytes()) {
continue;
}
} else {
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
}
if (!this.isSingleDecode()) {
continue;
}
}
}
}
return;
}
} catch (DecoderException var6) {
throw var6;
} catch (Exception var7) {
throw new DecoderException(var7);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
this.decodeState = 1;
boolean var8 = false;
try {
var8 = true;
this.decode(ctx, in, out);
var8 = false;
} finally {
if (var8) {
boolean removePending = this.decodeState == 2;
this.decodeState = 0;
if (removePending) {
this.handlerRemoved(ctx);
}
}
}
boolean removePending = this.decodeState == 2;
this.decodeState = 0;
if (removePending) {
this.handlerRemoved(ctx);
}
}
MessageToByteEncoder
? MessageToByteEncoder 用于将 Message 转换成 ByteBuf,转换后继续在 ChannelPipeline中往前面的出站 Handler 传递,MessageToByteEncoder 继承自ChannelOutboundHandlerAdapter,需要开发者实现的是 encode 方法。里面需要关注的方法是write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (this.acceptOutboundMessage(msg)) {
I cast = msg;
buf = this.allocateBuffer(ctx, msg, this.preferDirect);
try {
this.encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(msg);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException var17) {
throw var17;
} catch (Throwable var18) {
throw new EncoderException(var18);
} finally {
if (buf != null) {
buf.release();
}
}
}
|