前言
通过Eureka的学习,我们知道Eureka客户端会定期同步服务端的微服务列表,那拿到这些服务列表又做了哪些操作呢?今天的主角ribbon的作用就是跟这些微服务打交道。
Ribbon是什么?
ribbon是款客户端的负载均衡组件。 自带了如下负载均衡算法(当然也可以自定义): (1)RoundRobinRule:轮询;
(2)RandomRule:随机;
(3)AvailabilityFilteringRule:会先过滤掉由于多次访问故障而处于断路器状态的服务,还有并发的连接数量超过阈值的服务,然后对剩余的服务列表按照轮询策略进行访问;
(4)WeightedResponseTimeRule:根据平均响应时间计算所有服务的权重,响应时间越快的服务权重越大被选中的概率越大。刚启动时如果统计信息不足,则使用RoundRobinRule(轮询)策略,等统计信息足够,会切换到WeightedResponseTimeRule;
(5)RetryRule:先按照RoundRobinRule(轮询)策略获取服务,如果获取服务失败则在指定时间内进行重试,获取可用的服务;
(6)BestAvailableRule:会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务;
(7)ZoneAvoidanceRule:复合判断Server所在区域的性能和Server的可用性选择服务器
Ribbon的原理
ribbon的主要作用:
- 服务列表:ribbon内部维护了两个服务列表,其一是所有从注册中心同步获取的服务列表A,另一个是所有可用状态的服务列表B;
- 定时ping:ribbon内部会有一个定时任务,通过ping的方式同步服务列表A中的状态,如果是可使用状态则维护在可用服务列表B中;
深入理解Ribbon
根据上面的流程我们知道RibbonLoadBalancerClient是Ribbon的入口 这里的IloadBalancer是ZoneAwareLoadBalancer; 类图如下: 我们先来看一张调用流程图,对照这流程图看源码会轻松一下
1、这里首先看一下ZoneAwareLoadBalancer构造方法
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
2、通过上面的类图关系,我们知道DynamicServerListLoadBalancer是ZoneAwareLoadBalancer的父类
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
this.updateAction = new NamelessClass_1();
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats());
}
this.restOfInit(clientConfig);
}
3、重点在这一步
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
this.setEnablePrimingConnections(false);
this.enableAndInitLearnNewServersFeature();
this.updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections().primeConnections(this.getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
3.1、enableAndInitLearnNewServersFeature();
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", this.serverListUpdater.getClass().getSimpleName());
this.serverListUpdater.start(this.updateAction);
}
PollingServerListUpdater#start(final UpdateAction updateAction)
public synchronized void start(final UpdateAction updateAction) {
if (this.isActive.compareAndSet(false, true)) {
Runnable wrapperRunnable = new Runnable() {
public void run() {
if (!PollingServerListUpdater.this.isActive.get()) {
if (PollingServerListUpdater.this.scheduledFuture != null) {
PollingServerListUpdater.this.scheduledFuture.cancel(true);
}
} else {
try {
updateAction.doUpdate();
PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
} catch (Exception var2) {
PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
}
}
}
};
this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
} else {
logger.info("Already active, no-op");
}
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
class NamelessClass_1 implements UpdateAction {
NamelessClass_1() {
}
public void doUpdate() {
DynamicServerListLoadBalancer.this.updateListOfServers();
}
}
this.updateAction = new NamelessClass_1();
this.initWithNiwsConfig(clientConfig);
}
3.2、updateListOfServers
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList();
if (this.serverListImpl != null) {
servers = this.serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
if (this.filter != null) {
servers = this.filter.getFilteredListOfServers((List)servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
}
}
this.updateAllServerList((List)servers);
}
BaseLoadBalancer#forceQuickPing()
public void forceQuickPing() {
if (!this.canSkipPing()) {
logger.debug("LoadBalancer [{}]: forceQuickPing invoking", this.name);
try {
(new BaseLoadBalancer.Pinger(this.pingStrategy)).runPinger();
} catch (Exception var2) {
logger.error("LoadBalancer [{}]: Error running forceQuickPing()", this.name, var2);
}
}
}
|