IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021SC@SDUSC HBase(三)项目代码分析——RPC通信(下) -> 正文阅读

[大数据]2021SC@SDUSC HBase(三)项目代码分析——RPC通信(下)

2021SC@SDUSC

一、RPC简介

作为一个分布式系统,HBase的设计是典型的master-salve架构,HBase中主要有Master,RegionServer,Client三个角色,而RPC是Master、RegionServer和Client三者之间通信交流的纽带。
rcp描述三个角色
Client
Client有很多,比如hbase shell,java client API等,client没有提供任何RPC服务,只是调用RegionServer或Master提供的服务。

Master
Master主要实现了MasterService和RegionServerStatus协议,分别供Client和RegionServer调用。

MaterService
MasterService主要定义了获取集群状态,以及获取表的元信息,添加/删除列,assign region,enable/disable table,负载均衡等DML相关的一些服务。而Master提供了对这些服务的实现,并且供客户端去调用。比如,当我们在hbase shell中运行enable/disable table等相关的命令时,client会首先将该RPC请求发送到Master。

RegionServerStatus
RegionServerStatus主要定义了RegionServer向Master汇报集群状态,RegionServer启动向Master发送RPC请求等相关的服务,而Master根据这些RPC请求信息,可以了解整个集群中RegionServer的状态。

ReginServer
RegionServer主要实现了AdminService和ClientService协议,供client端调用。而与此同时,ReginServer也会调用RegionServerStatus服务,将相关信息汇报给master。

AdminService
AmdinService主要定义了获取table Regin信息,操作region(Open,Flush,Split,Compact, Merge等)相关服务。

ClientService
ClientService主要定了获取数据,更新,添加数据,Scan等相关的服务。

二、HBase中RPC概况

RPC(remote procedure call)即远程过程调用。对于本地调用,定义好一个函数以后,程序的其他部分通过调用该函数,就可以返回想要的结果。而RPC唯一的区别就是函数定义和函数调用通常位于不同的机器,因为涉及到不同的机器,所以RPC相比较本地函数调用多了通信部分,主要涉及到两个角色调用方(Client端)和函数定义实现(Server端)。

Server端RPC实现

点击标题直接跳转到该链接

Client端RPC实现

1、HBaseRPC getProxy入口

public static VersionedProtocol getProxy(
      Class<? extends VersionedProtocol> protocol,long clientVersion, InetSocketAddress addr, User ticket,Configuration conf, SocketFactory factory, int rpcTimeout)
  throws IOException {
    VersionedProtocol proxy=getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
    long serverVersion = proxy.getProtocolVersion(protocol.getName(),                                               clientVersion);
    if (serverVersion == clientVersion) {
      return proxy;
    }
    throw new VersionMismatch(protocol.getName(), clientVersion,serverVersion);

获取代理类VersionedProtocol proxy并发起rpc调用serverVersion,完成JDK动态代理获取代理类。

2、getConnection()方法

protected Connection getConnection(InetSocketAddress addr,Class<? extends VersionedProtocol> protocol, User ticket,int rpcTimeout,Call call)throws IOException, InterruptedException {
    if (!running.get()) {
      throw new IOException("The client is stopped");
    }
Connection connection;
    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        connection = createConnection(remoteId);
        connections.put(remoteId, connection);
      }
    }
    connection.addCall(call);
    connection.setupIOstreams();
    return connection;
  }

HBaseClient的getConnection方法,默认一个regionserver和一个master均只会建立一个socket链接,可以通过修改hbase.client.ipc.pool.size(默认值为1)增加socket链接数。
同时初始化ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);一个regionserver对应一个ConnectionId。
synchronize(connections)是PoolMap类的实例,如果connections中remoteId对应的链接数量小于hbase.client.ipc.pool.size的配置值则会返回null,然后将请求添加到内部map,方便后续server返回数据时处理。
addCall响应后,若没有socket链接则建立socket链接

protected synchronized void setupIOstreams()
        throws IOException, InterruptedException {
      if (socket != null || shouldCloseConnection.get()) {
        return;
      }
      try{
      //...
      setupConnection();
       this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(NetUtils.getInputStream(socket))));  
       this.out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(socket)));
        writeHeader();  
        //...
        touch();  
        //...
        start(); 
        //...
}

Connection的setupIOstreams方法是HBaseClient的内部类,在getConnection()方法中被调用。
setupConnection();创建连接,接下来this.in;this.out初始化ID,并writeHeader(); 发送header,通过touch()更新最新互动时间,start()启动读线程,该线程监听OS的READ事件,负责从server端读取数据。
通过if语句进行判断如果有可用的socket对象则直接返回

protected synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      //...
      try{
        this.socket = socketFactory.createSocket();  
        this.socket.setTcpNoDelay(tcpNoDelay);  
        this.socket.setKeepAlive(tcpKeepAlive);
        //...
      }
}

Connection的setupConnection方法是HBaseClient的内部类,在setupIOstreams()方法中被调用,同时需要创建socket对象,默认是socketFactory是StandardSocketFactory 。

3、SelectorPool的select

