1.概括
dubbo 2.7 中提供了4中负载均衡算法,分别为:随机、轮询、一致性哈希、最小活跃度,分别对应于源码文件的 RandomLoadBalance.class、RoundRobinLoadBalance.class、ConsistentHashLoadBalance、LeastActiveLoadBalance.class。
2.使用
对于dubbo中负载均衡的配置使用也是很简单,只需要在对应的服务提供方或消费端配置即可,两边都配置优先取消费端,配置方式如下
服务提供端
/**
* 负载均衡
* roundrobin/random/leastactive/consistenthash
* 轮询/加权随机(默认)/最少活跃度/一致性hash
*/
@Service(loadbalance = "roundrobin") // 这里的 @Service是dubbo的注解,不是Spring的注解
public class PayServiceImpl implements IPayService {
@Override
public String pay(String id) {
System.out.println("request coming");
return "dubbo-springboot: pay————>" + id;
}
}
服务消费端
/**
* 服务调用
*/
@RestController
@RequestMapping("/pay")
public class PayController {
// roundrobin/random/leastactive/consistenthash
@Reference(loadbalance = "roundrobin")
private IPayService payService;
@GetMapping("/{id}")
public String pay(@PathVariable String id) {
return payService.pay(id);
}
}
这样就可以了
3.源码分析
这里仅分析最小活跃度的源码实现,其他的负载均衡相对简单。
如果要使用最小活跃度的负载均衡算法,可以配置最小活跃度对应的过滤器来使用,如下
@Reference(loadbalance = "leastactive", filter = "actives")
private IPayService payService;
?首先配置了 actives 过滤器,服务会使得 ActiveLimitFilter.class 这个过滤器生效,这个过滤器会在invoker被调用的时候进行活跃度的维护(invoker是dubbo中对于处理的封装,包括连接和调用处理,只要拿到invoker调用就可以完成远程通信)。
接下来是重点部分?LeastActiveLoadBalance.class 的 doSelect方法,源码如下:
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers invoker 数量
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokes
int totalWeight = 0;
// The weight of the first least active invoke
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoke 获得活跃度,通过 uti 和 方法名
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoke configuration. The default value is 100. 获得权重,没设置默认100
int afterWarmup = getWeight(invoker, invocation);
// save for later use 记录活跃度
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// 第一次一定能走这里,后边当拿到的invoker的活跃度小于最小活跃度,则进来这里的逻辑
// Reset the active number of the current invoker to the least active number 记录获得的最小活跃度
leastActive = active;
// Reset the number of least active invokers 拥有最小活跃度的invoker数量
leastCount = 1;
// Put the first least active invoker first in leastIndexes 记录拥有最小活跃度的invoker的下标到leastIndexes[0]
leastIndexes[0] = i;
// Reset totalWeight 总权重
totalWeight = afterWarmup;
// Record the weight the first least active invoker 记录最小活跃的的invoker的权重
firstWeight = afterWarmup; 因为最小活跃度的invoker只有1个,所以是等权的
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
// 如果当前遍历的invoker的活跃度和当前记录的最小活跃度相等,走这里
} else if (active == leastActive) {
// leastCount自增,并且记录最小活跃度的invoker的下标到leastIndexes数组
// Record the index of the least active invoker in leastIndexes order
leastIndexes[leastCount++] = i;
// 统计总权重
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// 如果之前是等权,并且现在获得的最小活跃度的invoker的权重和之前最小活跃度的invoker的权重不一样,则不再等权
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 只有一个最小活跃度的invoker,则就选择这个
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
// 如果不等权,并且总权重 > 0
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
// 随机获得一个总权重之内的整数
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
// 对拥有最小活跃的的invoker进行加权随机
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// 等权的情况则在最小活跃度的invoker中随机获取
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
梳理下就是:
1.拿到当前的所有 invoker(因为是集群,会有多个)
2.遍历所有invoker,拿到其中?active(活跃度在每次调用会被ActiveLimitFilter维护)最小的一个或多个invoker,并且记录并统计拥有最小活跃度的invoker的权重,不设置默认为200
3.最后如果拥有最小活跃度的invoker只有1个,则就返回这个;如果大于1个,且他们的权重不是都一样的,则加权随机(随机获得在总权重之内的非负整数,落到谁的权重范围内则就是谁);如果是等权,则直接等权随机(在拥有最小活跃度的invoker中随机获得一个)。
具体的流程看上边源码注释即可
|