schedulerx 分布式任务

官网:https://help.aliyun.com/document_detail/303822.html

分布式编程模型

单机:一个任务实例随机在一台机器上执行,支持所有任务类型
广播:任务实例在该组下所有机器上执行,所有机器执行完成,该任务执行成功

map模型:调用map方法,可实现分布式跑批任务
# 注意事项
子任务运行失败会重试,可能执行多次,需要实现幂等功能;
不支持LocalDateTime、BigDecimal数据类型
# 执行方式
并行计算:最多支持300任务,有子任务列表,秒级别任务不要选择并行计算
内存网格:基于内存计算,最多支持50,000以下子任务,速度快
网格计算:基于文件计算,最多支持1,000,000子任务

MapReduce模型:map模型的拓展,新增reduce接口
# 注意事项
MapReduce模型只有一个Reduce方法,
所有子任务完成后执行Reduce方法,可在Reduce方法中返回执行结果
如果有子任务失败,Reduce不会执行,Reduce失败,整个任务实例也失败
# 执行方式
并行计算:最多支持300任务,有子任务列表,秒级别任务不要选择并行计算
内存网格:基于内存计算,最多支持50,000以下子任务,速度快
网格计算:基于文件计算,最多支持1,000,000子任务

分片模型
# 分片方式
静态分片:处理固定的分片数,例如分库分表中固定1024张表,需要若干台机器分布式去处理
动态分片:分布式处理未知数据量的数据,例如一张大表在不停变更,需要分布式跑批,动态分片暂未开源
# 功能特性
兼容elastic-job的静态分片模型
支持Java、Python、Shell、Go四种语言
高可用:分片模型基于Map模型开发,可以继承Map模型高可用特性,
某台worker执行过程中发生异常,master worker会把分片failover到其它slave节点执行
流量控制:分片模型基于Map模型开发,可以继承Map模型流量控制特性,即可以控制单机子任务并发度
例如有1000个分片,一共10台机器,可以控制最多5个分片并发跑,其它在队列中等待
分片自动失败重试:分片模型基于Map模型开发,可以继承Map模型子任务失败自动重试特性

相关类与接口

JavaProcessor
/**
 * Abstract base for Java job processors.
 *
 * <p>Implements {@code JobProcessorEx} with do-nothing defaults: the two
 * {@code void} hooks have empty bodies and the two result-returning methods
 * return {@code null}. Subclasses override whichever methods they need.
 */
public abstract class JavaProcessor implements JobProcessorEx {

    /** Creates a processor; there is no state to set up. */
    public JavaProcessor() {
    }

    /**
     * Pre-processing hook; the default implementation is empty.
     *
     * @param context context of the running job
     */
    public void preProcess(JobContext context) {
    }

    /**
     * Post-processing hook.
     *
     * @param context context of the running job
     * @return always {@code null} in this default implementation
     */
    public ProcessResult postProcess(JobContext context) {
        return null;
    }

    /**
     * Kill hook; the default implementation is empty.
     *
     * @param context context of the running job
     */
    public void kill(JobContext context) {
    }

    /**
     * Main processing method.
     *
     * @param context context of the running job
     * @return always {@code null} in this default implementation
     * @throws Exception declared so that overriding methods may throw
     */
    public ProcessResult process(JobContext context) throws Exception {
        return null;
    }
}

MapJobProcessor
/**
 * Base class for Map-model processors: adds {@link #map(List, String)}, which
 * serializes a list of task objects and hands them to the instance's task
 * master in batches, plus {@link #isRootTask(JobContext)}.
 *
 * <p>NOTE(review): the listing looks like decompiler output (locals such as
 * {@code var14}, {@code var19}); some oddities flagged below may be artifacts
 * of decompilation rather than of the real source.
 */
