Hadoop源码分析(17)
1、 RPC解析
? 在文档(16)中分析了RPC的服务端的情况,并以journalnode为例解析了journalnode在启动是创建RPC服务的代码。在journalnode中会调用RPC的Builder来创建server,而builder会通过engine类的getserver方法来创建server。
? 在文档(16)中分析到创建的server实际是继承于ipc包下的Server类。而在其创建的时候会使用super方法逐步调用到父类的构造方法,调用的ipc下的Server类构造方法如下:
protected Server(String bindAddress, int port,
Class<? extends Writable> rpcRequestClass, int handlerCount,
int numReaders, int queueSizePerHandler, Configuration conf,
String serverName, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.portRangeConfig = portRangeConfig;
this.port = port;
this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) {
this.maxQueueSize = queueSizePerHandler;
} else {
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
}
this.maxRespSize = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
if (numReaders != -1) {
this.readThreads = numReaders;
} else {
this.readThreads = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
}
this.readerPendingConnectionQueue = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
false);
this.enabledAuthMethods = getAuthMethods(secretManager, conf);
this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
listener = new Listener();
this.port = listener.getAddress().getPort();
connectionManager = new ConnectionManager();
this.rpcMetrics = RpcMetrics.create(this, conf);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
this.tcpNoDelay = conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
this.setLogSlowRPC(conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
responder = new Responder();
if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
}
this.exceptionsHandler.addTerseExceptions(StandbyException.class);
}
? 这里大部分都是一些赋值操作,但是有两个类需要注意。首先是第52行的listener,然后是第66行的Responder。
? 首先看listener,其构造方法如下:
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort();
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
? 这段代码实现了一个java NIO的服务端。首先是第4行创建一个serversocketchannel;然后第8行绑定这个服务器需要监听的端口;然后是第11行创建一个selector;然后是第12行到第18行这里会创建一定数量的reader,reader的构造方法很简单就是一些赋值操作而已,它主要是用来读取server接收的请求中的数据;最后是第21行将serversocketchannel注册到selector中,注册的事件是accept事件。
? 然后再看responder,其构造方法如下:
Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open();
pending = 0;
}
? 这个方法很简单主要是赋值操作而已。
? 当server的构造方法执行完成后,文档(16)中分析的RPC的builder创建的server便成功了。同时创建rpc服务的JournalNodeRpcServer类的初始化完成了。在其初始化完成后边会调用start方法启动。调用代码如下:
?调用的start方法内容如下:
void start() {
this.server.start();
}
? 这里是直接调用的创建的rpc server的start方法,其内容如下:
public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
? 首先是第3行,调用responder的start方法;然后是第4行调用listener的start方法;最后是第5行到末尾,创建数个handler并启动。handler的构造方法也很简单就是一些赋值操作。
? 至此RPC server主要的几个线程类的创建与启动便分析完成了。主要有四个类:listener、reader、handler、responder。其中listener负责监听是否有请求,reader负责从listener监听到的channel中读取数据,handler主要负责执行reader读取的RPC请求。responder负责将执行完的结果返回给客户端。
? 这几个类都是线程类,首先从listener开始,其run方法如下:
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
getSelector().select();
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
connectionManager.closeIdle(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
}
LOG.info("Stopping " + Thread.currentThread().getName());
synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException e) { }
selector= null;
acceptChannel= null;
connectionManager.stopIdleScan();
connectionManager.closeAll();
}
}
? 这个方法看起来很长,但实际很简单。主要是第5行的while循环,这个循环基本就是死循环,只要服务器运行这个循环便会一直运行。然后是第8行首先通过getSelector方法拿到listener初始化时创建的selector,然后调用其select方法。这个selector中注册了serversocketchannel的accept事件,当该channel上发生了该事件后select方法便会将其设置到selectedKeys中,然后第9行拿到selectedKeys中的iterator,然后第10行到第21行使用一个while循环遍历iterator。因为listener中只注册了accept事件,所以这里只处理的accept事件。第15行先判断是否是accept事件,若是则执行doAccept方法。
? doAccept方法内容如下:
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
Reader reader = getReader();
Connection c = connectionManager.register(channel);
if (c == null) {
if (channel.isOpen()) {
IOUtils.cleanup(null, channel);
}
connectionManager.droppedConnections.getAndIncrement();
continue;
}
key.attach(c);
reader.addConnection(c);
}
}
? 首先是第2行,从SelectionKey中拿到listener创建的serversocketchannel;然后调用serversocketchannel的accept方法获取请求的channel;然后是第10行调用getReader方法获取reader,这个方法会从listener创建的reader选择一个进行返回;然后是第11行将channel注册成connection;然后是第20行将connection添加到SelectionKey中,方便在后续处理中可以拿到这个connection;最后是21行将connection添加到reader中。
? 这里我先详细分析第11行的register方法。这个方法是调用connectionManager对象的,这个对象在上文server的初始化中创建的。register方法内容如下:
Connection register(SocketChannel channel) {
if (isFull()) {
return null;
}
Connection connection = new Connection(channel, Time.now());
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +
"; # active connections: " + size() +
"; # queued calls: " + callQueue.size());
}
return connection;
}
? 这里首先是第5行创建一个connection,然后是第6行将connection添加到其自身存储connection的集合中。最后第12行再返回connection。
? 最后再分析将connection添加到reader中的方法:addConnection方法。这个方法是reader的方法其内容如下:
public void addConnection(Connection conn) throws InterruptedException {
pendingConnections.put(conn);
readSelector.wakeup();
}
? 重点在第2行调用pendingConnections的put方法将添加connection。而pendingConnections是一个队列。
|