IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 【xxl-job源码篇】xxl-job的10种路由策略源码解读 -> 正文阅读

[Java知识库]【xxl-job源码篇】xxl-job的10种路由策略源码解读

📖导读

xxl-job支持注册多个executor到注册中心,以保证任务能够稳定的执行,那么这些executor会以怎样的策略去执行呢,本章将从源码层面去解析xxl-job的策略的执行原理。

xxl-job为我们提供了如下策略

  • 第一个
  • 最后一个
  • 轮询
  • 随机
  • 一致性HASH
  • 最不经常使用
  • 最久未使用
  • 故障转移
  • 忙碌转移
  • 分片执行

下面将会从执行流程到路由策略做详细解读。

??执行流程

策略的执行流程如下

  1. 执行任务
  2. 选择策略
  3. 通过策略选举出要执行的服务端地址
  4. 通过http调度executor去执行任务

首先我们定位到com.xxl.job.admin.core.thread.JobTriggerPoolHelper

该类就是负责任务执行的helper,它本身是饿汉单例模式,当初始化时会调用toStart方法,通过单例实例调用了start方法,我们看一下该方法源码

public void start(){
    // 快执行线程池
    fastTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                }
            });

    // 慢执行线程池
    slowTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                }
            });
}

可以看到此处初始化了一个快线程池,一个慢线程池,为什么要用两个线程池呢,我们先看addTrigger方法

public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // 默认使用fastTriggerPool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    // 如果发现任务一分钟内有大于10次的慢执行,换slowTriggerPool线程池
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {
        triggerPool_ = slowTriggerPool;
    }

    // 线程池执行
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // 触发
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                // 到达下一个周期则清理上一个周期数据
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // 记录慢任务执行次数
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {
                    // 执行时间超过500毫秒,则认定为慢任务
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        // 记录慢任务慢的次数
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
    });
}

执行任务时,首先判断这个任务是否是个慢任务,如果是个慢任务且慢执行的次数超过了10次将会使用slowTriggerPool慢线程池,它的统计周期为60秒,这里是个优化点,当有大量的任务被执行时,为了防止任务被阻塞,尽可能的会先让执行快的任务优先执行

我们顺着进入到XxlJobTrigger.trigger方法

public static void trigger(int jobId,   // 任务id
                           TriggerTypeEnum triggerType, // 执行来源
                           int failRetryCount,  // 失败重试次数
                           String executorShardingParam,    // 分片广播参数
                           String executorParam,    // 执行入参
                           String addressList) {    // 可用执行器的地址,用逗号分割

    // 获得任务对象
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    // 设置任务入参
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    // 获得执行器
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // 录入执行器地址
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // 分片广播的逻辑
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // 如果是分片广播则特殊处理
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            // 分片广播会通知每一个执行器
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        // 其他执行策略
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}

可以看到分片广播是做了特殊处理的,进入processTrigger后就可以看到正式的调度了

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // 并行还是串行
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    // 路由执行策略
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    // 分片广播参数
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 存储任务执行日志
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 初始化请求参数
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 决策路由执行地址
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            // 分片广播逻辑
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // 通过我们指定的策略选择地址
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 触发远程执行
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 日志信息拼接
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 存储任务执行日志信息
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

此处逻辑包含请求地址的决策,执行记录的存储,远程调度

🚩路由策略

通过以上代码我们可以看到决策最终调度的是ExecutorRouteStrategyEnum中的router下的route方法,route方法传入了当前可用的所有executor地址和TriggerParam请求参数。

public enum ExecutorRouteStrategyEnum {

    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);

    ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
        this.title = title;
        this.router = router;
    }

    // 策略名
    private String title;
    // executor抽象类
    private ExecutorRouter router;

    public String getTitle() {
        return title;
    }
    public ExecutorRouter getRouter() {
        return router;
    }

    public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){
        if (name != null) {
            for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
                if (item.name().equals(name)) {
                    return item;
                }
            }
        }
        return defaultItem;
    }

}

可以看到这是一个enum类,提供了ExecutorRouter抽象类,如果需要实现新的策略,只需要继承该类,实现route方法并增加一个新的范型即可

