1. 需求来源
业务上需要将定时器的执行周期进行配置(新增、编辑、删除),并支持热部署(直接生效,无需重启)
2. 设计
2.1 表结构(postgresql)
drop table if exists t_scheduler;
create table t_scheduler (
id int4 not null,
task_name varchar(32) not null,
cron varchar(32) not null,
delete int2 not null default 0,
constraint t_scheduler_pkey primary key ("id")
);
comment on column t_scheduler.id is '任务Id';
comment on column t_scheduler.task_name is '任务名称';
comment on column t_scheduler.cron is '任务执行周期';
comment on column t_scheduler.delete is '任务是否删除0未删除1已删除';
-- 任务数据
insert into t_scheduler values(1, 'SyncUser', '0/10 * * * * ?', 0);
insert into t_scheduler values(2, 'WeatherTask', '0/10 * * * * ?', 0);
2.2 程序设计
主配置类
里面包含一个定时任务,扫描上述任务配置表,检查有没有更新
任务接口(该接口继承Runnable)
任务定义接口,自定义的任务,均实现该接口
任务对象工厂
用来通过配置的任务名,反射获取任务对象
任务Dao
包含任务查询等
3. 代码
3.1 主配置类
package com.hz.basepro.schedule.start;
import com.hz.basepro.bean.ScheduleTask;
import com.hz.basepro.schedule.dao.businessmapper.ScheduleMapper;
import com.hz.basepro.schedule.task.TaskHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Component
@EnableScheduling
public class ScheduleTaskConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleTaskConfig.class);
@Autowired
private ScheduleMapper scheduleMapper;
/**
* 任务Map
*
*/
private Map<Integer, ScheduleTask> taskMap = new HashMap<>();
/**
* 任务对应的定时器Map
*/
private Map<Integer, ThreadPoolTaskScheduler> schedulerMap = new HashMap<>();
@Scheduled(initialDelay = 0, fixedRate = 1 * 1000)
public void checkTask() {
List<ScheduleTask> scheduleTaskList = scheduleMapper.getAllScheduleTask();
for (ScheduleTask scheduleTask : scheduleTaskList) {
if (taskMap.containsKey(scheduleTask.getId())) {
ScheduleTask oldTask = taskMap.get(scheduleTask.getId());
// cron发生改变 1. 移除现有的任务 2. 重新生成新的任务
if (!scheduleTask.getCron().equals(oldTask.getCron())) {
removeTask(oldTask.getId());
addTask(scheduleTask);
}
taskMap.remove(scheduleTask.getId());
} else {
// cron未改变,添加新任务
addTask(scheduleTask);
}
taskMap.put(scheduleTask.getId(), scheduleTask);
}
// 移除已经删除的任务
List<Integer> taskIds = scheduleTaskList.stream().map(task -> task.getId()).collect(Collectors.toList());
for(Iterator<Map.Entry<Integer, ScheduleTask>> it = taskMap.entrySet().iterator();it.hasNext();) {
Map.Entry<Integer, ScheduleTask> entry = it.next();
Integer oldTaskId = entry.getKey();
if (!taskIds.contains(oldTaskId)) {
// 1. 移除任务
removeTask(oldTaskId);
// 2. map移除任务
it.remove();
}
}
}
/**
* 添加定时任务
*
* @param task
*/
public void addTask(ScheduleTask task) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix(task.getTaskName());
scheduler.setPoolSize(1);
scheduler.initialize();
scheduler.schedule(TaskHelper.getTask(task.getTaskName()), new CronTrigger(task.getCron()));
schedulerMap.put(task.getId(), scheduler);
LOGGER.info("定时器创建:{}.", task.getId());
}
/**
* 移除定时任务
*
* @param taskId
*/
public void removeTask(Integer taskId) {
try {
ThreadPoolTaskScheduler scheduler = schedulerMap.get(taskId);
if (scheduler == null) {
LOGGER.info("定时器不存在:{}.", taskId);
} else {
try {
scheduler.shutdown();
} catch (Exception e) {
LOGGER.error("线程池关闭失败, taskId: {}.", taskId, e);
try {
scheduler.shutdown();
} catch (Exception ex) {
LOGGER.error("线程池再次关闭失败, taskId: {}.", taskId, e);
}
}
}
} finally {
schedulerMap.remove(taskId);
}
LOGGER.info("定时器销毁:{}.", taskId);
}
}
3.2 任务对象工厂TaskHelper
package com.hz.basepro.schedule.task;
import com.hz.basepro.bean.CommonException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TaskHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskHelper.class);
private static final String basePackage = "com.hz.basepro.schedule.task.impl.";
public static Task getTask(String taskName) {
try {
Class<?> clazz = Class.forName(basePackage + taskName);
Task task = (Task)clazz.newInstance();
return task;
} catch (Exception e) {
throw new CommonException("找不到对应的任务:" + taskName, e);
}
}
}
3.3 任务对象
package com.hz.basepro.schedule.task;
public interface Task extends Runnable {
}
package com.hz.basepro.schedule.task.impl;
import com.hz.basepro.schedule.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
public class SyncUser implements Task {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncUser.class);
private static final ThreadLocal<SimpleDateFormat> FORMAT_LOCAL = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
@Override
public void run() {
SimpleDateFormat sdf = FORMAT_LOCAL.get();
LOGGER.info("同步用户任务进行中:{}", sdf.format(new Date()));
}
}
package com.hz.basepro.schedule.task.impl;
import com.hz.basepro.schedule.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
public class WeatherTask implements Task {
private static final Logger LOGGER = LoggerFactory.getLogger(WeatherTask.class);
private static final ThreadLocal<SimpleDateFormat> FORMAT_LOCAL = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
@Override
public void run() {
SimpleDateFormat sdf = FORMAT_LOCAL.get();
LOGGER.info("天气任务进行中:{}", sdf.format(new Date()));
}
}
3.4 ScheduleMapper
package com.hz.basepro.schedule.dao.businessmapper;
import com.hz.basepro.bean.ScheduleTask;
import java.util.List;
public interface ScheduleMapper {
/**
* 获取所有任务
*
* @return
*/
List<ScheduleTask> getAllScheduleTask();
}
3.5 查询sql
<select id="getAllScheduleTask" resultType="com.hz.basepro.bean.ScheduleTask">
select id, task_name, cron from t_scheduler where delete = 0
</select>
3.6涉及的Bean
package com.hz.basepro.bean;
public class ScheduleTask {
private int id;
private String taskName;
private String cron;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
@Override
public String toString() {
return "SchedulerTask{" +
"id=" + id +
", taskName='" + taskName + '\'' +
", cron='" + cron + '\'' +
'}';
}
}
4. 执行结果
4.1 初始执行结果
2022-06-29 14:54:23,281 INFO [scheduling-1] (ScheduleTaskConfig.java:99)- 定时器创建:2.
2022-06-29 14:54:23,283 INFO [scheduling-1] (ScheduleTaskConfig.java:99)- 定时器创建:1.
2022-06-29 14:54:30,001 INFO [WeatherTask1] (WeatherTask.java:19)- 天气任务进行中:2022-06-29 14:54:30
2022-06-29 14:54:30,001 INFO [SyncUser1] (SyncUser.java:20)- 同步用户任务进行中:2022-06-29 14:54:30
2022-06-29 14:54:40,000 INFO [SyncUser1] (SyncUser.java:20)- 同步用户任务进行中:2022-06-29 14:54:40
2022-06-29 14:54:40,000 INFO [WeatherTask1] (WeatherTask.java:19)- 天气任务进行中:2022-06-29 14:54:40
2022-06-29 14:54:50,001 INFO [SyncUser1] (SyncUser.java:20)- 同步用户任务进行中:2022-06-29 14:54:50
4.2 修改任务执行周期
update t_scheduler set cron = '0/5 * * * * ?' where id = 1;
4.3?修改后的执行日志
2022-06-29 14:56:22,629 INFO [scheduling-1] (ScheduleTaskConfig.java:125)- 定时器销毁:1.
2022-06-29 14:56:22,631 INFO [scheduling-1] (ScheduleTaskConfig.java:99)- 定时器创建:1.
2022-06-29 14:56:25,001 INFO [SyncUser1] (SyncUser.java:20)- 同步用户任务进行中:2022-06-29 14:56:25
2022-06-29 14:56:30,000 INFO [WeatherTask1] (WeatherTask.java:19)- 天气任务进行中:2022-06-29 14:56:30
2022-06-29 14:56:30,000 INFO [SyncUser1] (SyncUser.java:20)- 同步用户任务进行中:2022-06-29 14:56:30
2022-06-29 14:56:35,001 INFO [SyncUser1] (SyncUser.java:20)- 同步用户任务进行中:2022-06-29 14:56:35
2022-06-29 14:56:40,001 INFO [SyncUser1] (SyncUser.java:20)- 同步用户任务进行中:2022-06-29 14:56:40
2022-06-29 14:56:40,001 INFO [WeatherTask1] (WeatherTask.java:19)- 天气任务进行中:2022-06-29 14:56:40
4.4 结论
? ? ? ?符合期望结果,修改的周期的任务按照期望5s执行一次。未修改的任务仍然是10s执行一次。
|