SchedulerX Distributed Tasks


Official documentation: https://help.aliyun.com/document_detail/303822.html


Distributed Programming Models


Standalone: the job instance runs on a single, randomly chosen machine in the group; all job types are supported.

Broadcast: the job instance runs on every machine in the group, and the job succeeds only after every machine has finished.


Map model: call the map() method to run a batch job distributed across workers. A sketch of idempotent subtask handling follows the notes below.

# Notes
A failed subtask is retried and may execute more than once, so subtask handling must be idempotent.
The LocalDateTime and BigDecimal data types are not supported in subtask objects.

# Execution modes
Parallel computing: supports at most 300 subtasks and provides a subtask list; do not choose parallel computing for second-level jobs.
Memory grid: in-memory computation; supports up to 50,000 subtasks and is fast.
Grid computing: file-based computation; supports up to 1,000,000 subtasks.
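
To make the idempotency note concrete, here is a minimal sketch that is not from the official docs. The dedup set below is a process-local stand-in; since subtasks are dispatched across machines, a real job would claim the key in shared storage such as a Redis SETNX key or a unique database constraint.

public class IdempotentMapJob extends MapJobProcessor {

    // Process-local stand-in for a shared dedup store (illustration only).
    private static final Set<String> claimed = ConcurrentHashMap.newKeySet();

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            List<String> orderIds = Lists.newArrayList("order_1", "order_2", "order_3");
            return map(orderIds, "OrderTask");
        } else if ("OrderTask".equals(context.getTaskName())) {
            String orderId = (String) context.getTask();
            // A retried subtask may arrive here a second time; claim a
            // one-shot key so the side effect executes at most once.
            String key = context.getJobInstanceId() + ":" + orderId;
            if (!claimed.add(key)) {
                return new ProcessResult(true); // already handled, report success
            }
            System.out.println("handling " + orderId); // business logic goes here
            return new ProcessResult(true);
        }
        return new ProcessResult(false);
    }
}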


MapReduce model: an extension of the Map model that adds a reduce interface.

# Notes
A MapReduce job has exactly one reduce method, which executes after all
subtasks have completed; reduce can return the overall execution result.
If any subtask fails, reduce is not executed (but see runReduceIfFail in the class shown later); if reduce fails, the whole job instance fails.

# Execution modes
Parallel computing: supports at most 300 subtasks and provides a subtask list; do not choose parallel computing for second-level jobs.
Memory grid: in-memory computation; supports up to 50,000 subtasks and is fast.
Grid computing: file-based computation; supports up to 1,000,000 subtasks.


Sharding model

# Sharding modes
Static sharding: a fixed number of shards, e.g. a fixed 1,024 tables in a sharded database that several machines process in a distributed fashion.
Dynamic sharding: distributed processing of an unknown volume of data, e.g. batch-processing a large table that keeps changing; dynamic sharding is not yet open source.

# Features
Compatible with elastic-job's static sharding model.
Supports four languages: Java, Python, Shell and Go.
High availability: the sharding model is built on the Map model and inherits its high-availability behavior;
       if a worker fails during execution, the master worker fails the shard over to another slave node.
Flow control: built on the Map model, the sharding model inherits its flow control, i.e. per-machine subtask concurrency can be limited.
        For example, with 1,000 shards and 10 machines in total, at most 5 shards can run concurrently per machine while the rest wait in a queue.
Automatic shard retry: built on the Map model, the sharding model inherits automatic retry of failed subtasks.
A minimal sharding sketch follows this list.
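
This post ships no sharding sample, so here is a minimal illustrative sketch. The shard accessors getShardingId() and getShardingParameter() on JobContext are an assumption based on the SDK's sharding samples; verify them against your SDK version. In the console, the job would be configured with a shard parameter string such as 0=db0,1=db1,2=db2.

@Component
public class ShardingScanJob extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        // Each worker receives one or more (id, parameter) pairs to process.
        long shardingId = context.getShardingId();            // e.g. 0, 1, 2
        String shardingParameter = context.getShardingParameter(); // e.g. "db0"
        System.out.println("processing shard " + shardingId + " -> " + shardingParameter);
        // e.g. scan the database named by shardingParameter here
        return new ProcessResult(true);
    }
}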


Related Classes and Interfaces


JavaProcessor

public abstract class JavaProcessor implements JobProcessorEx {
    public JavaProcessor() {
    }

    public void preProcess(JobContext context) {
    }

    public ProcessResult postProcess(JobContext context) {
        return null;
    }

    public void kill(JobContext context) {
    }

    public ProcessResult process(JobContext context) throws Exception {
        return null;
    }
}
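
JavaProcessor is the base class for the standalone and broadcast models: process() carries the business logic, preProcess() and postProcess() run around it, and kill() is invoked when the instance is terminated from the console. A minimal concrete processor, for illustration only, could look like this:

@Component
public class HelloJob extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        // Business logic for one standalone run (or one machine of a broadcast run).
        System.out.println("hello schedulerx, jobId=" + context.getJobId());
        return new ProcessResult(true); // report success to the server
    }

    @Override
    public void kill(JobContext context) {
        // Called when the instance is killed from the console; stop work here.
    }
}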


MapJobProcessor

public abstract class MapJobProcessor extends JavaProcessor {
    private LogCollector logCollector = LogCollectorFactory.get();
    private static final Logger LOGGER = LogFactory.getLogger(MapJobProcessor.class);
    private static final Integer MAX_RETRY_COUNT = 3;

    public MapJobProcessor() {
    }

    public ProcessResult map(List<? extends Object> taskList, String taskName) {
        ProcessResult result = new ProcessResult(false);
        JobContext context = ContainerFactory.getContainerPool().getContext();
        ActorSelection masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
        if (masterAkkaSelection == null) {
            String errMsg = "get taskMaster akka path error, path=" + context.getInstanceMasterActorPath();
            LOGGER.error(errMsg);
            result.setResult(errMsg);
            return result;
        }
        if (CollectionUtils.isEmpty(taskList)) {
            result.setResult("task list is empty");
            return result;
        }

        // Split the task list into batches of worker.map.page.size (default 1000) subtasks.
        int batchSize = ConfigUtil.getWorkerConfig().getInt("worker.map.page.size", 1000);
        int size = taskList.size();
        LOGGER.info("map task list, jobInstanceId={}, taskName={}, size={}, batchSize={}",
                context.getJobInstanceId(), taskName, size, batchSize);
        int batchNumber = size % batchSize > 0 ? size / batchSize + 1 : size / batchSize;
        List<Builder> builders = Lists.newArrayList();
        for (int i = 0; i < batchNumber; i++) {
            builders.add(WorkerMapTaskRequest.newBuilder());
        }

        int maxTaskBodySize = ConfigUtil.getWorkerConfig().getInt("task.body.size.max", 65536);

        try {
            // Serialize every subtask with Hessian and append it to its batch;
            // a single serialized body may not exceed task.body.size.max (default 64 KB).
            int position = 0;
            for (Object task : taskList) {
                this.checkTaskObject(task);
                int batchIdx = position++ / batchSize;
                byte[] taskBody = HessianUtil.toBytes(task);
                if (taskBody.length > maxTaskBodySize) {
                    throw new IOException("taskBody size more than " + maxTaskBodySize + "B!");
                }
                builders.get(batchIdx).addTaskBody(ByteString.copyFrom(taskBody));
            }

            // Dispatch each batch to the task master: directly if this worker
            // holds the MapTaskMaster, otherwise via Akka, with a retry on timeout.
            position = 0;
            for (Builder builder : builders) {
                builder.setJobId(context.getJobId());
                builder.setJobInstanceId(context.getJobInstanceId());
                builder.setTaskId(context.getTaskId());
                builder.setTaskName(taskName);
                WorkerMapTaskResponse response = null;
                int retryCount = 0;

                try {
                    TaskMaster taskMaster = TaskMasterPool.INSTANCE.get(context.getJobInstanceId());
                    if (taskMaster != null && taskMaster instanceof MapTaskMaster) {
                        response = this.handleMapTask(taskMaster, builder.build());
                    } else {
                        response = (WorkerMapTaskResponse) FutureUtils.awaitResult(masterAkkaSelection, builder.build(), 30L);
                    }
                } catch (TimeoutException e) {
                    LOGGER.warn("JobInstanceId={} WorkerMapTaskRequest dispatch timeout.", context.getJobInstanceId(), e);
                    if (retryCount >= MAX_RETRY_COUNT) {
                        throw e;
                    }
                    Thread.sleep(10000L);
                    masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
                    response = (WorkerMapTaskResponse) FutureUtils.awaitResult(masterAkkaSelection, builder.build(), 30L);
                    retryCount++;
                }

                if (!response.getSuccess()) {
                    LOGGER.error(response.getMessage());
                    this.logCollector.collect(context.getUniqueId(), response.getMessage());
                    result.setResult(response.getMessage());
                    return result;
                }

                // Release the dispatched batch and back off if the master is overloaded.
                builders.set(position++, null);
                if (response.hasOverload() && response.getOverload()) {
                    LOGGER.warn("Task Master is busy, sleeping a while {}s...", 10);
                    Thread.sleep(10000L);
                }
            }

            result.setStatus(true);
        } catch (Throwable t) {
            LOGGER.error("JobInstanceId={} WorkerMapTaskRequest dispatch error.", context.getJobInstanceId(), t);
            this.logCollector.collect(context.getUniqueId(), ExceptionUtil.getTrace(t));
            result.setResult(ExceptionUtil.getMessage(t));
        }

        return result;
    }

    private void checkTaskObject(Object taskObject) {
        JobContext context = ContainerFactory.getContainerPool().getContext();
        boolean isAdvancedVersion = GroupManager.INSTANCE.isAdvancedVersion(context.getGroupId());
        if (isAdvancedVersion && taskObject instanceof BizSubTask) {
            BizSubTask bizSubTask = (BizSubTask)taskObject;
            Map<String, String> labelMap = bizSubTask.labelMap();
            if (labelMap.size() > 3) {
                throw new RuntimeException("label map size can't beyond 3.");
            }

            for (Entry<String, String> entry : labelMap.entrySet()) {
                if (entry.getKey().length() > 60 || entry.getValue().length() > 180) {
                    LOGGER.error("Job instance={} label map<{}, {}> content can't beyond max size(60,180).",
                            context.getJobInstanceId(), entry.getKey().length(), entry.getValue());
                    throw new RuntimeException("label map content can't beyond max size(60,180).");
                }
            }
        }

    }

    private WorkerMapTaskResponse handleMapTask(TaskMaster taskMaster, WorkerMapTaskRequest request) throws Exception {
        WorkerMapTaskResponse response = null;

        try {
            long jobInstanceId = request.getJobInstanceId();
            if (taskMaster == null) {
                response = WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("can't found TaskMaster by jobInstanceId=" + jobInstanceId).build();
            } else if (!(taskMaster instanceof MapTaskMaster)) {
                response = WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("TaskMaster is not MapTaskMaster").build();
                taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, "TaskMaster is not MapTaskMaster");
            } else {
                try {
                    long startTime = System.currentTimeMillis();
                    // Hand the serialized subtask batch to the local MapTaskMaster;
                    // the return value signals whether the master is overloaded.
                    boolean overload = ((MapTaskMaster)taskMaster).map(request.getTaskBodyList(), request.getTaskName());
                    LOGGER.debug("jobInstanceId={} map, cost={}ms", jobInstanceId, System.currentTimeMillis() - startTime);
                    response = WorkerMapTaskResponse.newBuilder().setSuccess(true).setOverload(overload).build();
                } catch (Exception e) {
                    LOGGER.error("jobInstanceId={} map error", e);
                    taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                    throw e;
                }
            }
        } catch (Throwable t) {
            LOGGER.error("jobInstanceId={}, handleMapTask error.", request.getJobInstanceId(), t);
            response = WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(t)).build();
        }

        return response;
    }

    protected boolean isRootTask(JobContext context) {
        return context.getTaskName().equals("MAP_TASK_ROOT");
    }
}


MapReduceJobProcessor

public abstract class MapReduceJobProcessor extends MapJobProcessor {
    private static final Logger LOGGER = LogFactory.getLogger(MapReduceJobProcessor.class);

    public MapReduceJobProcessor() {
    }

    public abstract ProcessResult reduce(JobContext var1) throws Exception;

    /** @deprecated */
    @Deprecated
    public ProcessResult postProcess(JobContext context) {
        LOGGER.warn("MapReduceJobProcessor not support postProcess, please use reduce to instead of");
        return null;
    }

    public boolean runReduceIfFail(JobContext context) {
        return true;
    }
}
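
Note that runReduceIfFail returns true by default; judging by its name and default, it controls whether reduce still runs when some subtasks have failed. A small illustrative override that skips reduce on any subtask failure might look like this:

public class StrictMapReduceJob extends MapReduceJobProcessor {

    @Override
    public boolean runReduceIfFail(JobContext context) {
        // Do not aggregate partial results: skip reduce when any subtask failed.
        return false;
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        // map phase, as in the examples below
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        // aggregate context.getTaskResults() here
        return new ProcessResult(true);
    }
}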


Official Examples


TestMapJobProcessor

@Component
public class TestMapJobProcessor extends MapJobProcessor {
             // Demo: dispatch 50 messages (Map model)

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (isRootTask(context)) {                 // root task
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i < dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");  // msgList: the task list to split
                                                    // Level1Dispatch: the subtask name
                                                    // map(): fans the list out as subtasks
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }   // subtask handling

        return new ProcessResult(false);
    }

}


ScanSingleTableJobProcessor: splits a single table into ranges, each subtask processing 500 rows

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
             // Demo: process a single table (Map or MapReduce model)

    @Service
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;  // number of rows each subtask processes

    static class PageTask {
        private long startId;
        private long endId;

        public PageTask(long startId, long endId) {
            this.startId = startId;
            this.endId = endId;
        }

        public long getStartId() {
            return startId;
        }

        public long getEndId() {
            return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); // Multiple jobs can share this code; the table name comes from the job parameter configured in the console.
        String taskName = context.getTaskName();   // task name
        Object task = context.getTask();           // the current subtask
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);  // id range of the table
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();    // subtask list
            int step = PAGE_SIZE; // each subtask covers a range of PAGE_SIZE ids
            for (long i = minId; i < maxId; i += step) {
                tasks.add(new PageTask(i, Math.min(i + step, maxId)));
            }
            return map(tasks, "PageTask");         // subtask name: PageTask
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }   // subtask handling

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}            


ScanShardingTableJobProcessor

@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {
             // Demo: process data across sharded databases and tables (Map or MapReduce model)

    @Service
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;    // 500 rows per subtask

    static class PageTask {
        private String tableName;
        private long startId;
        private long endId;

        public PageTask(String tableName, long startId, long endId) {
            this.tableName = tableName;
            this.startId = startId;
            this.endId = endId;
        }

        public String getTableName() {
            return tableName;
        }

        public long getStartId() {
            return startId;
        }

        public long getEndId() {
            return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            // Step 1: split by database.
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            // Step 2: split each database subtask by table.
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            // Step 3: if a single table is still large, split it into page ranges.
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = PAGE_SIZE; // each subtask covers a range of PAGE_SIZE ids
            for (long i = minId; i < maxId; i += step) {
                tasks.add(new PageTask(tableName, i, Math.min(i + step, maxId)));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }   // handle the page-level subtasks after splitting by database, table and page

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO return the list of sharded databases
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        //TODO return the list of tables in the database
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}            


TestMapReduceJobProcessor

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
             // Demo: process 50 messages and aggregate the subtask results in reduce (MapReduce model)

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i < dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }   // subtask results: the key is the taskId, the value is the result string
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}            


Usage Example


Application 1 (port 8080) and application 2 (port 8081); all other configuration and classes are identical.


application.yml

spring:
  schedulerx2:
    aliyunAccessKey: ***
    aliyunSecretKey: ***
    regionId: public
    endpoint: acm.aliyun.com
    namespaceName: lihu-test
    namespace: ***
    appName: lihu-job
    groupId: lihu-group
    appKey: OOs+XPhRsAa1Ia6+Op+NAw==
    jobs:
      mapJob:
        jobModel: parallel
        className: com.example.demo.job.CustomMapJob
        oneTime: "2022-04-18 12:35:00"
        #cron: "0 */1 * * * ?"
      mapReduceJob:
        jobModel: parallel
        className: com.example.demo.job.CustomMapReduceJob
        oneTime: "2022-04-18 12:40:00"
        #cron: "0 */1 * * * ?"


DataConfig

@Configuration
public class DataConfig {

    // JobSyncService ships with the schedulerx2 starter; registering it as a
    // bean lets the jobs declared under spring.schedulerx2.jobs be synced to
    // the SchedulerX server on startup.
    @Bean
    public JobSyncService initJobSyncService(){
        return new JobSyncService();
    }
}


CustomMapJob

public class CustomMapJob extends MapJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        if (isRootTask(context)) {
            // Root task: fan out 10 subtasks named "子任务" (the payload strings
            // "任务 1" .. "任务 10" reappear in the console output below).
            List<String> tasks = new ArrayList<>();
            for (int i = 1; i <= 10; i++) {
                tasks.add("任务 " + i);
            }

            return map(tasks, "子任务");
        } else if ("子任务".equals(taskName)) {
            // Subtask: print the payload and the time it ran.
            String task = (String) context.getTask();
            System.out.println("子任务是:" + task + " " + LocalDateTime.now());

            return new ProcessResult(true);
        }

        return super.process(context);
    }
}


CustomMapReduceJob

@Data                       // Lombok: generates getters/setters for the subtask payload
class Task{

    private Integer startNum;
    private Integer endNum;
}

public class CustomMapReduceJob extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        if (isRootTask(context)) {
            // Root task: split the range 0..99 into 10 subtasks of 10 numbers each.
            List<Task> tasks = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                Task task = new Task();
                task.setStartNum(i * 10);
                task.setEndNum(i * 10 + 9);

                tasks.add(task);
            }

            return map(tasks, "子任务");
        } else if ("子任务".equals(taskName)) {
            // Subtask: sum its sub-range and return the sum as the result string.
            Task task = (Task) context.getTask();
            int sum = 0;
            for (int i = task.getStartNum(); i <= task.getEndNum(); i++) {
                sum += i;
            }

            System.out.println("子任务" + context.getTaskId() + " ==> " + sum);
            return new ProcessResult(true, String.valueOf(sum));
        }

        return super.process(context);
    }

    @Override
    public ProcessResult reduce(JobContext jobContext) throws Exception {
        // Aggregate all subtask results; taskId 0 is the root task, whose
        // result string is empty, hence the "".equals(value) guard.
        Map<Long, String> results = jobContext.getTaskResults();
        AtomicInteger sum = new AtomicInteger(0);
        results.forEach((key, value) -> {
            System.out.println("key:" + key + " ==> " + value);
            sum.addAndGet("".equals(value) ? 0 : Integer.parseInt(value));
        });
        System.out.println("reduce方法计算所得的总和为:" + sum);

        return new ProcessResult(true);
    }
}


Test Run


Start applications 1 and 2, then check the job list in the console.


Application 1: console output

2022-04-18 12:35:01.362  INFO 2392 --- [MapTaskMaster-6] c.a.s.worker.master.MapTaskMaster        : jobInstanceId=997943958, batch start containers successfully, size:5 , worker=010010000047_2392_43290@192.168.5.10:51542, cost=1ms
2022-04-18 12:35:01.363  INFO 2392 --- [940_997943958-1] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_1 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.363  INFO 2392 --- [940_997943958-2] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_3 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.363  INFO 2392 --- [940_997943958-3] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_5 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.364  INFO 2392 --- [940_997943958-4] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_7 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.364  INFO 2392 --- [940_997943958-0] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_9 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]

# Map job: some subtasks ran on application 1, the rest on application 2
子任务是:任务 9 2022-04-18T12:35:01.374
子任务是:任务 3 2022-04-18T12:35:01.374
子任务是:任务 1 2022-04-18T12:35:01.374
子任务是:任务 5 2022-04-18T12:35:01.374
子任务是:任务 7 2022-04-18T12:35:01.374


Application 2: console output

2022-04-18 12:35:01.350  INFO 2394 --- [r-container-177] c.a.s.worker.actor.ContainerActor        : jobInstanceId=997943958, batch start containers, size:5
2022-04-18 12:35:01.390  INFO 2394 --- [940_997943958-2] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_6 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390  INFO 2394 --- [940_997943958-3] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_8 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390  INFO 2394 --- [940_997943958-0] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_2 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390  INFO 2394 --- [940_997943958-4] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_10 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390  INFO 2394 --- [940_997943958-1] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_4 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]

# Map job: some subtasks ran on application 1, the rest on application 2
子任务是:任务 10 2022-04-18T12:35:01.401
子任务是:任务 2 2022-04-18T12:35:01.401
子任务是:任务 8 2022-04-18T12:35:01.401
子任务是:任务 4 2022-04-18T12:35:01.401
子任务是:任务 6 2022-04-18T12:35:01.401
2022-04-18 12:35:01.401  INFO 2394 --- [940_997943958-4] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_10 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:35:01.401  INFO 2394 --- [940_997943958-3] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_8 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:35:01.401  INFO 2394 --- [940_997943958-0] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_2 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:35:01.401  INFO 2394 --- [940_997943958-1] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715940_997943958_4 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]

...


# MapReduce job: this instance ran entirely on application 2
2022-04-18 12:40:00.050  INFO 2394 --- [941_997946922-0] c.a.s.worker.master.ParallelTaskMater    : map taskList, jobInstanceId=997946922, taskName=子任务, taskList size=10
2022-04-18 12:40:00.054  INFO 2394 --- [941_997946922-0] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_0 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:40:01.020  INFO 2394 --- [ead-997946922-1] c.a.s.w.m.p.ServerTaskPersistence        : batch create tasks to Server successfully, jobInstanceId=997946922, size=5
2022-04-18 12:40:01.021  INFO 2394 --- [r-container-278] c.a.s.worker.actor.ContainerActor        : jobInstanceId=997946922, batch start containers, size:5
2022-04-18 12:40:01.022  INFO 2394 --- [MapTaskMaster-4] c.a.s.worker.master.MapTaskMaster        : jobInstanceId=997946922, batch start containers successfully, size:5 , worker=010010000047_2394_44529@192.168.5.10:51549, cost=1ms
2022-04-18 12:40:01.022  INFO 2394 --- [941_997946922-1] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_1 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
子任务1 ==> 45
2022-04-18 12:40:01.022  INFO 2394 --- [941_997946922-2] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_3 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:40:01.022  INFO 2394 --- [941_997946922-1] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_1 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=45]
子任务3 ==> 245
2022-04-18 12:40:01.022  INFO 2394 --- [941_997946922-2] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_3 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=245]
2022-04-18 12:40:01.022  INFO 2394 --- [941_997946922-3] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_5 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
子任务5 ==> 445
2022-04-18 12:40:01.022  INFO 2394 --- [941_997946922-3] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_5 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=445]
2022-04-18 12:40:01.023  INFO 2394 --- [941_997946922-4] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_7 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:40:01.023  INFO 2394 --- [941_997946922-0] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_9 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
子任务7 ==> 645
子任务9 ==> 845
2022-04-18 12:40:01.023  INFO 2394 --- [941_997946922-4] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_7 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=645]
2022-04-18 12:40:01.023  INFO 2394 --- [941_997946922-0] c.a.s.worker.container.ThreadContainer   : reportTaskStatus instanceId=715941_997946922_9 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=845]
2022-04-18 12:40:01.044  INFO 2394 --- [ead-997946922-1] c.a.s.w.batch.ContainerStatusReqHandler  : jobInstanceId=997946922 batch report status=3 to task master, size:12
2022-04-18 12:40:01.046  INFO 2394 --- [atcher-task-281] c.a.schedulerx.worker.actor.TaskActor    : jobInstanceId=997946922, batch receive task status reqs, size:12
2022-04-18 12:40:01.050  INFO 2394 --- [ead-997946922-1] c.a.s.w.m.p.ServerTaskPersistence        : batch create tasks to Server successfully, jobInstanceId=997946922, size=5
2022-04-18 12:40:01.050  INFO 2394 --- [ead-997946922-1] c.a.s.worker.batch.TaskPushReqHandler    : jobInstance=997946922, batch dispatch cost:64 ms, dispatchSize:256, size:10
2022-04-18 12:40:01.055  INFO 2394 --- [MapTaskMaster-6] c.a.s.worker.master.MapTaskMaster        : jobInstanceId=997946922, batch start containers successfully, size:5 , worker=010010000047_2392_43290@192.168.5.10:51542, cost=5ms
2022-04-18 12:40:01.994  INFO 2394 --- [hread-997946922] c.a.s.worker.batch.TMStatusReqHandler    : jobInstanceId=997946922, batch update status cost:6ms, size:12
2022-04-18 12:40:02.063  INFO 2394 --- [atcher-task-284] c.a.schedulerx.worker.actor.TaskActor    : jobInstanceId=997946922, batch receive task status reqs, size:10
2022-04-18 12:40:02.098  INFO 2394 --- [hread-997946922] c.a.s.worker.batch.TMStatusReqHandler    : jobInstanceId=997946922, batch update status cost:1ms, size:10
key:0 ==> 
key:1 ==> 45
key:2 ==> 145
key:3 ==> 245
key:4 ==> 345
key:5 ==> 445
key:6 ==> 545
key:7 ==> 645
key:8 ==> 745
key:9 ==> 845
key:10 ==> 945
reduce方法计算所得的总和为:4950

Task 0 is the root task, whose map() call returns no result string, which is why its value above is empty. The reduce total of 4950 equals the sum of 0 through 99, confirming that every subtask ran exactly once.
