IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> HBase源码阅读(一) balancer -> 正文阅读

[大数据]HBase源码阅读(一) balancer

最近开始搞hbase,大概看书感觉能明白架构,但是很多细节还是需要看代码才能明白,其中balancer模块是一个比较独立的模块,而且也是分布式系统中非常重要的模块,同时在我们自己的项目中也有调度相关的模块,看看hbase有无值得学习和借鉴的。不多逼逼,直接上代码。

在HMaster的balancer()方法中会调用balancer,可以看到balancer会根据当前负载状态生成一个balance plan,但是具体对于plan的执行是由assignmentManager来负责的,这个模块代码先不讨论。

//生成集群内的分布
//返回值是个map,映射关系是:表名=>{server=>{regions}}
Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable = this.assignmentManager.getRegionStates().getAssignmentsByTable();

List<RegionPlan> plans = new ArrayList<RegionPlan>();
//Give the balancer the current cluster state.
//给balancer设置当前集群的状态,这是调度的参考条件
this.balancer.setClusterStatus(getClusterStatus());
//开始生成调度计划,可以看到调度是以表为单位进行调度的,每次传入的参数是一个表的region在所有server中的分布
for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
  List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
  if (partialPlans != null) plans.addAll(partialPlans);
}
if (plans != null && !plans.isEmpty()) {
  for (RegionPlan plan: plans) {
  ...
  ...
  //开始调度
  this.assignmentManager.balance(plan);
  ...
   }
}

下面来看balancer是如何生成balance 计划的。在hbase中是有多种调度器的,在我使用的版本中默认调度器StochasticLoadBalancer。生成plan的逻辑都在balancerCluster的逻辑中。先进行了一些简单的判断:

//hbase.balancer.tablesOnMaster配置某些表是可以放在master上面的,不过2.0这个配置没用了
//先将应该放在master上的放过去
List<RegionPlan> plans = balanceMasterRegions(clusterState);
if (plans != null || clusterState == null || clusterState.size() <= 1) {
  return plans;
}
//server数量太少,不用balance
if (masterServerName != null && clusterState.containsKey(masterServerName)) {
  if (clusterState.size() <= 2) {
    return null;
  }
  clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
  clusterState.remove(masterServerName);
}

然后使用needsBalance函数判断目前状态是否需要balance,如果不需要直接返回,这个函数后面具体分析。

Cluster cluster = new Cluster(clusterState, loads, finder, rackManager, getConf());
if (!needsBalance(cluster)) {
  return null;
}

之后开始尝试生成执行计划,最多尝试生成computedMaxSteps次action,每次action即将一个region从一个server迁移到另一个server(action的生成方案后续讨论),然后计算一个执行该action之后集群状态的cost,如果这个cost比之前的小,会保留该状态,继续生成下一个action;如果这个cost比之前的大,说明这个action对于集群状态没有好处,会放弃该action。

for (step = 0; step < computedMaxSteps; step++) {
  int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
  //生成一个执行计划,生成的方式随机选择一种
  CandidateGenerator p = candidateGenerators[generatorIdx];
  Cluster.Action action = p.generate(cluster);
  if (action.type == Type.NULL) {
    continue;
  }

  cluster.doAction(action);
  updateCostsWithAction(cluster, action);
  newCost = computeCost(cluster, currentCost);
  //新状态是否比旧的更好?
  // Should this be kept?
  if (newCost < currentCost) {
    //保留状态
    currentCost = newCost;
  } else {
    //回滚状态
    // Put things back the way they were before.
    // TODO: undo by remembering old values
    Action undoAction = action.undoAction();
    cluster.doAction(undoAction);
    updateCostsWithAction(cluster, undoAction);
  }
  ...
  ...
}

