在学习Netty之前,建议大家先学习一下NIO的相关知识哈😀 传送门🚪 NIO的理解和使用
一、介绍
Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端,简单来说,就是用于网络编程的底层框架,向TCP、UDP的通信,或者项目中使用到的RPC框架,如:dubbo等都是使用的Netty作为底层框架
在学习Netty之前,我们要想一下为什么要学习Netty呢?我的回答是
- 可以更好的理解NIO
- 了解到多路复用的思想
- 学习到优秀的设计模式
- 为网络编程打下良好的基础
二、服务端案例
我们来看一下Netty的服务端简单实现吧😇
public class TestServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline ().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8080);
}
}
就这十几行代码,就能够实现服务端且异步的多路复用,简单高效。我们先来解读一下这些代码
- 创建的入口是ServerBootstrap类,它是Netty的启动类其实就是包装了Netty的属性配置、子处理器、子组别等等。
- gourp组别表示创建的线程池类型,Netty中的线程池顶层接口是EventExecutorGroup,它继承了ScheduledExecutorService。
- channel表示创建的通道类型。
- childHandler是Netty的子处理器,这里有子,必然有父,那父子都是干嘛的呢?父负责的是Accept接收任务,读写任务由子处理。
- 在initChannel中有多个pipeline,这些pipeline就是具体的执行逻辑,之间的通信靠ChannelHandlerContext全局对象来传递。
三、常见类
在源码分析Netty之间,我们先来看看Netty中的比较重要的类
🌍Future
public interface Future<V> extends java.util.concurrent.Future<V> {
...
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> sync() throws InterruptedException;
...
}
可以看到,Netty中的Future继承了Java的Future,并多了一些同步和异步的方法,其实就是对java的Future的封装,以满足Netty的业务需求。
🌍Promise
public interface Promise<V> extends Future<V>
Promise继承了Future ,也可以当成一个异步接收对象。
🌍ChannelFuture
public interface ChannelFuture extends Future<Void> {}
继承了Future重写了方法以满足Channel通道上的异步结果的接收。
🌍 EventLoopGroup
public interface EventLoopGroup extends EventExecutorGroup {
...
@Override
EventLoop next();
ChannelFuture register(Channel channel);
...
}
是Netty中线程池的抽象类
🌍EventLoop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
EventLoop可以处理所有的I/O操作。一个EventLoop 实例将处理多个channel。其实就是Netty中多路复用器的接口,具体的多路复用器实现这个接口,比如NioEventLoop。
🌍ChannelPipeline
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>>
它是各种入栈出栈处理器的集合。 ChannelInboundInvoker就是对应的入栈,可以理解为是读操作的handle。 ChannelOutboundInvoker对应的出栈,理解为是写操作的handle。
四、源码分析
下面我们来跟着源码来分析Netty
🏝?服务端的初始化过程
??体现了Netty的异步策略的代码如下 在init方法中
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
init方法在主线程中执行,使用了线程池的executre方法,也就是使用NIO线程来执行这个任务,当然,在此时NIO线程还未开启,所以,这个任务被放入到了Queue任务队列中。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
...
}
这里开启了NIO的线程池,开始去处理任务和监听io事件。
🏝?下面我们来看看Netty的NIO线程是怎么监听事件的
看看关键代码吧😊
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
....
??需要注意的点
- 这里是由NIO的线程执行的且无限循环
- ioRatio 是io的比例,也就是执行select监测方法和执行任务的比例。默认是1:1
下面来看看select方法
private void select(boolean oldWakenUp) throws IOException {
.....
Selector selector = this.selector;
if (......) {
selector.selectNow();
}
selectCnt ++;
selector = selectRebuildSelector(selectCnt);
.....
}
??注意几点
- 这个方法重构了selector,避免了在Linux环境下出现的轮询bug
- 引入了计数器和阈值,避免一直循环
- 当某些条件成立判断channel中有可读数据时,调用selectNow方法
下面来看看服务端监听读事件并处理的过程 ??关键的代码有
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
....
🚩需要知道的是
- 读操作也是通过底层原子类unsafe来设置缓冲区buf的大小
- 将读取的数据放入buf中,然后再遍历buf,pipeline中读处理器一个字节一个字节的读取处理,当buf全部读取完毕,则调用回调方法通知。
五、小总结
- Netty中使用了很多的异步处理方法,形如addListener等字样的代码都是异步,异步的意思是当前线程创建任务,由其他的线程去执行这个任务,当某个时间完成了这个任务,则回调方法返回异步结果对象。
- Netty中封装了线程池和Future等,在继承的基础上添加了很多异步和同步的处理方法。
- Netty使用了ioRatio属性很好的权衡了io处理和任务处理的执行比例。
- Netty使用重构的方式将NIO原生的selector重构,避免了在Linux中select轮询的bug。
|