public abstract class MapJobProcessor extends JavaProcessor {
private LogCollector logCollector = LogCollectorFactory.get();
private static final Logger LOGGER = LogFactory.getLogger(MapJobProcessor.class);
// Upper bound checked against retryCount in map(); see the review note there.
private static final Integer MAX_RETRY_COUNT = 3;
public MapJobProcessor() {
}
/**
 * Dispatches {@code taskList} as sub-tasks named {@code taskName}.
 *
 * <p>Tasks are grouped into batches of {@code worker.map.page.size} (default 1000).
 * Every task is checked by {@code checkTaskObject}, serialized with
 * {@code HessianUtil.toBytes}, and rejected when its serialized body is larger than
 * {@code task.body.size.max} (default 65536). Each batch is then sent either to the
 * in-process {@code MapTaskMaster} or, otherwise, through the akka actor selection.
 *
 * @param taskList task objects to distribute; an empty or null list yields a failed result
 * @param taskName name assigned to the dispatched sub-tasks
 * @return a result whose status is {@code true} only when every batch was accepted;
 *         otherwise status stays {@code false} and the result text carries the error message
 */
public ProcessResult map(List<? extends Object> taskList, String taskName) {
// Start pessimistic: status is flipped to true only after all batches succeed.
ProcessResult result = new ProcessResult(false);
JobContext context = ContainerFactory.getContainerPool().getContext();
ActorSelection masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
if (masterAkkaSelection == null) {
String errMsg = "get taskMaster akka path error, path=" + context.getInstanceMasterActorPath();
LOGGER.error(errMsg);
result.setResult(errMsg);
return result;
} else if (CollectionUtils.isEmpty(taskList)) {
result.setResult("task list is empty");
return result;
} else {
int batchSize = ConfigUtil.getWorkerConfig().getInt("worker.map.page.size", 1000);
int size = taskList.size();
LOGGER.info("map task list, jobInstanceId={}, taskName={}, size={}, batchSize={}", new Object[]{context.getJobInstanceId(), taskName, size, batchSize});
// batchNumber = ceil(size / batchSize)
int quotient = size / batchSize;
int remainder = size % batchSize;
int batchNumber = remainder > 0 ? quotient + 1 : quotient;
// One request builder per batch, created up front.
List<Builder> builders = Lists.newArrayList();
int position;
for(position = 0; position < batchNumber; ++position) {
builders.add(WorkerMapTaskRequest.newBuilder());
}
// position is reused: first as the running task index, later as the batch index.
position = 0;
int maxTaskBodySize = ConfigUtil.getWorkerConfig().getInt("task.body.size.max", 65536);
try {
// Phase 1: validate and serialize every task into the builder of its batch.
Iterator var14 = taskList.iterator();
while(var14.hasNext()) {
Object task = var14.next();
this.checkTaskObject(task);
int batchIdx = position++ / batchSize;
byte[] taskBody = HessianUtil.toBytes(task);
if (taskBody.length > maxTaskBodySize) {
// Caught by the catch (Throwable) below, so the caller sees a failed result, not an exception.
throw new IOException("taskBody size more than " + maxTaskBodySize + "B!");
}
((Builder)builders.get(batchIdx)).addTaskBody(ByteString.copyFrom(taskBody));
}
// Phase 2: send the batches one by one.
position = 0;
var14 = builders.iterator();
while(var14.hasNext()) {
Builder builder = (Builder)var14.next();
builder.setJobId(context.getJobId());
builder.setJobInstanceId(context.getJobInstanceId());
builder.setTaskId(context.getTaskId());
builder.setTaskName(taskName);
WorkerMapTaskResponse response = null;
byte retryCount = 0;
try {
// Use the in-process task master when this worker holds a MapTaskMaster for the instance;
// otherwise ask the master through akka (timeout argument 30L; unit not visible here).
TaskMaster taskMaster = TaskMasterPool.INSTANCE.get(context.getJobInstanceId());
if (taskMaster != null && taskMaster instanceof MapTaskMaster) {
response = this.handleMapTask(taskMaster, builder.build());
} else {
response = (WorkerMapTaskResponse)FutureUtils.awaitResult(masterAkkaSelection, builder.build(), 30L);
}
} catch (TimeoutException var19) {
LOGGER.warn("JobInstanceId={} WorkerMapTaskRequest dispatch timeout.", new Object[]{context.getJobInstanceId(), var19});
// NOTE(review): retryCount is re-initialised to 0 for every batch and never modified
// (the increment below lands in the throwaway local var25), so this condition is never
// true as written and each batch gets exactly one retry. Possibly a decompiler artifact.
if (retryCount >= MAX_RETRY_COUNT) {
throw var19;
}
// Wait 10 s, re-resolve the master selection and retry once; a second timeout
// propagates to the outer catch (Throwable).
Thread.sleep(10000L);
masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
response = (WorkerMapTaskResponse)FutureUtils.awaitResult(masterAkkaSelection, builder.build(), 30L);
int var25 = retryCount + 1;
}
if (!response.getSuccess()) {
// Stop at the first rejected batch; remaining batches are not sent.
LOGGER.error(response.getMessage());
this.logCollector.collect(context.getUniqueId(), response.getMessage());
result.setResult(response.getMessage());
return result;
}
// Drop the reference to the builder that was just sent (presumably to release its memory early).
builders.set(position++, (Object)null);
if (response.hasOverload() && response.getOverload()) {
// The master reported overload: pause 10 s before sending the next batch.
LOGGER.warn("Task Master is busy, sleeping a while {}s...", new Object[]{10});
Thread.sleep(10000L);
}
}
result.setStatus(true);
} catch (Throwable var20) {
// Any failure (validation, serialization, timeout, interruption) becomes a failed result.
LOGGER.error("JobInstanceId={} WorkerMapTaskRequest dispatch error.", new Object[]{context.getJobInstanceId(), var20});
this.logCollector.collect(context.getUniqueId(), ExceptionUtil.getTrace(var20));
result.setResult(ExceptionUtil.getMessage(var20));
}
return result;
}
}
/**
 * Validates the labels of a {@code BizSubTask}; other task objects pass unchecked.
 *
 * <p>The check only runs when {@code GroupManager} reports the context's group as an
 * advanced version. Limits: at most 3 labels, key length at most 60, value length at most 180.
 *
 * @param taskObject task object about to be serialized
 * @throws RuntimeException when a limit is exceeded
 */
private void checkTaskObject(Object taskObject) {
JobContext context = ContainerFactory.getContainerPool().getContext();
boolean isAdvancedVersion = GroupManager.INSTANCE.isAdvancedVersion(context.getGroupId());
if (isAdvancedVersion && taskObject instanceof BizSubTask) {
BizSubTask bizSubTask = (BizSubTask)taskObject;
Map<String, String> labelMap = bizSubTask.labelMap();
if (labelMap.size() > 3) {
throw new RuntimeException("label map size can't beyond 3.");
}
Iterator var6 = labelMap.entrySet().iterator();
while(var6.hasNext()) {
Entry<String, String> entry = (Entry)var6.next();
if (((String)entry.getKey()).length() > 60 || ((String)entry.getValue()).length() > 180) {
// NOTE(review): the log prints the key's length but the value itself, not the value's length.
LOGGER.error("Job instance={} label map<{}, {}> content can't beyond max size(60,180).", new Object[]{context.getJobInstanceId(), ((String)entry.getKey()).length(), entry.getValue()});
throw new RuntimeException("label map content can't beyond max size(60,180).");
}
}
}
}
/**
 * Handles a map request against a task master living in this process.
 *
 * @param taskMaster task master looked up for the job instance; may be {@code null}
 * @param request    batch of serialized task bodies
 * @return a success response carrying the overload flag returned by {@code MapTaskMaster.map},
 *         or a failure response with a message
 * @throws Exception declared, but as written every Throwable raised in the body is caught
 *                   by the outer catch and turned into a failure response
 */
private WorkerMapTaskResponse handleMapTask(TaskMaster taskMaster, WorkerMapTaskRequest request) throws Exception {
WorkerMapTaskResponse response = null;
try {
long jobInstanceId = request.getJobInstanceId();
if (taskMaster != null) {
if (!(taskMaster instanceof MapTaskMaster)) {
// Wrong master type: answer with a failure and mark the instance FAILED.
response = WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("TaskMaster is not MapTaskMaster").build();
taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, "TaskMaster is not MapTaskMaster");
} else {
try {
long startTime = System.currentTimeMillis();
boolean overload = ((MapTaskMaster)taskMaster).map(request.getTaskBodyList(), request.getTaskName());
LOGGER.debug("jobInstanceId={} map, cost={}ms", new Object[]{jobInstanceId, System.currentTimeMillis() - startTime});
response = WorkerMapTaskResponse.newBuilder().setSuccess(true).setOverload(overload).build();
} catch (Exception var9) {
// NOTE(review): the {} placeholder has no jobInstanceId argument; only the exception is passed.
LOGGER.error("jobInstanceId={} map error", var9);
taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(var9));
// Rethrown here, then caught by the outer catch (Throwable) below.
throw var9;
}
}
} else {
response = WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("can't found TaskMaster by jobInstanceId=" + jobInstanceId).build();
}
} catch (Throwable var10) {
LOGGER.error("jobInstanceId={}, handleMapTask error.", new Object[]{request.getJobInstanceId(), var10});
response = WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(var10)).build();
}
return response;
}
/**
 * Tells whether the context's task name equals {@code "MAP_TASK_ROOT"}.
 *
 * @param context context of the running task
 * @return {@code true} when the task name is {@code "MAP_TASK_ROOT"}
 */
