源码分析一Group构造函数
- 线程数=2*core
- SelectorProvider.provider底层通过DefaultSelectorProvider.create创建KQueueSelectorProvider对象
public NioEventLoopGroup() {
传0表示采用默认配置,2*core
this(0);
}
public NioEventLoopGroup(int nThreads) {
不传默认DefaultThreadFactory
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
KQueueSelectorProvider
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
MultithreadEventLoopGroup处理
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
MultithreadEventLoopGroup在交给父类MultithreadEventExecutorGroup处理
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
源码分析一Group父类构造函数
原理图
- 初始化children和chooser属性
MultithreadEventExecutorGroup构造函数
-
初始化eventloop数组 -
初始化线程选择器(轮询) -
newChild初始化eventloop数组元素 -
创建eventLoop失败则优雅关闭线程组 -
通过Future处理异步回调
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
创建DefaultThreadFactory
threadFactory = newDefaultThreadFactory();
}
step-1: 创建指定大小的eventloop数组
children = new SingleThreadEventExecutor[nThreads];
step-2: 每次从eventloop数组选择线程的策略:轮询
if (isPowerOfTwo(children.length)) {
2的次方则采用与运算加速性能
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
step-3: 通过new child初始化eventloop数组的元素
NioEventLoop或者EpollEventLoop等
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
创建失败则优雅关机
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
step-4: 通过listener异步监听terminationFuture的操作
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
源码分析一newChild
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
SelectorProvider用于创建Seletor
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
源码分析一NioEventLoop创建
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
父类创建线程
super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
子类负责创建选择器
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
源码分析一父类构造SingleThreadEventExecutor
原理图
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
...... 删除其他代码
创建eventLoop的线程
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
自旋执行抽象方法,实现为NioEventLoop.run
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...... 删除大量代码
}
}
});
...... 删除其他代码
...... 初始化任务队列
taskQueue = newTaskQueue();
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
源码分析一openSelector
原理图
-
被修改的selectKeys使用数组替代集合,遍历性能更好 -
selectKey的remove也无需手动调用,每次selector.select会先清空上次的selectKeys -
创建选择器KQueueSelectorImpl -
通过反射使用SelectedSelectionKeySet替换publicSelectedKeys属性类
执行select方法通过SelectedSelectionKeySet包装类会自动清空selectionKeys元素
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
step-1: 通过new KQueueSelectorImpl创建Selector
unwrappedSelector = provider.openSelector();
if (DISABLE_KEYSET_OPTIMIZATION) {
开启优化开关,需要将系统属性io.netty.noKeySetOptimization设置为true
return new SelectorTuple(unwrappedSelector);
}
step-2: 通过已经优化的SelectedSelectionKeySet替换sun.nio.ch.SelectorImpl对象中的selectedKeys和publicSelectedKeys两个HashSet集合
a 优化主要是数据结构改变,用数组替代了HashSet,
b 重写了add()和iterator()方法,使数组的遍历效率更高。
c 同时对原生selectorKeys集合的remove不需要在手动remove
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
...... 删除大量代码
修改unwrappedSelector属性类为SelectedSelectionKeySet
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
...... 删除大量代码
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
总结
- NioEventLoopGroup创建的核心是创建NioEventLoop线程组【初始化线程和任务队列】
- 为每个NioEventLoop创建选择器并通过SelectedSelectionKeySet优化选择器数据结构
扩展点一EventLoop.run
|