2021SC@SDUSC
一、RPC简介
作为一个分布式系统,HBase的设计是典型的master-salve架构,HBase中主要有Master,RegionServer,Client三个角色,而RPC是Master、RegionServer和Client三者之间通信交流的纽带。 Client Client有很多,比如hbase shell,java client API等,client没有提供任何RPC服务,只是调用RegionServer或Master提供的服务。
Master Master主要实现了MasterService和RegionServerStatus协议,分别供Client和RegionServer调用。
MaterService MasterService主要定义了获取集群状态,以及获取表的元信息,添加/删除列,assign region,enable/disable table,负载均衡等DML相关的一些服务。而Master提供了对这些服务的实现,并且供客户端去调用。比如,当我们在hbase shell中运行enable/disable table等相关的命令时,client会首先将该RPC请求发送到Master。
RegionServerStatus RegionServerStatus主要定义了RegionServer向Master汇报集群状态,RegionServer启动向Master发送RPC请求等相关的服务,而Master根据这些RPC请求信息,可以了解整个集群中RegionServer的状态。
ReginServer RegionServer主要实现了AdminService和ClientService协议,供client端调用。而与此同时,ReginServer也会调用RegionServerStatus服务,将相关信息汇报给master。
AdminService AmdinService主要定义了获取table Regin信息,操作region(Open,Flush,Split,Compact, Merge等)相关服务。
ClientService ClientService主要定了获取数据,更新,添加数据,Scan等相关的服务。
二、HBase中RPC概况
RPC(remote procedure call)即远程过程调用。对于本地调用,定义好一个函数以后,程序的其他部分通过调用该函数,就可以返回想要的结果。而RPC唯一的区别就是函数定义和函数调用通常位于不同的机器,因为涉及到不同的机器,所以RPC相比较本地函数调用多了通信部分,主要涉及到两个角色调用方(Client端)和函数定义实现(Server端)。
点击标题直接跳转到该链接
Client端RPC实现
1、HBaseRPC getProxy入口
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,long clientVersion, InetSocketAddress addr, User ticket,Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
VersionedProtocol proxy=getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
if (serverVersion == clientVersion) {
return proxy;
}
throw new VersionMismatch(protocol.getName(), clientVersion,serverVersion);
获取代理类VersionedProtocol proxy 并发起rpc调用serverVersion ,完成JDK动态代理获取代理类。
2、getConnection()方法
protected Connection getConnection(InetSocketAddress addr,Class<? extends VersionedProtocol> protocol, User ticket,int rpcTimeout,Call call)throws IOException, InterruptedException {
if (!running.get()) {
throw new IOException("The client is stopped");
}
Connection connection;
ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = createConnection(remoteId);
connections.put(remoteId, connection);
}
}
connection.addCall(call);
connection.setupIOstreams();
return connection;
}
HBaseClient的getConnection 方法,默认一个regionserver和一个master均只会建立一个socket链接,可以通过修改hbase.client.ipc.pool.size(默认值为1)增加socket链接数。 同时初始化ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); 一个regionserver对应一个ConnectionId。 synchronize(connections) 是PoolMap类的实例,如果connections中remoteId 对应的链接数量小于hbase.client.ipc.pool.size的配置值则会返回null,然后将请求添加到内部map,方便后续server返回数据时处理。 在addCall 响应后,若没有socket链接则建立socket链接
protected synchronized void setupIOstreams()
throws IOException, InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
}
try{
setupConnection();
this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(NetUtils.getInputStream(socket))));
this.out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(socket)));
writeHeader();
touch();
start();
}
Connection的setupIOstreams 方法是HBaseClient的内部类,在getConnection() 方法中被调用。 用setupConnection(); 创建连接,接下来this.in;this.out 初始化ID,并writeHeader(); 发送header,通过touch()更新最新互动时间,start() 启动读线程,该线程监听OS的READ事件,负责从server端读取数据。 通过if语句进行判断如果有可用的socket对象则直接返回
protected synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
try{
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(tcpKeepAlive);
}
}
Connection的setupConnection 方法是HBaseClient的内部类,在setupIOstreams() 方法中被调用,同时需要创建socket对象,默认是socketFactory是StandardSocketFactory 。
3、SelectorPool的select
int select(SelectableChannel channel, int ops, long timeout) throws IOException {
SelectorInfo info = get(channel);
SelectionKey key = null;
int ret = 0;
try {
while (true) {
long start = (timeout == 0) ? 0 : System.currentTimeMillis();
key = channel.register(info.selector, ops);
ret = info.selector.select(timeout);
if (ret != 0) {
return ret;
}
} finally {
if (key != null) {
key.cancel();
}
info.close();
return ret;
}
release(info);
}
}
SelectorInfo info = get(channel); 从pool中拿一个selector,SelectorPool维护着一个Provider列表,每个provider都有一个selector队列。ret = info.selector.select(timeout); 阻塞select,直到超时。在处理完后,清理key。
4、call
public Writable call(Writable param, InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout) {
Call call = new Call(param);
Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
connection.sendParam(call);
while (!call.done) {
try {
call.wait(1000);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
RpcClient client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
client.call(...)
生成call对象,从connections能否获取链接,如果获取不到,生成一个,并且保存到connection中。client发起远程调用时,首先生成一个RpcClient的实例(具体实现类是RpcClientImpl),然后调用call参数(传入方法名称,参数等)。如果远端调用没有执行完,则会每隔1s钟检查一次,可见是比较低效的。
5、Reader/Writer IO操作
public void sendCall(final Call call) throws IOException {
if (callsToWrite.size() >= maxQueueSize) {
throw new IOException("Can't add " + call.toShortString()
+ " to the write queue. callsToWrite.size()=" + callsToWrite.size());
}
callsToWrite.offer(call);
BlockingRpcConnection.this.notifyAll();
}
private void writeConnectionHeader() throws IOException {
boolean isCryptoAesEnable = false;
if (saslRpcClient != null) {
boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY. getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
isCryptoAesEnable = saslEncryptionEnabled && conf.getBoolean(
CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
}
调用sendCall函数,生成CallFuture对象,并且push到Calls ToWrite队列。
int doIO(ByteBuffer buf, int ops) throws IOException {
while (buf.hasRemaining()) {
if (closed) {
return -1;
}
try {
int n = performIO(buf);
if (n != 0) {
return n;
}
} catch (IOException e) {
if (!channel.isOpen()) {
closed = true;
}
throw e;
}
int count = 0;
try {
count = selector.select(channel, ops, timeout);
} catch (IOException e) {
closed = true;
throw e;
}
if (count == 0) {
throw new SocketTimeoutException(timeoutExceptionString(channel,timeout, ops));
}
}
return 0;
}
这里执行reader/writer的操作,基本就是对channel的read/write操作。在输入performID(buf) 后有数据处理,则返回,有可能只处理了部分数据,上层保证数据是否完整。 若有数据都没处理,则输入count=0 继续监听对应READ/WRITE事件,继续执行读写操作,直到buffer被全部处理为止。
BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
callSender = new CallSender(threadName, this.rpcClient.conf);
callSender.start();
}
另一方面,CallSender线程持续从队列中获取对象,生成Request请求发送的server端,并且把当前call对象push到connection的calls中。
public void run() {
synchronized (BlockingRpcConnection.this) {
while (!closed) {
if (callsToWrite.isEmpty()) {
try {
BlockingRpcConnection.this.wait();
} catch (InterruptedException e) {
}
continue;
}
Call call = callsToWrite.poll();
if (call.isDone()) {
continue;
}
try {
tracedWriteRequest(call);
} catch (IOException e) {
LOG.debug("call write error for {}", call.toShortString());
call.setException(e);
closeConn(e);
}
}
}
}
Connection中run方法持续从3的calls队列读取已发送的请求,检查结果是否从server返回,一旦返回将构造对应的response保存到call中。
三、总结
client端rpc实现的步骤 1、client调用call方法后,首先会把传入的参数封装成Call对象(该对象包含方法名称,调用参数,连接地址等信息),并且根据该对象获取连接信息。client端有一个Map对象connections,缓存了连接信息。如果之前有对应的连接则直接获取,否则新建连接并且缓存起来。 2、client端通过调用sendCall函数,生成CallFuture对象,并且将该对象push到CallsToWrite队列中。然后便一直等待本地调用是否成功返回,无论结果如何都将删除之前在callSender中创建的CallFuture对象,然后把结果包装成Pair<message, cellscanner="">返回。 3、另一方面,CallSender线程持续从CallsToWrite队列中获取2中push进去的对象,生成Request请求发送到server端,并且把当前的call对象push到connection的calls中。 4、此后,Connection中run方法持续从3的calls队列读取已发送的请求,检查结果是否从server返回,一旦返回将构造对应的response保存到call中
|