protected boolean isRootTask(JobContext context) {
return context.getTaskName().equals("MAP_TASK_ROOT");
}
}

MapReduceJobProcessor
/**
 * Extension of {@code MapJobProcessor} that adds a mandatory {@code reduce} step.
 */
public abstract class MapReduceJobProcessor extends MapJobProcessor {

    private static final Logger LOGGER = LogFactory.getLogger(MapReduceJobProcessor.class);

    /** Creates a processor; there is no state to set up. */
    public MapReduceJobProcessor() {
    }

    /**
     * Reduce step that concrete processors must implement.
     *
     * @param var1 job context passed to the reduce step
     * @return the result of the reduce step
     * @throws Exception when the reduce step fails
     */
    public abstract ProcessResult reduce(JobContext var1) throws Exception;

    /**
     * Not supported for MapReduce processors: logs a warning and returns {@code null}.
     *
     * @deprecated use {@link #reduce(JobContext)} instead
     */
    @Deprecated
    public ProcessResult postProcess(JobContext context) {
        LOGGER.warn("MapReduceJobProcessor not support postProcess, please use reduce to instead of");
        return null;
    }

    /**
     * Flag method named after running reduce on failure. How the framework
     * consults it is not visible in this listing.
     *
     * @param context job context
     * @return always {@code true} in this default implementation
     */
    public boolean runReduceIfFail(JobContext context) {
        return true;
    }
}

