IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RegionServer初始化流程&zk(batch 2.2) -> 正文阅读

[大数据]RegionServer初始化流程&zk(batch 2.2)

一、创建 RegionServer实例:

入口HRegionServerCommandLine.start执行以下核心代码启动独立线程:

HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf);
        hrs.start();
        hrs.join();

constructRegionServer用于实例化 RegionServer,该处通过加载configuration,用反射的方法获得 RegionServer实例:

  public static HRegionServer constructRegionServer(
      Class<? extends HRegionServer> regionServerClass,
      final Configuration conf2) {
    try {
      Constructor<? extends HRegionServer> c = regionServerClass
          .getConstructor(Configuration.class);
      return c.newInstance(conf2);
    } catch (Exception e) {
      throw new RuntimeException("Failed construction of " + "Regionserver: "
          + regionServerClass.toString(), e);
    }
  }

值得注意的是,Configuration配置类中使用了软链接来作为Registry,实践中用weak的地方很多,但是个人用的不多,曾经见过两个场景,其一是应用在缓存中,当内存不足时,比较久远的缓存信息将被垃圾回收器回收掉,第二种,是在海豚调度器中大量的外部信息加载时有被使用,考虑到部分key值丢失,会导致垃圾将内存耗尽的情况。不过当时堆内内存仍然非常紧张,最后改为了堆外内存。

该配置用于加载hadoop的所有的配置文件:

WeakHashMap<Configuration, Object> REGISTRY = new WeakHashMap();

接着上一步来说,regionServerClass.getConstructor用于实例化 RegionServer及其子类,实例化之前,执行了 checkMemberAccess用于检查 client的权限,权限不足将拒绝并抛出异常,这个权限检查就是属性和方法的 private或者 public,如果不是public,就没办法通过反射进行实例化 :

  public Constructor<T> getConstructor(Class<?>... parameterTypes)
        throws NoSuchMethodException, SecurityException {
        checkMemberAccess(Member.PUBLIC, Reflection.getCallerClass(), true);
        return getConstructor0(parameterTypes, Member.PUBLIC);
    }

这边是实例化的代码,使用有参构造的Constructor反射方法实例化 regionServer,这些参数全部来源于 hadoop特定版本的 常量值:

 public static HRegionServer constructRegionServer(
      Class<? extends HRegionServer> regionServerClass,
      final Configuration conf2) {
    try {
      Constructor<? extends HRegionServer> c = regionServerClass
          .getConstructor(Configuration.class);
      return c.newInstance(conf2);
    } catch (Exception e) {
      throw new RuntimeException("Failed construction of " + "Regionserver: "
          + regionServerClass.toString(), e);
    }
  }

二、初始化 HRegionServer:

通过start方法,进入 RegionServer的 run 方法,将是RegionServer初始化的具体工作,这段代码在 HReigonServer.java?中

1、preRegistrationInitialization:

run方法执行起首 :对 pre-registration进行初始化,其中包括??zookeeper,lease thread的初始化等。方法名为 preRegistrationInitialization(),该方法中只包含 启动 RpcServer,配置 connection和zookeeper,其他两个比较简单,不再赘述,只讲一下zk:

  private void preRegistrationInitialization() {
    try {
      initializeZooKeeper();
      setupClusterConnection();
      // Setup RPC client for master communication
      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    }
  }

1.1、准备工作之初始化zk:

进入?initializeZooKeeper() ,首先是一些检查工作,可以直接忽略,分别为master地址和集群状态,然后是 等待 master启动的 waitForMasterActive代码,前两项检查跟zk息息相关:

    blockAndCheckIfStopped(this.masterAddressTracker);
    blockAndCheckIfStopped(this.clusterStatusTracker);
    waitForMasterActive();

blockAndCheckIfStopped中在检查regionserver是否被shutdown时,会无限期的等待 znode,定期检查,直到可用,ZKNodeTracker tracker类用于追踪zk节点是否可用和价值,这里就是不断阻塞调调用?ZKUtil.getDataAndWatch(watcher, node) 查询 node value?,(原理可能是借助 zk的临时节点特性,如果regionserver和zk断开连接,超时以后,临时节点将被删除),另一个集群状态检查方法类似,不再展开:

  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

其中与zk相关的只有一条,获取了一下集群的id,如果集群状态是 now up,那么就一定能获取到,否则报错 abort,程序中止(上一篇中hmaster启动时会在zk中保存 集群id):

 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);

?最后一步是 监控 快照和其他 程序,这初始化了一个?RegionServerProcedureManagerHost,该类不重要,不再赘述

2、注册协处理器Coprocessor:

注册协处理器并初始化,以防止其他协处理器想使用zk:

this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

?其中核心代码为,该方法使用 ClassLoader从配置文件实例化协处理器,hbase中用 classloader的地方非常多,相当多的类都是通过 classLoader加载进来的,我认为这种方式的好处是可以根据配置文件需要反射想要用的类,灵活性非常高 ,里边用到了for循环,好像 协处理器非常多:

loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);

传入的?REGIONSERVER_COPROCESSOR_CONF_KEY 决定了 协处理器的个数,该常量默认为?hbase.coprocessor.regionserver.classes,但是目前 该常量下都有哪些协处理器不清楚,不知道是从哪里加载进来的

3、向Master注册,并接收并处理反馈:

又是阻塞程序不停地向Master发送请求,直到超时。如果发送成功,并拿到反馈,就执行核心handleReportForDutyResponse,该部分的主要工作是?运行初始化。设置 wal 并启动所有服务器线程:

3.1 在 zk中设置 ephemeral node,路径为 hbase/rs:

createMyEphemeralNode();

3.2 初始化hdfs上的文件系统:

initializeFileSystem();

??????3.2.1 根据Fs和hbase root.dir的根路径,创建 tableDescriptor:

private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS.  Do we use checksum verification in the hbase? If hbase
    // checksum verification enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = FSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
    this. fs= new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = getFsTableDescriptors();
  }

  protected TableDescriptors getFsTableDescriptors() throws IOException {
    return new FSTableDescriptors(this.conf,
      this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver());
  }

在createMetaTableDescriptor时,使用了建造者方法。

3.3 设置 wal和replication,并启动它们

setupWALAndReplication()
startReplicationService()

3.4设置表和regionserver的监控:metrics*,不再赘述

4、和active Master进行交互:

启动一个while循环不停地执行以下核心代码:

long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }

该方法展开之后的核心代码为:

ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());

以上代码中,向master注册使用的 protoBuf,protoBuf是Google开发的一种语言无关、平台无关、可序列化结构数据的方法,可用于数据通讯协议,数据存储。所以代码分析到此为止。

4、abortRequested为真的时候,执行中断程序,不再赘述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-14 10:59:11  更:2021-07-14 11:00:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 16:08:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码