闲得无聊捋了一遍这块的源码,做个粗糙的笔记(阅读不友好型)。
Hadoop RPC中的几个类
NameNode#main() | initial() | NameNoeRpcServer#start() | Server#start(): ----启动Responder : run方法里会调doAsyncWrite,如果一次没处理完,还会attach到channel,下次继续处理。 ----启动Listener : 绑定监听端口,注册OP_ACCEPT事件,执行doAccept方法。初始化Reader线程并start Reader线程(目前用Round-Robin方法选择Reader)。 doAccept方法中主要就是构造Connection对象,然后注册到ConnectionManager中,同时也会把这个connection添加到Reader线程的pendingConnections这个阻塞队列中等待Reader线程的run方法去take处理。 | -------- Reader: doRead()方法里 -> readAndProcess方法。 header:hrpc; data长度等于一个确切的rpc的长度,从头里获取这个值。 count = channelRead(channel, data); processOneRpc() --> Connection#processRpcRequest()->在这里构造RpcCall->internalQueueCall(入CallQueue)
---- 启动Handler: run方法里:CallQueue中take,执行RpcCall#run() -> Server#call方法 -> 找RpcInvoker调用call方法。-> sendResponse() -> doResponse(先setupResponse设置call对象里面的响应体,之后会调用Responder#doRespond方法将call添加到responseQueue里等待Responder线程处理,也即connection.sendResponse(call)方法里)。
|