官网示例

TestMapJobProcessor
/**
 * Map-model demo: the root task fans out 50 messages as sub-tasks and every
 * sub-task prints the message it received.
 */
@Component
public class TestMapJobProcessor extends MapJobProcessor {

    /** Name given to the sub-tasks created by the root task. */
    private static final String DISPATCH_TASK_NAME = "Level1Dispatch";

    /** Number of messages the root task dispatches. */
    private static final int DISPATCH_NUM = 50;

    /**
     * Splits the job on the root task and handles one message on each sub-task.
     *
     * @param context context of the running task
     * @return the result of {@code map} for the root task, a successful result for a
     *         {@code Level1Dispatch} sub-task, and a failed result for any other task name
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            // Exactly DISPATCH_NUM messages: msg_0 .. msg_49.
            // (The previous bound "i <= dispatchNum" produced 51.)
            for (int i = 0; i < DISPATCH_NUM; i++) {
                msgList.add("msg_" + i);
            }
            // msgList is the list to split; each element becomes one sub-task.
            return map(msgList, DISPATCH_TASK_NAME);
        }
        if (DISPATCH_TASK_NAME.equals(context.getTaskName())) {
            // Sub-task branch: the task payload is the message string.
            String task = (String) context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }
        // Unknown task name.
        return new ProcessResult(false);
    }
}

ScanSingleTableJobProcessor:将单表分片处理,每个子任务处理500条数据
/**
 * Demo for scanning a single table (Map or MapReduce model): the root task
 * splits the table's id range into pages of {@code PAGE_SIZE} ids and every
 * sub-task processes one page.
 */
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    /** Name given to the per-page sub-tasks. */
    private static final String PAGE_TASK_NAME = "PageTask";

    /** Number of ids covered by each sub-task. */
    private static final int PAGE_SIZE = 500;

    // NOTE(review): @Service is a class-level stereotype annotation; injecting a field
    // normally uses @Autowired (or @Resource). Confirm and replace in real code.
    @Service
    private XXXService xxxService;

    /** Half-open id range {@code [startId, endId)} handled by one sub-task. */
    static class PageTask {
        private long startId;
        private long endId;

        public PageTask(long startId, long endId) {
            this.startId = startId;
            this.endId = endId;
        }

        public long getStartId() {
            return startId;
        }

        public long getEndId() {
            return endId;
        }
    }

    /**
     * Splits the table on the root task and processes one page on each sub-task.
     *
     * @param context context of the running task; its job parameters hold the table name,
     *                so several jobs can share this code and differ only in console configuration
     * @return the result of {@code map} for the root task, a successful result for a page
     *         sub-task, and a failed result for any other task name
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            // Advance by PAGE_SIZE ids per sub-task. The previous code stepped by
            // (maxId - minId) / PAGE_SIZE, i.e. by the page count: that yields tiny pages
            // and never terminates when the range is smaller than PAGE_SIZE (step 0).
            // endId is exclusive (see queryRecord), so the last page ends at maxId + 1
            // to include the row with id == maxId.
            for (long start = minId; start <= maxId; start += PAGE_SIZE) {
                long end = Math.min(start + PAGE_SIZE, maxId + 1);
                tasks.add(new PageTask(start, end));
            }
            return map(tasks, PAGE_TASK_NAME);
        }
        if (PAGE_TASK_NAME.equals(context.getTaskName())) {
            // Sub-task branch: load and handle the records of one page.
            PageTask pageTask = (PageTask) context.getTask();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }
        // Unknown task name.
        return new ProcessResult(false);
    }

    /**
     * Returns the smallest and largest id of the table (stubbed with fixed values).
     */
    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    /**
     * Loads the records with {@code startId <= id < endId} (stubbed with an empty list).
     */
    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }
}

