负载均衡即LoadBalance。
含义:表示将一些网络请求或者其他形式的负载分摊到机器上。通过负载均衡,可以让每一台服务器负责处理适合自己能力的负载,达到让高负载服务器分流,以及避免资源浪费的目的。
LoadBalance是一个接口,只提供一个对外暴露的方法。
@SPI("random")
public interface LoadBalance {
@Adaptive({"loadbalance"})
<T> Invoker<T> select(List<Invoker<T>> var1, URL var2, Invocation var3) throws RpcException;
}
@SPI:标注是一个扩展点,各种实现配置在配置文件里(key是实现名字,类似spring里的bean name,value是实现类)
@Adaptive:Adaptive 可注解在类或方法上。当 Adaptive 注解在类上时,Dubbo 不会为该类生成代理类。
AbstractLoadBalance:是dubbo所有负载均衡策略的父类,dubbo的负载均衡策略均实现该类。
public abstract class AbstractLoadBalance implements LoadBalance {
public AbstractLoadBalance() {
}
// 用来计算权重,随着uptime增大,ww会逐渐接近weight
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int)((float)uptime / ((float)warmup / (float)weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers != null && !invokers.isEmpty()) {
boolean mustProviderIpAndPort = Boolean.valueOf(invocation.getAttachment("_must_provider_ip_port_", "false"));
if (invokers.size() == 1 && !mustProviderIpAndPort) {
return (Invoker)invokers.get(0);
} else {
String providerIp = invocation.getAttachment("_provider_ip_", "");
String providerPort = invocation.getAttachment("_provider_port_", "");
if (!providerIp.equals("") && !providerPort.equals("")) {
Optional<Invoker<T>> optional = invokers.stream().filter((it) -> {
return it.getUrl().getHost().equals(providerIp) && it.getUrl().getPort() == Integer.valueOf(providerPort);
}).findFirst();
if (optional.isPresent()) {
return this.doSelect(Arrays.asList((Invoker)optional.get()), url, invocation);
}
if (mustProviderIpAndPort) {
throw new RpcException("No provider available in " + invokers + " ip:" + providerIp + " ,port:" + providerPort);
}
}
String consumerRegion = url.getParameter("consumer.region", "");
if (StringUtils.isNotEmpty(consumerRegion)) {
List<Invoker<T>> invokerList = (List)invokers.stream().filter((it) -> {
return it.getUrl().getParameter("provider.region", "").equals(consumerRegion);
}).collect(Collectors.toList());
if (invokerList.size() > 0) {
return this.doSelect(invokerList, url, invocation);
}
}
return this.doSelect(invokers, url, invocation);
}
} else {
return null;
}
}
// 调用负载均衡,由子类实现
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> var1, URL var2, Invocation var3);
// 权重的计算过程,该过程主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,
// 避免让服务在启动之初就处于高负载状态。
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L);
if (timestamp > 0L) {
int uptime = (int)(System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter("warmup", 600000);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
Invoker:调用者,对应一个服务接口。Invoker是Dubbo的核心模型,其它模型都向它靠拢,或转换成它,通过invoke方法执行调用,参数为Invocation(一次具体的调用,持有调用过程中的变量,包含方法名、参数类型、参数、调用者等),返回值为Result。
dubbo负载均衡策略包括:Random LoadBalance、RoundRobin LoadBalance、LeastAction LoadBalance、ConsistentHash LoadBalance
Random LoadBalance:基于随机加权算法,是dubbo的默认负载均衡策略。
随机加权算法:一般应用在以下场景——有一个集合S,里面比如有A,B,C这三项,这时我们想随机从中生成一项,但是生成的概率不同,比如我们希望生成A的概率是50%,B的概率是30%,C的概率是20%。一般来说,我们可以给各项附一个权重,生成的概率正比于这个权重。如图。
?
只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。
源码:
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
public RandomLoadBalance() {
}
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
boolean sameWeight = true;
int firstWeight = this.getWeight((Invoker)invokers.get(0), invocation);
int totalWeight = firstWeight;
int offset;
int i;
// 计算总权重totalWeight
// 比较当前服务的权重与之前服务的的权重是否相同
for(offset = 1; offset < length; ++offset) {
i = this.getWeight((Invoker)invokers.get(offset), invocation);
totalWeight += i;
if (sameWeight && i != firstWeight) {
sameWeight = false;
}
}
// 计算随机数并判断落在那个服务上
if (totalWeight > 0 && !sameWeight) {
offset = ThreadLocalRandom.current().nextInt(totalWeight);
// 如果是三个服务A、B、C,该三个服务的权重分别为(5,3,2),
// 如果随机数是7,那么7-5=2>0表明该随机数不在A的[0,5)范围内,
// 然后继续循环,2-3=-1<0表明该随机数在B的[5,8)范围,结束循环
for(i = 0; i < length; ++i) {
offset -= this.getWeight((Invoker)invokers.get(i), invocation);
if (offset < 0) {
return (Invoker)invokers.get(i);
}
}
}
// 如果服务的权重都相同,那么随机返回一个数就可以
return (Invoker)invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
RoundRobin LoadBalance:基于加权轮询算法
轮询算法:将请求轮流分配给服务器。轮询算法假设所有服务器的处理性能都相同,不关心每台服务器的当前连接数和响应速度。这种情况现实中很难实现,所以我们采用加权轮询算法。
加权轮询算法的原理:根据服务器的不同处理能力,给每个服务器分配不同的权值,使其能够接受相应权值数的服务请求。
经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。
加权轮询算法的结果,就是要生成一个服务器序列。每当有请求到来时,就依次从该序列中取出下一个服务器用于处理该请求。
比如有三个服务A、B、C,它们的权重分别为5,3,2。那么经过加权轮询算法,会生成{A,B,A,B,A,B,A,C,A,C}这样的序列,然后根据收到的请求,会把5个请求分配给A,3个请求分配给B,2个请求分配给C。
源码:
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
private static int RECYCLE_PERIOD = 60000;
private ConcurrentMap<String, ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap();
private AtomicBoolean updateLock = new AtomicBoolean();
public RoundRobinLoadBalance() {
}
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
// 获得服务的全限定名+"."+方法名,比如com.test.TestService.selectOne
// 全限定名就是类名全称,带包路径的用点隔开,比如,java.lang.String
// 非全限定名就是短名,也就是String
String key = ((Invoker)invokers.get(0)).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, RoundRobinLoadBalance.WeightedRoundRobin> map = (Map)this.methodWeightMap.get(key);
return map != null ? map.keySet() : null;
}
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = ((Invoker)invokers.get(0)).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 获取 url 到 WeightedRoundRobin 映射表,如果为空,则创建一个新的
ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> map = (ConcurrentMap)this.methodWeightMap.get(key);
if (map == null) {
this.methodWeightMap.putIfAbsent(key, new ConcurrentHashMap());
map = (ConcurrentMap)this.methodWeightMap.get(key);
}
// 总权重
int totalWeight = 0;
// 最大权重
long maxCurrent = -9223372036854775808L;
// 获取当前系统时间
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
RoundRobinLoadBalance.WeightedRoundRobin selectedWRR = null;
int weight;
// invokers列表遍历
for(Iterator var13 = invokers.iterator(); var13.hasNext(); totalWeight += weight) {
Invoker<T> invoker = (Invoker)var13.next();
// 存储 url 唯一标识 identifyString 到 weightedRoundRobin 的映射关系
String identifyString = invoker.getUrl().toIdentityString();
RoundRobinLoadBalance.WeightedRoundRobin weightedRoundRobin = (RoundRobinLoadBalance.WeightedRoundRobin)map.get(identifyString);
// 获取当前权重
weight = this.getWeight(invoker, invocation);
if (weight < 0) {
weight = 0;
}
// 检测当前Invoker是否有对应的weightedRoundRobin,没有则创建
if (weightedRoundRobin == null) {
weightedRoundRobin = new RoundRobinLoadBalance.WeightedRoundRobin();
// 设置权重
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
// weightedRoundRobin的权重不等于当前权重,表示权重变化,更新weightedRoundRobin
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
// 即current+=weight
long cur = weightedRoundRobin.increaseCurrent();
// 将lastUpdate设置为当前系统时间,表示更新了。
weightedRoundRobin.setLastUpdate(now);
// 取最大值
if (cur > maxCurrent) {
maxCurrent = cur;
// 当前cur为最大值时,选择的Invoker设置为当前invoker
selectedInvoker = invoker;
// 保留weightedRoundRobin以做后用
selectedWRR = weightedRoundRobin;
}
}
// 过滤长时间未被更新的节点
if (!this.updateLock.get() && invokers.size() != map.size() && this.updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> newMap = new ConcurrentHashMap();
newMap.putAll(map);
// 遍历,移除长时间未更新的节点
Iterator it = newMap.entrySet().iterator();
while(it.hasNext()) {
Entry<String, RoundRobinLoadBalance.WeightedRoundRobin> item = (Entry)it.next();
if (now - ((RoundRobinLoadBalance.WeightedRoundRobin)item.getValue()).getLastUpdate() > (long)RECYCLE_PERIOD) {
it.remove();
}
}
// 更新引用
this.methodWeightMap.put(key, newMap);
} finally {
this.updateLock.set(false);
}
}
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
} else {
return (Invoker)invokers.get(0);
}
}
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0L);
private long lastUpdate;
protected WeightedRoundRobin() {
}
public int getWeight() {
return this.weight;
}
public void setWeight(int weight) {
this.weight = weight;
this.current.set(0L);
}
public long increaseCurrent() {
return this.current.addAndGet((long)this.weight);
}
public void sel(int total) {
this.current.addAndGet((long)(-1 * total));
}
public long getLastUpdate() {
return this.lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
}
LeastActive?LoadBalance:基于最小活跃数负载均衡算法
最小活跃数负载均衡算法:假设初始情况下,每个服务提供者活跃数为0,接受到一个请求,活跃数加一,完成请求后,活跃数减一。在相同时间内,性能好的服务提供者能够更快速的完成请求,活跃数减少得快,因此能够分配到更多的请求。
而LeastActiveLoadBalance在此基础上引入了权重值。比如,一个请求来临时,两个服务提供者的权重值更高的那个可以接受请求,两者权重值相等时,随机选择一者。
源码:
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
public LeastActiveLoadBalance() {
}
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
// 最小活跃数
int leastActive = -1;
// 记录最小活跃数相同的服务提供者Invoker的数量
int leastCount = 0;
// 记录具有相同最小活跃数的Invoker在invokers列表的下标位置
int[] leastIndexes = new int[length];
int totalWeight = 0;
// 第一个具有最小活跃数的Invoker的权重值
int firstWeight = 0;
boolean sameWeight = true;
// 遍历invokers列表
int offsetWeight;
int leastIndex;
for(offsetWeight = 0; offsetWeight < length; ++offsetWeight) {
//获取对应权重的Invoker
Invoker<T> invoker = (Invoker)invokers.get(offsetWeight);
// 获取Invoker对应的活跃数
leastIndex = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取权重
int afterWarmup = this.getWeight(invoker, invocation);
// 未发现最小活跃数
if (leastActive != -1 && leastIndex >= leastActive) {
// 当前活跃数与最小活跃数相同
if (leastIndex == leastActive) {
// 具有最小活跃数的Invoker数量累加,记录invokers列表下标位置
leastIndexes[leastCount++] = offsetWeight;
// 累加总权重
totalWeight += afterWarmup;
// 如果当前权重不等于firstWeight,则将sameWeight设置为false
if (sameWeight && offsetWeight > 0 && afterWarmup != firstWeight) {
sameWeight = false;
}
}
} else {// 发现更小活跃数,重新开始
leastActive = leastIndex;
leastCount = 1;
leastIndexes[0] = offsetWeight;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
}
}
// 具有最小活跃数的Invoker只有一个时,直接返回即可
if (leastCount == 1) {
return (Invoker)invokers.get(leastIndexes[0]);
} else {
// 当具有最小活跃数的Invoker有多个且权重不相等时
if (!sameWeight && totalWeight > 0) {
// 获得一个[1,totalWeight+1)范围的随机数
offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight) + 1;
for(int i = 0; i < leastCount; ++i) {
leastIndex = leastIndexes[i];
// 获取权重值,让随机数减去权重值
offsetWeight -= this.getWeight((Invoker)invokers.get(leastIndex), invocation);
if (offsetWeight <= 0) {
return (Invoker)invokers.get(leastIndex);
}
}
}
// 当有多个具有最小活跃值的Invoker且权重值都相等时,随机返回一个即可
return (Invoker)invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
}
ConsistentHashLoadBalance:基于哈希一致性算法
哈希算法:根据结点的关键码值来确定其存储地址。
举例:有三台服务器A、B、C,有十个请求需要分配给三台服务器处理,可以通过hash(请求地址)%3获得一个数,这个数一定在0,1,2三个数据中,然后0对应A服务器,1对应B服务器,2对应C服务器就可以处理这些请求了。
但是当服务器增多后,不再是模3了,计算结果会改变,从而导致之前计算的数据都会失效,这种情况下,就可以采用一致性哈希算法。
一致性哈希算法:与哈希算法类似,不过公式变为hash(请求地址)%2^32,并且需要两者都要通过公式进行计算映射。
可以将2^32想象为一个圆,假设这个圆是由2^32个点组成。
?我们把这个由2的32次方个点组成的圆环称为hash环。
在一致性哈希算法中,需要对服务器进行计算,还是之前的三个服务器A、B、C,对于每个服务器都hash(服务器Ip地址)%2^32,然后将服务器对应到hash环上。
?有二十个请求,需要分配到三个服务器上,同样也用公式计算,hash(请求地址)%2^32,然后将请求映射到hash环上,从请求映射的位置出发,顺时针旋转,遇到的第一个服务器,就是给请求分配的服务器。
ConsistentHashLoadBalance就是根据这种思想实现的。同时在这里引入了虚拟节点。
虚拟节点:让 Invoker 在圆环上分散开来,避免数据倾斜问题。
数据倾斜问题:由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况。
源码:
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
private final ConcurrentMap<String, ConsistentHashLoadBalance.ConsistentHashSelector<?>> selectors = new ConcurrentHashMap();
public ConsistentHashLoadBalance() {
}
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// 获得服务的全限定名+"."+方法名
String key = ((Invoker)invokers.get(0)).getUrl().getServiceKey() + "." + methodName;
// 获取invokers列表原始的Hashcode
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashLoadBalance.ConsistentHashSelector<T> selector = (ConsistentHashLoadBalance.ConsistentHashSelector)this.selectors.get(key);
// 服务提供者的数量发生了变化时
if (selector == null || selector.identityHashCode != identityHashCode) {
// 创建新的 ConsistentHashSelector
this.selectors.put(key, new ConsistentHashLoadBalance.ConsistentHashSelector(invokers, methodName, identityHashCode));
selector = (ConsistentHashLoadBalance.ConsistentHashSelector)this.selectors.get(key);
}
// 返回选择的Invoker
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
// 用TreeMap存储Invoker的虚拟节点
private final TreeMap<Long, Invoker<T>> virtualInvokers = new TreeMap();
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.identityHashCode = identityHashCode;
URL url = ((Invoker)invokers.get(0)).getUrl();
// 获取虚拟节点数,默认值为160
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 获取参与hash计算的参数下标值,默认对第一个参数进行hash计算
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
this.argumentIndex = new int[index.length];
for(int i = 0; i < index.length; ++i) {
this.argumentIndex[i] = Integer.parseInt(index[i]);
}
// 对invokers列表进行遍历
Iterator var14 = invokers.iterator();
while(var14.hasNext()) {
Invoker<T> invoker = (Invoker)var14.next();
String address = invoker.getUrl().getAddress();
for(int i = 0; i < this.replicaNumber / 4; ++i) {
// 对address+i进行mds5计算,获得一个长度为16的字节数组
byte[] digest = this.md5(address + i);
// 对于digest进行四次hash计算
for(int h = 0; h < 4; ++h) {
long m = this.hash(digest, h);
this.virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
// 将参数转为 key
String key = this.toKey(invocation.getArguments());
// 对key进行md5计算
byte[] digest = this.md5(key);
return this.selectForKey(this.hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
int[] var3 = this.argumentIndex;
int var4 = var3.length;
for(int var5 = 0; var5 < var4; ++var5) {
int i = var3[var5];
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
Entry<Long, Invoker<T>> entry = this.virtualInvokers.ceilingEntry(hash);
// 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null,
// 需要将 TreeMap 的头节点赋值给 entry
if (entry == null) {
entry = this.virtualInvokers.firstEntry();
}
return (Invoker)entry.getValue();
}
private long hash(byte[] digest, int number) {
return ((long)(digest[3 + number * 4] & 255) << 24 | (long)(digest[2 + number * 4] & 255) << 16 | (long)(digest[1 + number * 4] & 255) << 8 | (long)(digest[number * 4] & 255)) & 4294967295L;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException var6) {
throw new IllegalStateException(var6.getMessage(), var6);
}
md5.reset();
byte[] bytes;
try {
bytes = value.getBytes("UTF-8");
} catch (UnsupportedEncodingException var5) {
throw new IllegalStateException(var5.getMessage(), var5);
}
md5.update(bytes);
return md5.digest();
}
}
}
|