接下来我们通过源码解读一下各个策略分别是如何实现的

第一个ExecutorRouteFirst

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
    return new ReturnT<String>(addressList.get(0));
}

最后一个ExecutorRouteLast

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
    return new ReturnT<String>(addressList.get(addressList.size()-1));
}

轮询ExecutorRouteRound

轮询并非是从第一个开始,而是随机选择开始的位置,每次通过自增后取模来定位到下一个地址,为了防止integer无限增大,每24小时会清除一次位置信息,重新随机定位。

private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;

private static int count(int jobId) {
    if (System.currentTimeMillis() > CACHE_VALID_TIME) {
        // 缓存超时清除所有任务的位置
        routeCountEachJob.clear();
        // 缓存24小时
        CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
    }

    // 获得任务的随机数
    AtomicInteger count = routeCountEachJob.get(jobId);
    if (count == null || count.get() > 1000000) {
        // 初始化时主动Random一次,缓解首次压力
        count = new AtomicInteger(new Random().nextInt(100));
    } else {
        // 获得下一个任务
        count.addAndGet(1);
    }
    routeCountEachJob.put(jobId, count);
    return count.get();
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
    // 通过取模来定位到下一个执行的地址
    String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
    return new ReturnT<String>(address);
}

随机ExecutorRouteRandom

很简单,就是随机数取一个

private static Random localRandom = new Random();

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
    String address = addressList.get(localRandom.nextInt(addressList.size()));
    return new ReturnT<String>(address);
}

一致性HASHExecutorRouteConsistentHash

为了保证任务能够均匀的分散在各个机器上,采用了一致性hash算法,并预设了100个虚拟节点,使地址能够尽量均匀分布

public String hashJob(int jobId, List<String> addressList) {

    // ------A1------A2-------A3------
    // -----------J1------------------
    // 使用treemap使之有序
    TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
    // 遍历所有地址
    for (String address: addressList) {
        // 生成100个虚拟节点
        for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
            long addressHash = hash("SHARD-" + address + "-NODE-" + i);
            addressRing.put(addressHash, address);
        }
    }

    // hash节点位置
    long jobHash = hash(String.valueOf(jobId));
    // 获取到在hash环中的位置
    SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
    if (!lastRing.isEmpty()) {
        // 如果不在hash环最后面则拿到下一个最近的节点
        return lastRing.get(lastRing.firstKey());
    }
    // 如果在hash环最后的位置则取环中第一个节点
    return addressRing.firstEntry().getValue();
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
    String address = hashJob(triggerParam.getJobId(), addressList);
    return new ReturnT<String>(address);
}

最不经常使用ExecutorRouteLFU

原理是维护了一个以任务id为单位的地址计数器,当第一次进入时,不知道谁使用最少,以随机的形式先给各个地址初始化一个数,最大的计数器值不超过地址总量。

// 外层key为jobId,value-key为地址,value-value为计数器
private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
private static long CACHE_VALID_TIME = 0;

public String route(int jobId, List<String> addressList) {

    if (System.currentTimeMillis() > CACHE_VALID_TIME) {
        // 超过一天后重置缓存
        jobLfuMap.clear();
        // 缓存周期为一天
        CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
    }

    // 初始化结构
    HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
    if (lfuItemMap == null) {
        lfuItemMap = new HashMap<String, Integer>();
        jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖
    }

    for (String address: addressList) {
        if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
            lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力
        }
    }

    // 移除无效的地址
    List<String> delKeys = new ArrayList<>();
    for (String existKey: lfuItemMap.keySet()) {
        if (!addressList.contains(existKey)) {
            delKeys.add(existKey);
        }
    }
    if (delKeys.size() > 0) {
        for (String delKey: delKeys) {
            lfuItemMap.remove(delKey);
        }
    }

    // 根据value进行排序
    List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
    Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
        @Override
        public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
            return o1.getValue().compareTo(o2.getValue());
        }
    });

    // 第0个就是使用最少的
    Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
    String minAddress = addressItem.getKey();
    // 将本次使用的地址计数器做+1操作
    addressItem.setValue(addressItem.getValue() + 1);

    return addressItem.getKey();
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
    String address = route(triggerParam.getJobId(), addressList);
    return new ReturnT<String>(address);
}