ScanShardingTableJobProcessor
/**
 * Demo for scanning sharded databases and tables (Map or MapReduce model).
 * Work is split in three levels: root task -> one sub-task per database ->
 * one sub-task per table -> one sub-task per page of {@code PAGE_SIZE} ids.
 */
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    /** Sub-task names of the three split levels. */
    private static final String DB_TASK_NAME = "DbTask";
    private static final String TABLE_TASK_NAME = "TableTask";
    private static final String PAGE_TASK_NAME = "PageTask";

    /** Number of ids covered by each page sub-task. */
    private static final int PAGE_SIZE = 500;

    // NOTE(review): @Service is a class-level stereotype annotation; injecting a field
    // normally uses @Autowired (or @Resource). Confirm and replace in real code.
    @Service
    private XXXService xxxService;

    /** Half-open id range {@code [startId, endId)} of one table, handled by one sub-task. */
    static class PageTask {
        private String tableName;
        private long startId;
        private long endId;

        public PageTask(String tableName, long startId, long endId) {
            this.tableName = tableName;
            this.startId = startId;
            this.endId = endId;
        }

        public String getTableName() {
            return tableName;
        }

        public long getStartId() {
            return startId;
        }

        public long getEndId() {
            return endId;
        }
    }

    /**
     * Routes the task by name through the database, table and page levels.
     *
     * @param context context of the running task
     * @return the result of {@code map} on the three splitting levels, a successful result
     *         for a page sub-task, and a failed result for any other task name
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            // Level 1: one sub-task per database.
            List<String> dbList = getDbList();
            return map(dbList, DB_TASK_NAME);
        }
        if (DB_TASK_NAME.equals(taskName)) {
            // Level 2: one sub-task per table of this database.
            String dbName = (String) task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, TABLE_TASK_NAME);
        }
        if (TABLE_TASK_NAME.equals(taskName)) {
            // Level 3: a single table may still be large, so split it into pages.
            String tableName = (String) task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            // Advance by PAGE_SIZE ids per sub-task. The previous code stepped by
            // (maxId - minId) / PAGE_SIZE, i.e. by the page count: that yields tiny pages
            // and never terminates when the range is smaller than PAGE_SIZE (step 0).
            // endId is exclusive (see queryRecord), so the last page ends at maxId + 1
            // to include the row with id == maxId.
            for (long start = minId; start <= maxId; start += PAGE_SIZE) {
                long end = Math.min(start + PAGE_SIZE, maxId + 1);
                tasks.add(new PageTask(tableName, start, end));
            }
            return map(tasks, PAGE_TASK_NAME);
        }
        if (PAGE_TASK_NAME.equals(taskName)) {
            // Leaf: load and handle the records of one page.
            PageTask pageTask = (PageTask) task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }
        // Unknown task name.
        return new ProcessResult(false);
    }

    /** Returns the list of databases (stubbed with an empty list). */
    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO return the list of sharded databases
        return dbList;
    }

    /** Returns the tables of one database (stubbed with an empty list). */
    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        //TODO return the list of sharded tables
        return tableList;
    }

    /** Returns the smallest and largest id of the table (stubbed with fixed values). */
    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    /** Loads the records with {@code startId <= id < endId} (stubbed with an empty list). */
    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }
}

TestMapReduceJobProcessor
/**
 * MapReduce-model demo: the root task dispatches messages (50 by default) as
 * sub-tasks, each sub-task returns its message as the result, and
 * {@code reduce} prints every collected result.
 */
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    /** Name given to the sub-tasks created by the root task. */
    private static final String DISPATCH_TASK_NAME = "Level1Dispatch";

    /** Number of messages dispatched when the job parameters do not override it. */
    private static final int DEFAULT_DISPATCH_NUM = 50;

    /**
     * Splits the job on the root task and handles one message on each sub-task.
     *
     * @param context context of the running task; non-blank job parameters are parsed as
     *                the number of messages to dispatch
     * @return the result of {@code map} for the root task, a successful result carrying the
     *         message for a sub-task, and a failed result for any other task name
     * @throws NumberFormatException when the job parameters are not blank and not an integer
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            int dispatchNum = resolveDispatchNum(context.getJobParameters());
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            // Exactly dispatchNum messages: msg_0 .. msg_(dispatchNum - 1).
            // (The previous bound "i <= dispatchNum" produced one extra message.)
            for (int i = 0; i < dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, DISPATCH_TASK_NAME);
        }
        if (DISPATCH_TASK_NAME.equals(context.getTaskName())) {
            String task = (String) context.getTask();
            // Simulated work.
            Thread.sleep(2000);
            // The message is returned as the sub-task result so reduce can read it.
            return new ProcessResult(true, task);
        }
        // Unknown task name.
        return new ProcessResult(false);
    }

    /**
     * Prints every sub-task result; the map key is the task id and the value its result.
     *
     * @param context job context exposing the collected task results
     * @return a successful result with a fixed marker string
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

    /** Parses the dispatch count from the job parameters, defaulting when absent or blank. */
    private static int resolveDispatchNum(String jobParameters) {
        if (jobParameters == null || jobParameters.trim().isEmpty()) {
            return DEFAULT_DISPATCH_NUM;
        }
        return Integer.parseInt(jobParameters.trim());
    }
}