整体流程大概就是这样,其实还是挺简单的,下面再看看needsBalance函数,就是算一个region数量/server数量的均值,这是每个server应该负载的region数量,为了balancer不会来回抖动,给了个范围,只要server的region数量在这个区间范围内,就认为其是均衡状态,如果所有server都是均衡状态就不需要调度。

  protected boolean needsBalance(Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    // region也可以有副本,尽量不会让一个region的多副本在同一台机器上
    if(areSomeRegionReplicasColocated(c)) return true;
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    // region数量除server数量就是average
    float average = cs.getLoadAverage(); // for logging
    // 计算每个server应该负载region数量的范围区间
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    // 有server的region数量不符合区间范围就需要调度
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because balanced cluster; " +
          "servers=" + cs.getNumServers() +
          " regions=" + cs.getNumRegions() + " average=" + average +
          " mostloaded=" + serversByLoad.lastKey().getLoad() +
          " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }

下面再看看生成balance plan中两个比较核心的模块,action生成和cost计算,先来看cost计算,可以看到会调用多个costFunction为当前cluster计算cost并求和。每个costFunction算出来之后还会乘一个权重,这个权重是可调的配置值。

  protected double computeCost(Cluster cluster, double previousCost) {
    double total = 0;

    for (CostFunction c:costFunctions) {
      if (c.getMultiplier() <= 0) {
        continue;
      }
      // 使用当前costFunction计算出cost并乘上权重。
      // 权重是一个配置值
      // 默认权重中
      total += c.getMultiplier() * c.cost();

      if (total > previousCost) {
        return total;
      }
    }
    return total;
  }

costFunction由以下多个函数组成:

   costFunctions = new CostFunction[]{
      new RegionCountSkewCostFunction(conf),
      new PrimaryRegionCountSkewCostFunction(conf),
      new MoveCostFunction(conf),
      localityCost,
      new TableSkewCostFunction(conf),
      regionReplicaHostCostFunction,
      regionReplicaRackCostFunction,
      regionLoadFunctions[0],
      regionLoadFunctions[1],
      regionLoadFunctions[2],
      regionLoadFunctions[3],
    };
  }

下面挑一个默认权重最高的RegionCountSkewCostFunction进行分析,计算cost的函数非常简单,regionsPerServer是个二维数组, 其中存放的数据是serverIndex => region list,通过其计算出了stats,即每个server的region数量,更具体的计算由costFromArray进行。

    double cost() {
      if (stats == null || stats.length != cluster.numServers) {
        stats = new double[cluster.numServers];
      }

      for (int i =0; i < cluster.numServers; i++) {
        stats[i] = cluster.regionsPerServer[i].length;
      }

      return costFromArray(stats);
    }

逻辑也非常简单,先计算最优分布和最差分布的情况,然后计算当前分布的cost,然后对最优分布和最差分布取归一化。

protected double costFromArray(double[] stats) {
      double totalCost = 0;
      //该表的region总数
      double total = getSum(stats);

      // 该表分布的server数量
      double count = stats.length;
      // 每个server的region均值
      double mean = total/count;

      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
      // a zero sum cost for this to make sense.
      // 最差分布的cost,即有一个server上负载了所有region,其他server上region为0的cost
      double max = ((count - 1) * mean) + (total - mean);

      // It's possible that there aren't enough regions to go around
      // 最优分布的cost
      double min;
      if (count > total) {
        min = ((count - total) * mean) + ((1 - mean) * total);
      } else {
        // Some will have 1 more than everything else.
        int numHigh = (int) (total - (Math.floor(mean) * count));
        int numLow = (int) (count - numHigh);

        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));

      }
      min = Math.max(0, min);
      // 计算当前的cost
      for (int i=0; i<stats.length; i++) {
        double n = stats[i];
        double diff = Math.abs(mean - n);
        totalCost += diff;
      }
      // 对当前的cost在最优分布和最差分布中归一化
      double scaled =  scale(min, max, totalCost);
      return scaled;
    }

其他的costFunction太多了,以后遇到了用得上再看吧。下面再看看执行的action的生成,action由CandidateGenerator生成,这也是一个接口类,balancer中有多个CandidateGenerator,每次生成action时,会随机选择一个CandidateGenerator来生成action,下面挑一个最简单的CandidateGenerator来分析一下,RandomCandidateGenerator的generate函数如下,逻辑非常简单,随机选俩server,再在server上选一个region,move一下这个region。

    Cluster.Action generate(Cluster cluster) {

      int thisServer = pickRandomServer(cluster);

      // Pick the other server
      int otherServer = pickOtherRandomServer(cluster, thisServer);

      return pickRandomRegions(cluster, thisServer, otherServer);
    }

剩余的其他CandidateGenerator以后用到了再看吧。

发现分布式系统中的balance策略并没有设计得特别复杂,挺朴素的。我们自己项目中的balance也是这样,主要是这种策略其实没有固定的公式和理论来指导,只能自己拍脑袋想,越简单的方案,越不容易出错,越复杂的方案,越难以收敛。设计这种balance的核心思路:

  1. 保留充足的阈值,在某个范围内的状态都应该认为其均衡,阈值过小容易来回抖动
  2. 迁移方案不需要很复杂,最简单的就是随机迁,复杂点儿的可以对每个server负载或者其他打分方式排个序啥的,然后从负载高的往负载低的迁移。
  3. 打分模块很重要,感觉没有银弹,一些权重参数还是得慢慢调。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-13 21:52:47  更:2022-03-13 21:54:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:35:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码