源码
org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
int weight = getWeight(invoker, invocation);
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
return invokers.get(0);
}
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
算法测试
package com.jiangzheng.course.dubbo.provider.loadbalance;
import com.google.common.collect.Lists;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class MyLoadBalance {
private ConcurrentMap<String, ConcurrentMap<String, MyLoadBalance.WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, MyLoadBalance.WeightedRoundRobin>>();
private static final int RECYCLE_PERIOD = 60000;
public static void main(String[] args) {
MyLoadBalance myLoadBalance = new MyLoadBalance();
List<Invoker> invokers = Lists.newArrayList();
for (int i = 1; i < 6; i++) {
Invoker invoker = new Invoker("IP:" + i, "--------------ServiceDemo--------------", i);
invokers.add(invoker);
}
for (int i = 0; i < 10; i++) {
Invoker invoker = myLoadBalance.doSelect(invokers, "toSelf");
System.out.println("选举成功:" + invoker.getIp());
}
}
protected Invoker doSelect(List<Invoker> invokers, String methodName) {
String key = invokers.get(0).getServiceKey() + "." + methodName;
ConcurrentMap<String, MyLoadBalance.WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker selectedInvoker = null;
MyLoadBalance.WeightedRoundRobin selectedWRR = null;
for (Invoker invoker : invokers) {
String identifyString = invoker.toIdentityString();
int weight = invoker.getWeight();
MyLoadBalance.WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
MyLoadBalance.WeightedRoundRobin wrr = new MyLoadBalance.WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
System.out.println("计算后");
print_table(map.keySet().stream().sorted().collect(Collectors.toList()), map.keySet().stream().sorted().map(e -> map.get(e).current.get() + "").collect(Collectors.toList()));
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
}
System.out.println("选举后");
print_table(map.keySet().stream().sorted().collect(Collectors.toList()), map.keySet().stream().sorted().map(e -> map.get(e).current.get() + "").collect(Collectors.toList()));
return selectedInvoker != null ? selectedInvoker : invokers.get(0);
}
protected static class Invoker {
private String ip;
private String serviceKey;
private int weight;
public Invoker(String ip, String serviceKey, int weight) {
this.ip = ip;
this.serviceKey = serviceKey;
this.weight = weight;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getServiceKey() {
return serviceKey;
}
public void setServiceKey(String serviceKey) {
this.serviceKey = serviceKey;
}
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
}
public String toIdentityString() {
return ip;
}
@Override
public String toString() {
return "Invoker{" +
"ip='" + ip + '\'' +
", serviceKey='" + serviceKey + '\'' +
", weight=" + weight +
'}';
}
}
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
@Override
public String toString() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
return "WeightedRoundRobin{" +
"weight=" + weight +
", current=" + current +
", lastUpdate=" + simpleDateFormat.format(new Date(lastUpdate)) +
'}';
}
}
static void print_table(List<String> title, List<String> table) {
print_table(title.toArray(new String[title.size()]), title.size(), title.size());
print_table(table.toArray(new String[table.size()]), table.size(), table.size());
}
static void print_table(String[] table, int size, int wide) {
for (int i = 0; i < wide; ++i) {
System.out.print("|");
int len = table[i].length();
int left_space = (size - len) % 2 == 0 ? (size - len) / 2 : (size - len) / 2 + 1;
int right_space = (size - len) / 2;
for (int j = 0; j < left_space; ++j) {
System.out.print(" ");
}
System.out.print(table[i]);
for (int j = 0; j < right_space; ++j) {
System.out.print(" ");
}
}
System.out.print("|\n");
for (int i = 0; i < wide; ++i) {
System.out.print("+");
for (int j = 0; j < size; ++j) {
System.out.print("-");
}
}
System.out.print("+\n");
}
}
输出结果
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 1 | 2 | 3 | 4 | 5 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 1 | 2 | 3 | 4 | -10 |
+-----+-----+-----+-----+-----+
选举成功:IP:5
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 2 | 4 | 6 | 8 | -5 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 2 | 4 | 6 | -7 | -5 |
+-----+-----+-----+-----+-----+
选举成功:IP:4
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 3 | 6 | 9 | -3 | 0 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 3 | 6 | -6 | -3 | 0 |
+-----+-----+-----+-----+-----+
选举成功:IP:3
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 4 | 8 | -3 | 1 | 5 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 4 | -7 | -3 | 1 | 5 |
+-----+-----+-----+-----+-----+
选举成功:IP:2
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 5 | -5 | 0 | 5 | 10 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 5 | -5 | 0 | 5 | -5 |
+-----+-----+-----+-----+-----+
选举成功:IP:5
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 6 | -3 | 3 | 9 | 0 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 6 | -3 | 3 | -6 | 0 |
+-----+-----+-----+-----+-----+
选举成功:IP:4
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| 7 | -1 | 6 | -2 | 5 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| -8 | -1 | 6 | -2 | 5 |
+-----+-----+-----+-----+-----+
选举成功:IP:1
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| -7 | 1 | 9 | 2 | 10 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| -7 | 1 | 9 | 2 | -5 |
+-----+-----+-----+-----+-----+
选举成功:IP:5
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| -6 | 3 | 12 | 6 | 0 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| -6 | 3 | -3 | 6 | 0 |
+-----+-----+-----+-----+-----+
选举成功:IP:3
计算后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| -5 | 5 | 0 | 10 | 5 |
+-----+-----+-----+-----+-----+
选举后
| IP:1| IP:2| IP:3| IP:4| IP:5|
+-----+-----+-----+-----+-----+
| -5 | 5 | 0 | -5 | 5 |
+-----+-----+-----+-----+-----+
选举成功:IP:4
Process finished with exit code 0
|