使用示例

应用1(端口8080)、应用2(端口8081),其余配置及类完全相同

application.yml
# SchedulerX 2.0 worker configuration for the Spring Boot application.
# Nesting restored: all settings live under spring.schedulerx2, and each job
# definition lives under spring.schedulerx2.jobs.<jobName>.
spring:
  schedulerx2:
    # Credentials are masked with *** placeholders; fill in real values locally.
    aliyunAccessKey: ***
    aliyunSecretKey: ***
    regionId: public
    endpoint: acm.aliyun.com
    namespaceName: lihu-test
    namespace: ***
    appName: lihu-job
    groupId: lihu-group
    # NOTE(review): this looks like a real application key while the other secrets
    # are masked; consider masking or rotating it before publishing.
    appKey: OOs+XPhRsAa1Ia6+Op+NAw==
    # Job definitions declared in configuration.
    jobs:
      mapJob:
        # "parallel" presumably selects the parallel-computing execution mode; confirm.
        jobModel: parallel
        className: com.example.demo.job.CustomMapJob
        # One-shot trigger time; the commented cron line is the periodic alternative.
        oneTime: "2022-04-18 12:35:00"
        #cron: "0 */1 * * * ?"
      mapReduceJob:
        jobModel: parallel
        className: com.example.demo.job.CustomMapReduceJob
        oneTime: "2022-04-18 12:40:00"
        #cron: "0 */1 * * * ?"

DataConfig
/**
 * Spring configuration that registers a {@code JobSyncService} bean.
 */
@Configuration
public class DataConfig {

    /**
     * Creates the {@code JobSyncService} bean.
     * Presumably this is what syncs the jobs declared in application.yml to the
     * SchedulerX console; confirm against the SchedulerX documentation.
     *
     * @return a new {@code JobSyncService}
     */
    @Bean
    public JobSyncService initJobSyncService() {
        JobSyncService jobSyncService = new JobSyncService();
        return jobSyncService;
    }
}

CustomMapJob
/**
 * Map-model example: the root task creates ten string sub-tasks and every
 * sub-task prints its payload together with the current time.
 */
public class CustomMapJob extends MapJobProcessor {

    /** Name given to the sub-tasks created by the root task. */
    private static final String SUB_TASK_NAME = "子任务";

    /** Number of sub-tasks the root task creates. */
    private static final int SUB_TASK_COUNT = 10;

    /**
     * Splits the job on the root task and prints the payload on each sub-task.
     *
     * @param context context of the running task
     * @return the result of {@code map} for the root task, a successful result for a
     *         sub-task, and a failed result for any other task name
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            List<String> tasks = new ArrayList<>();
            for (int i = 1; i <= SUB_TASK_COUNT; i++) {
                tasks.add("任务 " + i);
            }
            return map(tasks, SUB_TASK_NAME);
        }
        if (SUB_TASK_NAME.equals(context.getTaskName())) {
            String task = (String) context.getTask();
            System.out.println("子任务是:" + task + " " + LocalDateTime.now());
            return new ProcessResult(true);
        }
        // Unknown task name: report an explicit failure. The previous fallback,
        // super.process(context), returns null from JavaProcessor.
        return new ProcessResult(false);
    }
}

CustomMapReduceJob
/**
 * Sub-task payload: an inclusive integer range whose members are summed by
 * one sub-task. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
class Task {

    /** First number of the range (inclusive). */
    private Integer startNum;

    /** Last number of the range (inclusive). */
    private Integer endNum;
}
/**
 * MapReduce-model example: the root task splits 0..99 into ten ranges of ten
 * numbers, every sub-task sums its range, and {@code reduce} adds up the
 * partial sums.
 */
public class CustomMapReduceJob extends MapReduceJobProcessor {

    /** Name given to the sub-tasks created by the root task. */
    private static final String SUB_TASK_NAME = "子任务";

    /** Number of sub-tasks the root task creates. */
    private static final int SUB_TASK_COUNT = 10;