最久未使用ExecutorRouteLRU

维护了一个以任务id为单位的map,kv都是地址,实现原理是利用了LinkedHashMap存储排序的特性。

accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;

// key=jobId,value-key=address,value-value=address
private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
private static long CACHE_VALID_TIME = 0;

public String route(int jobId, List<String> addressList) {

    // 每24小时清除一次缓存
    if (System.currentTimeMillis() > CACHE_VALID_TIME) {
        jobLRUMap.clear();
        CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
    }

    // init lru
    LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
    if (lruItem == null) {
        lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
        jobLRUMap.putIfAbsent(jobId, lruItem);
    }

    // 初始化地址kv都是地址
    for (String address: addressList) {
        if (!lruItem.containsKey(address)) {
            lruItem.put(address, address);
        }
    }
    // 移除无效的地址
    List<String> delKeys = new ArrayList<>();
    for (String existKey: lruItem.keySet()) {
        if (!addressList.contains(existKey)) {
            delKeys.add(existKey);
        }
    }
    if (delKeys.size() > 0) {
        for (String delKey: delKeys) {
            lruItem.remove(delKey);
        }
    }

    // 直接拿到第一个即可
    String eldestKey = lruItem.entrySet().iterator().next().getKey();
    String eldestValue = lruItem.get(eldestKey);
    return eldestValue;
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
    String address = route(triggerParam.getJobId(), addressList);
    return new ReturnT<String>(address);
}

故障转移ExecutorRouteFailover

当调度的机器出现无法调度的情况时,则切换为另一台机器

实现原理就是通过调用机器的beat接口查看机器的返回状态来判定是否存活,如果不存活则循环下一个继续该步骤,直到找到可用机器或者无可用机器为止

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {

    StringBuffer beatResultSB = new StringBuffer();
    for (String address : addressList) {
        // beat
        ReturnT<String> beatResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            // 通过调度接口的形式检查机器是否存活
            beatResult = executorBiz.beat();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
        }
        // 记录检查日志
        beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
                .append(I18nUtil.getString("jobconf_beat") + ":")
                .append("<br>address:").append(address)
                .append("<br>code:").append(beatResult.getCode())
                .append("<br>msg:").append(beatResult.getMsg());

        if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
            // 机器正常时则返回该机器地址
            beatResult.setMsg(beatResultSB.toString());
            beatResult.setContent(address);
            return beatResult;
        }
    }
    // 没有可用的存活机器
    return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
}

忙碌转移ExecutorRouteBusyover

当机器的excutor处于忙碌的状态时,则转移至不忙碌的机器

实现原理就是通过调用机器的idleBeat接口查看机器的返回状态来判定是否忙碌,如果处于忙碌或不可用状态则循环下一个继续该步骤,直到找到空闲且可用的机器或者没有可用机器为止

public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
    StringBuffer idleBeatResultSB = new StringBuffer();
    for (String address : addressList) {
        ReturnT<String> idleBeatResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            // 调度接口查看是否忙碌
            idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
        }
        idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
                .append(I18nUtil.getString("jobconf_idleBeat") + ":")
                .append("<br>address:").append(address)
                .append("<br>code:").append(idleBeatResult.getCode())
                .append("<br>msg:").append(idleBeatResult.getMsg());

        if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
            idleBeatResult.setMsg(idleBeatResultSB.toString());
            idleBeatResult.setContent(address);
            return idleBeatResult;
        }
    }
		// 没有可用的存活且空闲的机器
    return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}

?结语

通过本章我们了解了xxl-job的任务执行流程,以及全部路由策略的实现原理

【xxl-job源码篇01】xxl-job源码解读 神奇的时间轮 触发流程解读

【xxl-job源码篇02】注册中心 自研RPC netty的应用?

【xxl-job源码篇03】xxl-job日志系统源码解读

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-05-11 16:17:11  更:2022-05-11 16:17:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 23:24:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码