1 ES 基于 JDK NIO 搭建 RPC 网络层
nio多路复用 : SelectableChannel Selector
libs/nio :
NioSelectorGroup NioSelector ChannelFactory ChannelContext
plugins/transport-nio :
NioGroupFactory NioTransport
2 NioSelector 处理rpc消息
每个 NioSelector 由单个线程循环处理处于 ready 状态的 channel
/**
 * Runs one pass of the selector loop: closes pending channels, runs the pre-select hook,
 * selects ready keys (bounded by the time until the next scheduled task), dispatches each
 * selected key to {@link #processKey(SelectionKey)}, and finally runs scheduled tasks.
 *
 * <p>Exceptions do not escape this method, with one exception: a
 * {@link ClosedSelectorException} is rethrown when {@code isOpen()} still reports true.
 * {@link IOException}s are handed to {@code eventHandler.selectorException} and every other
 * {@link Exception} to {@code eventHandler.uncaughtException}.
 */
void singleLoop() {
    try {
        closePendingChannels();
        preSelect();

        final long nanosToNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
        final int readyCount;
        if (nanosToNextTask != 0) {
            final long millisToNextTask = TimeUnit.NANOSECONDS.toMillis(nanosToNextTask);
            // Wait at most until the next task is due, capped at 300ms. The lower bound of 1 is
            // required because a timeout of 0 would block without any timeout at all.
            final long timeoutMillis = Math.min(300, Math.max(millisToNextTask, 1));
            readyCount = selector.select(timeoutMillis);
        } else {
            // A task is already due: poll without blocking.
            readyCount = selector.selectNow();
        }

        if (readyCount > 0) {
            for (Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); keys.hasNext(); ) {
                final SelectionKey key = keys.next();
                // Selected keys have to be removed explicitly, otherwise they stay in the selected set.
                keys.remove();

                if (key.isValid() == false) {
                    // Report an already-invalid key the same way as a key cancelled during processing.
                    eventHandler.genericChannelException((ChannelContext<?>) key.attachment(), new CancelledKeyException());
                    continue;
                }

                try {
                    processKey(key);
                } catch (CancelledKeyException cancelled) {
                    eventHandler.genericChannelException((ChannelContext<?>) key.attachment(), cancelled);
                }
            }
        }

        handleScheduledTasks(System.nanoTime());
    } catch (ClosedSelectorException e) {
        // Only propagate when this selector still reports itself as open; otherwise the
        // exception is dropped (presumably an expected consequence of shutdown -- confirm).
        if (isOpen()) {
            throw e;
        }
    } catch (IOException e) {
        eventHandler.selectorException(e);
    } catch (Exception e) {
        eventHandler.uncaughtException(e);
    }
}
processKey 处理监听(accept)、连接(connect)以及读写请求
/**
 * Dispatches a single selected key to the matching handler, based on its ready operations.
 *
 * <p>An acceptable key is treated as belonging to a server channel and is passed to
 * {@code eventHandler.acceptChannel}; an {@link IOException} raised there is forwarded to
 * {@code eventHandler.acceptException}. Any other key is treated as a socket channel: a
 * pending connect is attempted first, then, once the connection is complete and the channel
 * is not flagged for closing, writes are handled before reads. {@code postHandling} is always
 * invoked for socket channels.
 *
 * @param selectionKey the selected key; its attachment must be the channel's {@code ChannelContext}
 */
void processKey(SelectionKey selectionKey) {
    final ChannelContext<?> attached = (ChannelContext<?>) selectionKey.attachment();

    if (selectionKey.isAcceptable()) {
        assert attached instanceof ServerChannelContext : "Only server channels can receive accept events";
        final ServerChannelContext serverContext = (ServerChannelContext) attached;
        final boolean acceptReady = (selectionKey.readyOps() & SelectionKey.OP_ACCEPT) != 0;
        if (acceptReady) {
            try {
                eventHandler.acceptChannel(serverContext);
            } catch (IOException e) {
                eventHandler.acceptException(serverContext, e);
            }
        }
        return;
    }

    assert attached instanceof SocketChannelContext : "Only sockets channels can receive non-accept events";
    final SocketChannelContext socketContext = (SocketChannelContext) attached;
    // Snapshot the ready set once; all checks below use this same value.
    final int readyOps = selectionKey.readyOps();
    final boolean connectReady = (readyOps & SelectionKey.OP_CONNECT) != 0;
    final boolean writeReady = (readyOps & SelectionKey.OP_WRITE) != 0;
    final boolean readReady = (readyOps & SelectionKey.OP_READ) != 0;

    if (connectReady) {
        attemptConnect(socketContext, true);
    }

    if (socketContext.isConnectComplete() && socketContext.selectorShouldClose() == false) {
        if (writeReady) {
            handleWrite(socketContext);
        }
        // Re-check the close flag: handling the write may have scheduled the channel for closing.
        if (socketContext.selectorShouldClose() == false && readReady) {
            handleRead(socketContext);
        }
    }

    eventHandler.postHandling(socketContext);
}
|