    /** Count of consecutive numbers summed by each sub-task. */
    private static final int RANGE_SIZE = 10;

    /**
     * Splits the job on the root task and sums one range on each sub-task.
     *
     * @param context context of the running task
     * @return the result of {@code map} for the root task, a successful result carrying the
     *         partial sum for a sub-task, and a failed result for any other task name
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            List<Task> tasks = new ArrayList<>();
            for (int i = 0; i < SUB_TASK_COUNT; i++) {
                // Inclusive range [i * 10, i * 10 + 9].
                Task task = new Task();
                task.setStartNum(i * RANGE_SIZE);
                task.setEndNum(i * RANGE_SIZE + RANGE_SIZE - 1);
                tasks.add(task);
            }
            return map(tasks, SUB_TASK_NAME);
        }
        if (SUB_TASK_NAME.equals(context.getTaskName())) {
            Task task = (Task) context.getTask();
            int sum = 0;
            for (int i = task.getStartNum(); i <= task.getEndNum(); i++) {
                sum += i;
            }
            System.out.println("子任务" + context.getTaskId() + " ==> " + sum);
            // The partial sum travels to reduce as the sub-task's result string.
            return new ProcessResult(true, String.valueOf(sum));
        }
        // Unknown task name: report an explicit failure. The previous fallback,
        // super.process(context), returns null from JavaProcessor.
        return new ProcessResult(false);
    }

    /**
     * Prints every task result and the total of all partial sums.
     *
     * @param jobContext job context exposing the collected task results (task id to result)
     * @return a successful result
     * @throws NumberFormatException when a non-empty result is not an integer
     * @throws Exception declared by the processor contract
     */
    @Override
    public ProcessResult reduce(JobContext jobContext) throws Exception {
        Map<Long, String> results = jobContext.getTaskResults();
        int sum = 0;
        for (Map.Entry<Long, String> entry : results.entrySet()) {
            String value = entry.getValue();
            System.out.println("key:" + entry.getKey() + " ==> " + value);
            // Results that are null or empty carry no partial sum and count as 0;
            // the previous code threw NumberFormatException on a null value.
            if (value != null && !value.isEmpty()) {
                sum += Integer.parseInt(value);
            }
        }
        System.out.println("reduce方法计算所得的总和为:" + sum);
        return new ProcessResult(true);
    }
}

使用测试

启动应用1、应用2,控制台查看任务列表

应用1:控制台输出
2022-04-18 12:35:01.362 INFO 2392 --- [MapTaskMaster-6] c.a.s.worker.master.MapTaskMaster : jobInstanceId=997943958, batch start containers successfully, size:5 , worker=010010000047_2392_43290@192.168.5.10:51542, cost=1ms
2022-04-18 12:35:01.363 INFO 2392 --- [940_997943958-1] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_1 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.363 INFO 2392 --- [940_997943958-2] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_3 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.363 INFO 2392 --- [940_997943958-3] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_5 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.364 INFO 2392 --- [940_997943958-4] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_7 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.364 INFO 2392 --- [940_997943958-0] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_9 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
# map任务:部分在应用1执行,部分在应用2执行
子任务是:任务 9 2022-04-18T12:35:01.374
子任务是:任务 3 2022-04-18T12:35:01.374
子任务是:任务 1 2022-04-18T12:35:01.374
子任务是:任务 5 2022-04-18T12:35:01.374
子任务是:任务 7 2022-04-18T12:35:01.374