int select(SelectableChannel channel, int ops, long timeout) throws IOException {  
      SelectorInfo info = get(channel);  
      SelectionKey key = null;  
      int ret = 0;   
      try {  
        while (true) {  
          long start = (timeout == 0) ? 0 : System.currentTimeMillis();  
          key = channel.register(info.selector, ops);    
          ret = info.selector.select(timeout);   
          if (ret != 0) {  
            return ret;  
          }   
      } finally {  
        if (key != null) {  
          key.cancel();  
        }  
        //...
        info.close();  
        return ret;   
      }       
        release(info);  
      }  
    }

SelectorInfo info = get(channel); 从pool中拿一个selector,SelectorPool维护着一个Provider列表,每个provider都有一个selector队列。ret = info.selector.select(timeout); 阻塞select,直到超时。在处理完后,清理key。

4、call

public Writable call(Writable param, InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout)  {
    Call call = new Call(param);
    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
    connection.sendParam(call);                 
      while (!call.done) {
        try {
          call.wait(1000);        
        } catch (InterruptedException ignored) {
          interrupted = true;
        }
      }
RpcClient client =  new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
client.call(...)

生成call对象,从connections能否获取链接,如果获取不到,生成一个,并且保存到connection中。client发起远程调用时,首先生成一个RpcClient的实例(具体实现类是RpcClientImpl),然后调用call参数(传入方法名称,参数等)。如果远端调用没有执行完,则会每隔1s钟检查一次,可见是比较低效的。

5、Reader/Writer IO操作

 public void sendCall(final Call call) throws IOException {
      if (callsToWrite.size() >= maxQueueSize) {
        throw new IOException("Can't add " + call.toShortString()
            + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
      }
      callsToWrite.offer(call);
      BlockingRpcConnection.this.notifyAll();
    }
     private void writeConnectionHeader() throws IOException {
    boolean isCryptoAesEnable = false;
    if (saslRpcClient != null) {
      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.       getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
      isCryptoAesEnable = saslEncryptionEnabled && conf.getBoolean(
          CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
    }

调用sendCall函数,生成CallFuture对象,并且push到Calls ToWrite队列。

int doIO(ByteBuffer buf, int ops) throws IOException {  
  //...
   while (buf.hasRemaining()) {  
     if (closed) {  
       return -1;  
     }  
     try {  
       int n = performIO(buf);  
       if (n != 0) {  
         return n;  
       }  
     } catch (IOException e) {  
       if (!channel.isOpen()) {  
         closed = true;  
       }  
       throw e;  
     }  
     int count = 0;  
     try {  
       count = selector.select(channel, ops, timeout);    
     } catch (IOException e) { 
       closed = true;  
       throw e;  
     }   
     if (count == 0) {  
       throw new SocketTimeoutException(timeoutExceptionString(channel,timeout, ops));  
     }  
   }  
   return 0; 
 }  

这里执行reader/writer的操作,基本就是对channel的read/write操作。在输入performID(buf)后有数据处理,则返回,有可能只处理了部分数据,上层保证数据是否完整。
若有数据都没处理,则输入count=0继续监听对应READ/WRITE事件,继续执行读写操作,直到buffer被全部处理为止。

 BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
 //...
 if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
      callSender = new CallSender(threadName, this.rpcClient.conf);
      callSender.start();
 }

另一方面,CallSender线程持续从队列中获取对象,生成Request请求发送的server端,并且把当前call对象push到connection的calls中。

public void run() {
      synchronized (BlockingRpcConnection.this) {
        while (!closed) {
          if (callsToWrite.isEmpty()) {
            try {
              BlockingRpcConnection.this.wait();
            } catch (InterruptedException e) {
            }
            continue;
          }
          Call call = callsToWrite.poll();
          if (call.isDone()) {
            continue;
          }
          try {
            tracedWriteRequest(call);
          } catch (IOException e) {
            LOG.debug("call write error for {}", call.toShortString());
            call.setException(e);
            closeConn(e);
          }
        }
      }
    }

Connection中run方法持续从3的calls队列读取已发送的请求,检查结果是否从server返回,一旦返回将构造对应的response保存到call中。

三、总结

client端rpc实现的步骤
1、client调用call方法后,首先会把传入的参数封装成Call对象(该对象包含方法名称,调用参数,连接地址等信息),并且根据该对象获取连接信息。client端有一个Map对象connections,缓存了连接信息。如果之前有对应的连接则直接获取,否则新建连接并且缓存起来。
2、client端通过调用sendCall函数,生成CallFuture对象,并且将该对象push到CallsToWrite队列中。然后便一直等待本地调用是否成功返回,无论结果如何都将删除之前在callSender中创建的CallFuture对象,然后把结果包装成Pair<message, cellscanner="">返回。
3、另一方面,CallSender线程持续从CallsToWrite队列中获取2中push进去的对象,生成Request请求发送到server端,并且把当前的call对象push到connection的calls中。
4、此后,Connection中run方法持续从3的calls队列读取已发送的请求,检查结果是否从server返回,一旦返回将构造对应的response保存到call中

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-18 17:27:40  更:2021-10-18 17:29:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 6:20:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码