13、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
xxl-job官网
- FIRST
- LAST
- ROUND
- RANDOM
- CONSISTENT_HASH
- LEAST_FREQUENTLY_USED
- LEAST_RECENTLY_USED
- FAILOVER
- BUSYOVER
- SHARDING_BROADCAST
结合源码看路由策略
各个路由策略(SHARDING_BROADCAST除外)都有一个单独的类处理,继承了ExecutorRouter 抽象类,实现各自的route方法。
FIRST:第一个。使用注册地址列表中的第一个address
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
return new ReturnT<String>(addressList.get(0));
}
LAST:最后一个。使用注册地址列表中的最后一个address
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<String>(addressList.get(addressList.size()-1));
}
ROUND :轮询。
routeCountEachJob记录了(jobId -> count)键值对。count初始化时为100以内的随机数。每次job执行时count++。
private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
执行时count(triggerParam.getJobId())获取新count(已+1),count对addressList.size()取余,会比上一次的余数也+1,由此实现轮询获取address。
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
return new ReturnT<String>(address);
}
RANDOM :随机
private static Random localRandom = new Random();
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT<String>(address);
}
CONSISTENT_HASH:一致性HASH
分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
LEAST_FREQUENTLY_USED:最不经常使用
单个JOB对应的每个执行器,使用频率最低的优先被选举
LEAST_RECENTLY_USED :最近最久未使用
单个JOB对应的每个执行器,最久未使用的优先被选举
FAILOVER :故障转移
执行机器出现故障(catch (Exception e)),则使用下一个address执行。
BUSYOVER :忙碌转移
实现方法与(FAILOVER :故障转移)类似,当要执行的机器正忙(无可执行线程/需要排队),则使用下一个address执行。
SHARDING_BROADCAST:分片广播
当系统判断当前任务的路由策略是分片广播时, 就会遍历执行器的集群机器列表, 给每一台机器都发送执行消息,分片总数为集群机器数量,分片标记从0开始
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
}
.........................
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
}
|