应用2:控制台输出
2022-04-18 12:35:01.350 INFO 2394 --- [r-container-177] c.a.s.worker.actor.ContainerActor : jobInstanceId=997943958, batch start containers, size:5
2022-04-18 12:35:01.390 INFO 2394 --- [940_997943958-2] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_6 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390 INFO 2394 --- [940_997943958-3] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_8 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390 INFO 2394 --- [940_997943958-0] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_2 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390 INFO 2394 --- [940_997943958-4] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_10 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:35:01.390 INFO 2394 --- [940_997943958-1] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_4 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
# map任务:部分在应用1执行、部分在应用2执行
子任务是:任务 10 2022-04-18T12:35:01.401
子任务是:任务 2 2022-04-18T12:35:01.401
子任务是:任务 8 2022-04-18T12:35:01.401
子任务是:任务 4 2022-04-18T12:35:01.401
子任务是:任务 6 2022-04-18T12:35:01.401
2022-04-18 12:35:01.401 INFO 2394 --- [940_997943958-4] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_10 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:35:01.401 INFO 2394 --- [940_997943958-3] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_8 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:35:01.401 INFO 2394 --- [940_997943958-0] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_2 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:35:01.401 INFO 2394 --- [940_997943958-1] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715940_997943958_4 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
。。。
# mapReduce任务:该实例全部在应用2执行
2022-04-18 12:40:00.050 INFO 2394 --- [941_997946922-0] c.a.s.worker.master.ParallelTaskMater : map taskList, jobInstanceId=997946922, taskName=子任务, taskList size=10
2022-04-18 12:40:00.054 INFO 2394 --- [941_997946922-0] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_0 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=null]
2022-04-18 12:40:01.020 INFO 2394 --- [ead-997946922-1] c.a.s.w.m.p.ServerTaskPersistence : batch create tasks to Server successfully, jobInstanceId=997946922, size=5
2022-04-18 12:40:01.021 INFO 2394 --- [r-container-278] c.a.s.worker.actor.ContainerActor : jobInstanceId=997946922, batch start containers, size:5
2022-04-18 12:40:01.022 INFO 2394 --- [MapTaskMaster-4] c.a.s.worker.master.MapTaskMaster : jobInstanceId=997946922, batch start containers successfully, size:5 , worker=010010000047_2394_44529@192.168.5.10:51549, cost=1ms
2022-04-18 12:40:01.022 INFO 2394 --- [941_997946922-1] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_1 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
子任务1 ==> 45
2022-04-18 12:40:01.022 INFO 2394 --- [941_997946922-2] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_3 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:40:01.022 INFO 2394 --- [941_997946922-1] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_1 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=45]
子任务3 ==> 245
2022-04-18 12:40:01.022 INFO 2394 --- [941_997946922-2] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_3 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=245]
2022-04-18 12:40:01.022 INFO 2394 --- [941_997946922-3] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_5 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
子任务5 ==> 445
2022-04-18 12:40:01.022 INFO 2394 --- [941_997946922-3] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_5 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=445]
2022-04-18 12:40:01.023 INFO 2394 --- [941_997946922-4] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_7 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
2022-04-18 12:40:01.023 INFO 2394 --- [941_997946922-0] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_9 submitResult=true, processResult=ProcessResult [status=RUNNING, result=null]
子任务7 ==> 645
子任务9 ==> 845
2022-04-18 12:40:01.023 INFO 2394 --- [941_997946922-4] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_7 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=645]
2022-04-18 12:40:01.023 INFO 2394 --- [941_997946922-0] c.a.s.worker.container.ThreadContainer : reportTaskStatus instanceId=715941_997946922_9 submitResult=true, processResult=ProcessResult [status=SUCCESS, result=845]
2022-04-18 12:40:01.044 INFO 2394 --- [ead-997946922-1] c.a.s.w.batch.ContainerStatusReqHandler : jobInstanceId=997946922 batch report status=3 to task master, size:12
2022-04-18 12:40:01.046 INFO 2394 --- [atcher-task-281] c.a.schedulerx.worker.actor.TaskActor : jobInstanceId=997946922, batch receive task status reqs, size:12
2022-04-18 12:40:01.050 INFO 2394 --- [ead-997946922-1] c.a.s.w.m.p.ServerTaskPersistence : batch create tasks to Server successfully, jobInstanceId=997946922, size=5
2022-04-18 12:40:01.050 INFO 2394 --- [ead-997946922-1] c.a.s.worker.batch.TaskPushReqHandler : jobInstance=997946922, batch dispatch cost:64 ms, dispatchSize:256, size:10
2022-04-18 12:40:01.055 INFO 2394 --- [MapTaskMaster-6] c.a.s.worker.master.MapTaskMaster : jobInstanceId=997946922, batch start containers successfully, size:5 , worker=010010000047_2392_43290@192.168.5.10:51542, cost=5ms
2022-04-18 12:40:01.994 INFO 2394 --- [hread-997946922] c.a.s.worker.batch.TMStatusReqHandler : jobInstanceId=997946922, batch update status cost:6ms, size:12
2022-04-18 12:40:02.063 INFO 2394 --- [atcher-task-284] c.a.schedulerx.worker.actor.TaskActor : jobInstanceId=997946922, batch receive task status reqs, size:10
2022-04-18 12:40:02.098 INFO 2394 --- [hread-997946922] c.a.s.worker.batch.TMStatusReqHandler : jobInstanceId=997946922, batch update status cost:1ms, size:10
key:0 ==>
key:1 ==> 45
key:2 ==> 145
key:3 ==> 245
key:4 ==> 345
key:5 ==> 445
key:6 ==> 545
key:7 ==> 645
key:8 ==> 745
key:9 ==> 845
key:10 ==> 945
reduce方法计算所得的总和为:4950
|