最近开始搞hbase,大概看书感觉能明白架构,但是很多细节还是需要看代码才能明白,其中balancer模块是一个比较独立的模块,而且也是分布式系统中非常重要的模块,同时在我们自己的项目中也有调度相关的模块,看看hbase有无值得学习和借鉴的。不多逼逼,直接上代码。
在HMaster的balancer()方法中会调用balancer,可以看到balancer会根据当前负载状态生成一个balance plan,但是具体对于plan的执行是由assignmentManager来负责的,这个模块代码先不讨论。
Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable = this.assignmentManager.getRegionStates().getAssignmentsByTable();
List<RegionPlan> plans = new ArrayList<RegionPlan>();
this.balancer.setClusterStatus(getClusterStatus());
for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
if (partialPlans != null) plans.addAll(partialPlans);
}
if (plans != null && !plans.isEmpty()) {
for (RegionPlan plan: plans) {
...
...
this.assignmentManager.balance(plan);
...
}
}
下面来看balancer是如何生成balance 计划的。在hbase中是有多种调度器的,在我使用的版本中默认调度器StochasticLoadBalancer。生成plan的逻辑都在balancerCluster的逻辑中。先进行了一些简单的判断:
List<RegionPlan> plans = balanceMasterRegions(clusterState);
if (plans != null || clusterState == null || clusterState.size() <= 1) {
return plans;
}
if (masterServerName != null && clusterState.containsKey(masterServerName)) {
if (clusterState.size() <= 2) {
return null;
}
clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
clusterState.remove(masterServerName);
}
然后使用needsBalance函数判断目前状态是否需要balance,如果不需要直接返回,这个函数后面具体分析。
Cluster cluster = new Cluster(clusterState, loads, finder, rackManager, getConf());
if (!needsBalance(cluster)) {
return null;
}
之后开始尝试生成执行计划,最多尝试生成computedMaxSteps次action,每次action即将一个region从一个server迁移到另一个server(action的生成方案后续讨论),然后计算一个执行该action之后集群状态的cost,如果这个cost比之前的小,会保留该状态,继续生成下一个action;如果这个cost比之前的大,说明这个action对于集群状态没有好处,会放弃该action。
for (step = 0; step < computedMaxSteps; step++) {
int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
CandidateGenerator p = candidateGenerators[generatorIdx];
Cluster.Action action = p.generate(cluster);
if (action.type == Type.NULL) {
continue;
}
cluster.doAction(action);
updateCostsWithAction(cluster, action);
newCost = computeCost(cluster, currentCost);
if (newCost < currentCost) {
currentCost = newCost;
} else {
Action undoAction = action.undoAction();
cluster.doAction(undoAction);
updateCostsWithAction(cluster, undoAction);
}
...
...
}
整体流程大概就是这样,其实还是挺简单的,下面再看看needsBalance函数,就是算一个region数量/server数量的均值,这是每个server应该负载的region数量,为了balancer不会来回抖动,给了个范围,只要server的region数量在这个区间范围内,就认为其是均衡状态,如果所有server都是均衡状态就不需要调度。
protected boolean needsBalance(Cluster c) {
ClusterLoadState cs = new ClusterLoadState(c.clusterState);
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not running balancer because only " + cs.getNumServers()
+ " active regionserver(s)");
}
return false;
}
if(areSomeRegionReplicasColocated(c)) return true;
float average = cs.getLoadAverage();
int floor = (int) Math.floor(average * (1 - slop));
int ceiling = (int) Math.ceil(average * (1 + slop));
if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping load balancing because balanced cluster; " +
"servers=" + cs.getNumServers() +
" regions=" + cs.getNumRegions() + " average=" + average +
" mostloaded=" + serversByLoad.lastKey().getLoad() +
" leastloaded=" + serversByLoad.firstKey().getLoad());
}
return false;
}
return true;
}
下面再看看生成balance plan中两个比较核心的模块,action生成和cost计算,先来看cost计算,可以看到会调用多个costFunction为当前cluster计算cost并求和。每个costFunction算出来之后还会乘一个权重,这个权重是可调的配置值。
protected double computeCost(Cluster cluster, double previousCost) {
double total = 0;
for (CostFunction c:costFunctions) {
if (c.getMultiplier() <= 0) {
continue;
}
total += c.getMultiplier() * c.cost();
if (total > previousCost) {
return total;
}
}
return total;
}
costFunction由以下多个函数组成:
costFunctions = new CostFunction[]{
new RegionCountSkewCostFunction(conf),
new PrimaryRegionCountSkewCostFunction(conf),
new MoveCostFunction(conf),
localityCost,
new TableSkewCostFunction(conf),
regionReplicaHostCostFunction,
regionReplicaRackCostFunction,
regionLoadFunctions[0],
regionLoadFunctions[1],
regionLoadFunctions[2],
regionLoadFunctions[3],
};
}
下面挑一个默认权重最高的RegionCountSkewCostFunction进行分析,计算cost的函数非常简单,regionsPerServer是个二维数组, 其中存放的数据是serverIndex => region list,通过其计算出了stats,即每个server的region数量,更具体的计算由costFromArray进行。
double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i =0; i < cluster.numServers; i++) {
stats[i] = cluster.regionsPerServer[i].length;
}
return costFromArray(stats);
}
逻辑也非常简单,先计算最优分布和最差分布的情况,然后计算当前分布的cost,然后对最优分布和最差分布取归一化。
protected double costFromArray(double[] stats) {
double totalCost = 0;
double total = getSum(stats);
double count = stats.length;
double mean = total/count;
double max = ((count - 1) * mean) + (total - mean);
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
}
min = Math.max(0, min);
for (int i=0; i<stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}
double scaled = scale(min, max, totalCost);
return scaled;
}
其他的costFunction太多了,以后遇到了用得上再看吧。下面再看看执行的action的生成,action由CandidateGenerator生成,这也是一个接口类,balancer中有多个CandidateGenerator,每次生成action时,会随机选择一个CandidateGenerator来生成action,下面挑一个最简单的CandidateGenerator来分析一下,RandomCandidateGenerator的generate函数如下,逻辑非常简单,随机选俩server,再在server上选一个region,move一下这个region。
Cluster.Action generate(Cluster cluster) {
int thisServer = pickRandomServer(cluster);
int otherServer = pickOtherRandomServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}
剩余的其他CandidateGenerator以后用到了再看吧。
发现分布式系统中的balance策略并没有设计得特别复杂,挺朴素的。我们自己项目中的balance也是这样,主要是这种策略其实没有固定的公式和理论来指导,只能自己拍脑袋想,越简单的方案,越不容易出错,越复杂的方案,越难以收敛。设计这种balance的核心思路:
- 保留充足的阈值,在某个范围内的状态都应该认为其均衡,阈值过小容易来回抖动
- 迁移方案不需要很复杂,最简单的就是随机迁,复杂点儿的可以对每个server负载或者其他打分方式排个序啥的,然后从负载高的往负载低的迁移。
- 打分模块很重要,感觉没有银弹,一些权重参数还是得慢慢调。
|