Quartz动态定时任务
POM
<!-- Spring Boot starter for the Quartz scheduler. No <version> is declared here; presumably it is managed by the Spring Boot parent/BOM (confirm in the project POM). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
数据库
-- Job definition table: one row per schedulable task (cron expression plus the
-- Spring bean / method that should be invoked). Dropped first so the script can be re-run.
DROP TABLE IF EXISTS `job_info`;
CREATE TABLE `job_info` (
`id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'ID',
`job_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '名称',
`cron_expression` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'java cron表达式(秒分时日月周[年])',
`bean_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'spring bean名称,被ioc容器管理的,需要执行调度任务的bean',
`method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'bean里需要执行方法名',
`params` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '方法参数',
-- Enable flag: 1 = enabled, 0 = disabled (defaults to enabled).
`enable` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否开启,1开启,0关闭',
`remark` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '说明',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建日期',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新日期',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '任务调度表' ROW_FORMAT = Dynamic;
-- Turns foreign key checks back on; the matching "SET FOREIGN_KEY_CHECKS = 0" is not part of this excerpt.
SET FOREIGN_KEY_CHECKS = 1;
-- Execution log table: one row per job run (outcome, duration in ms, error detail).
-- Dropped first, like job_info, so the whole script can be re-run without
-- failing with "table already exists".
DROP TABLE IF EXISTS `job_log`;
CREATE TABLE `job_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`job_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '名称',
`cron_expression` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'java cron表达式(秒分时日月周[年])',
`bean_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'spring bean名称,被ioc容器管理的,需要执行调度任务的bean',
`method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'bean里需要执行方法名',
`params` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '方法参数',
`success` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否成功,1成功,0失败',
`time` bigint(20) NOT NULL DEFAULT 0 COMMENT '执行耗时,毫秒',
`detail` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '详细内容',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建日期',
PRIMARY KEY (`id`) USING BTREE,
-- Secondary index on create_time (the column that records when each log row was created).
INDEX `idx_job_log_create_time`(`create_time`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 125 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '任务日志表' ROW_FORMAT = Dynamic;
-- Turns foreign key checks back on; the matching "SET FOREIGN_KEY_CHECKS = 0" is not part of this excerpt.
SET FOREIGN_KEY_CHECKS = 1;
对应实体类
JobInfo
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Date;
/**
 * Entity for one row of the {@code job_info} table: a schedulable task described by a
 * cron expression plus the Spring bean and method to invoke.
 *
 * <p>Accessors are generated by Lombok; {@code @Accessors(chain = true)} makes the
 * setters return {@code this} so they can be chained.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("job_info")
@ApiModel(value="JobInfo对象", description="任务调度表")
public class JobInfo implements Serializable {
private static final long serialVersionUID = 1L;
// Primary key (varchar(64) in the table); no @TableId is declared on this field.
@ApiModelProperty(value = "ID")
private String id;
// Display name of the job.
@ApiModelProperty(value = "名称")
@TableField("job_name")
private String jobName;
// Cron expression: second minute hour day month week [year].
@ApiModelProperty(value = "java cron表达式(秒分时日月周[年])")
@TableField("cron_expression")
private String cronExpression;
// Name of the Spring-managed bean whose method is executed.
@ApiModelProperty(value = "spring bean名称,被ioc容器管理的,需要执行调度任务的bean")
@TableField("bean_name")
private String beanName;
// Name of the method to invoke on that bean.
@ApiModelProperty(value = "bean里需要执行方法名")
@TableField("method_name")
private String methodName;
// Method parameters, stored as a string.
@ApiModelProperty(value = "方法参数")
private String params;
// Enable flag held as a String: "1" = enabled, "0" = disabled (the column itself is tinyint(1)).
@ApiModelProperty(value = "是否开启,1开启,0关闭")
private String enable;
// Free-text remark.
@ApiModelProperty(value = "说明")
private String remark;
// Creation time; marked for MyBatis-Plus fill on insert (the fill handler is not shown here — confirm one is registered).
@ApiModelProperty(value = "创建日期")
@TableField(value = "create_time",fill = FieldFill.INSERT)
private Date createTime;
// Last update time; marked for MyBatis-Plus fill on insert and update (same caveat as createTime).
@ApiModelProperty(value = "更新日期")
@TableField(value = "update_time",fill = FieldFill.INSERT_UPDATE)
private Date updateTime;
}
JobLog
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Date;
/**
 * Entity for one row of the {@code job_log} table: the record of a single job run.
 *
 * <p>Shares its descriptive property names (jobName, cronExpression, beanName,
 * methodName, params) with {@code JobInfo} and adds the outcome, duration and
 * error detail of the run.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("job_log")
@ApiModel(value="JobLog对象", description="任务日志表")
public class JobLog implements Serializable {
private static final long serialVersionUID = 1L;
// Auto-increment primary key generated by the database (IdType.AUTO).
@ApiModelProperty(value = "主键id")
@TableId(value = "id",type = IdType.AUTO)
private Integer id;
// Display name of the job that ran.
@ApiModelProperty(value = "名称")
@TableField("job_name")
private String jobName;
// Cron expression: second minute hour day month week [year].
@ApiModelProperty(value = "java cron表达式(秒分时日月周[年])")
@TableField("cron_expression")
private String cronExpression;
// Name of the Spring-managed bean that was invoked.
@ApiModelProperty(value = "spring bean名称,被ioc容器管理的,需要执行调度任务的bean")
@TableField("bean_name")
private String beanName;
// Name of the method that was invoked on that bean.
@ApiModelProperty(value = "bean里需要执行方法名")
@TableField("method_name")
private String methodName;
// Method parameters, stored as a string.
@ApiModelProperty(value = "方法参数")
private String params;
// Outcome of the run: true = success, false = failure.
@ApiModelProperty(value = "是否成功,1成功,0失败")
private Boolean success;
// Execution time in milliseconds.
@ApiModelProperty(value = "执行耗时,毫秒")
private Long time;
// Detail text for the run.
@ApiModelProperty(value = "详细内容")
private String detail;
// Time the log row was created.
@ApiModelProperty(value = "创建日期")
@TableField("create_time")
private Date createTime;
}
创建反射调用工具类CallMethod(供Quartz的Job通过bean名称和方法名调用目标方法)
import cn.com.wasec.b3.utils.SpringContextHolder;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
 * Reflective invoker for a method on a Spring-managed bean, looked up by bean name
 * and method name.
 *
 * <p>When {@code params} is non-null the target method must declare exactly one
 * parameter of type {@link Object}; when it is null the target method must take no
 * parameters.
 */
public class CallMethod {
    private final Object target;
    private final Method method;
    private final Object params;

    /**
     * Resolves the bean and the method to call.
     *
     * @param beanName   name of the bean to fetch from {@code SpringContextHolder}
     * @param methodName name of the method to invoke on that bean
     * @param params     single argument to pass, or {@code null} to call a no-arg method
     * @throws NoSuchMethodException if no method with the expected signature exists on the
     *                               bean's class or any of its superclasses
     */
    public CallMethod(String beanName, String methodName, Object params) throws NoSuchMethodException {
        this.target = SpringContextHolder.getBean(beanName);
        this.params = params;
        Class<?> targetClass = target.getClass();
        // findMethod walks up the class hierarchy, so methods inherited from a base class
        // are found too; getDeclaredMethod only sees methods declared on the runtime class.
        Method found = (params != null)
                ? ReflectionUtils.findMethod(targetClass, methodName, Object.class)
                : ReflectionUtils.findMethod(targetClass, methodName);
        if (found == null) {
            throw new NoSuchMethodException(targetClass.getName() + "." + methodName
                    + (params != null ? "(java.lang.Object)" : "()"));
        }
        this.method = found;
    }

    /**
     * Invokes the resolved method on the bean, passing {@code params} when it is non-null.
     *
     * @throws InvocationTargetException if the invoked method itself throws; the original
     *                                   exception is available via {@code getCause()}
     * @throws IllegalAccessException    if the method cannot be accessed
     */
    public void call() throws InvocationTargetException, IllegalAccessException {
        // Allows non-public methods to be invoked.
        ReflectionUtils.makeAccessible(method);
        if (params != null) {
            method.invoke(target, params);
        } else {
            method.invoke(target);
        }
    }
}
读取数据库中的任务,用于项目启动时初始化
import cn.com.wasec.b3.model.entity.JobInfo;
import cn.com.wasec.b3.service.IJobInfoService;
import cn.com.wasec.b3.service.QuartzJobService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@Component
public class JobStarter implements ApplicationRunner {
@Resource
private IJobInfoService jobInfoService;
@Resource
private QuartzJobService quartzJobService;
@Override
public void run(ApplicationArguments args) throws Exception {
JobInfo jobInfo = new JobInfo();
jobInfo.setEnable("1");
List<JobInfo> jobInfoList = jobInfoService.list(new LambdaQueryWrapper<JobInfo>().eq(JobInfo::getEnable,1));
log.info("加载定时任务...");
jobInfoList.forEach(job ->{
log.info("定时任务:"+job.getJobName()+" cron:"+job.getCronExpression());
quartzJobService.addSerialJob(job);
});
log.info("定时任务加载完成...");
}
}
执行任务
import cn.com.wasec.b3.common.constant.SysCont;
import cn.com.wasec.b3.mapper.JobLogMapper;
import cn.com.wasec.b3.model.entity.JobInfo;
import cn.com.wasec.b3.model.entity.JobLog;
import cn.com.wasec.b3.service.IJobLogService;
import cn.com.wasec.b3.utils.SpringContextHolder;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
@Slf4j
@DisallowConcurrentExecution
@Component
public class QuartzSerialJob extends QuartzJobBean {
@Resource
private JobLogMapper jobLogMapper;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
JobInfo jobInfo = (JobInfo) context.getMergedJobDataMap().get(SysCont.JOB_KEY);
IJobLogService jobLogService = SpringContextHolder.getBean(IJobLogService.class);
JobLog jobLog = new JobLog();
BeanUtils.copyProperties(jobInfo, jobLog);
jobLog.setCreateTime(new Date());
long startTime = System.currentTimeMillis();
try {
log.info("定时任务准备执行,任务名称:{}", jobInfo.getJobName());
JSONObject jsonObject = JSONUtil.parseObj(jobInfo.getParams());
jsonObject.set("jobId",jobInfo.getId());
CallMethod callMethod = new CallMethod(jobInfo.getBeanName(), jobInfo.getMethodName(), jsonObject);
callMethod.call();
jobLog.setSuccess(true);
long times = System.currentTimeMillis() - startTime;
log.info("定时任务执行完毕,任务名称:{} 总共耗时:{} 毫秒", jobLog.getJobName(), times);
} catch (Exception e) {
log.error("定时任务执行失败,任务名称:"+ jobInfo.getJobName(), e);
jobLog.setSuccess(false);
jobLog.setDetail(e.toString());
} finally {
long times = System.currentTimeMillis() - startTime;
jobLog.setTime(times);
jobLog.setId(null);
jobLogMapper.insert(jobLog);
}
}
}
SpringContextHolder方法的实现
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Keeps the Spring {@link ApplicationContext} in a static field so that beans can be
 * looked up from code that is not itself managed by Spring.
 */
@Slf4j
@Component
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {

    private static ApplicationContext applicationContext = null;

    /**
     * Stores the injected context, warning first if a previously stored one is being replaced.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContext previous = SpringContextHolder.applicationContext;
        if (previous != null) {
            log.warn("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + previous);
        }
        SpringContextHolder.applicationContext = applicationContext;
    }

    /**
     * Drops the stored context when this bean is destroyed.
     */
    @Override
    public void destroy() {
        log.debug("清除SpringContextHolder中的ApplicationContext:"
                + applicationContext);
        applicationContext = null;
    }

    /**
     * Looks up a bean by name and casts it to the type expected by the caller.
     *
     * @throws IllegalStateException if no context has been stored yet
     */
    public static <T> T getBean(String name) {
        return (T) requireContext().getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @throws IllegalStateException if no context has been stored yet
     */
    public static <T> T getBean(Class<T> requiredType) {
        return requireContext().getBean(requiredType);
    }

    /** Returns the stored context, failing fast when none has been set. */
    private static ApplicationContext requireContext() {
        ApplicationContext current = applicationContext;
        if (current == null) {
            throw new IllegalStateException("applicaitonContext属性未注入, 请在applicationContext" +
                    ".xml中定义SpringContextHolder或在SpringBoot启动类中注册SpringContextHolder.");
        }
        return current;
    }
}
定时任务增删的基本操作
import cn.com.wasec.b3.common.constant.SysCont;
import cn.com.wasec.b3.common.quartz.QuartzSerialJob;
import cn.com.wasec.b3.model.entity.JobInfo;
import cn.hutool.cron.pattern.CronPattern;
import cn.hutool.cron.pattern.CronPatternUtil;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
/**
 * Thin wrapper around the Quartz {@link Scheduler} for adding, replacing, deleting,
 * pausing and resuming jobs that are described by a {@code JobInfo}.
 *
 * <p>Job and trigger share the same identity: {@code "TASK_" + jobInfo.getId()}.
 */
@Slf4j
@Service("quartzJobService")
public class QuartzJobService {
    private static final String JOB_NAME = "TASK_";
    @Resource
    private Scheduler scheduler;

    /**
     * Schedules the job with a cron trigger built from its cron expression.
     *
     * <p>Does nothing when the job is not enabled ({@code enable != "1"}) or when the cron
     * expression has no fire time after now. If a job with the same key is already
     * registered it is replaced: the job is stored durably, so it outlives a paused or
     * completed trigger, and scheduling the same key again would otherwise fail with
     * an "already exists" error (e.g. pause followed by start).
     *
     * @param jobInfo job definition; also stored in the trigger's job data under {@code SysCont.JOB_KEY}
     */
    public void addSerialJob(JobInfo jobInfo) {
        if (!"1".equals(jobInfo.getEnable())) {
            return;
        }
        if (null == CronPatternUtil.nextDateAfter(new CronPattern(jobInfo.getCronExpression()), new Date(), false)) {
            log.warn("该定时任务超过执行时间:{}", jobInfo);
            return;
        }
        String identity = JOB_NAME + jobInfo.getId();
        JobDetail jobDetail = JobBuilder.newJob(QuartzSerialJob.class)
                .storeDurably()
                .withIdentity(identity)
                .build();
        CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity(identity)
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule(jobInfo.getCronExpression()))
                .build();
        cronTrigger.getJobDataMap().put(SysCont.JOB_KEY, jobInfo);
        try {
            JobKey jobKey = jobDetail.getKey();
            if (scheduler.checkExists(jobKey)) {
                // Remove the stale job (and its triggers) so the new definition can be stored.
                scheduler.deleteJob(jobKey);
            }
            scheduler.scheduleJob(jobDetail, cronTrigger);
        } catch (SchedulerException e) {
            log.error("创建定时任务错误", e);
        }
    }

    /**
     * Replaces the scheduled job: deletes any existing registration, then adds it again.
     *
     * @param jobInfo job definition to (re)schedule
     */
    public void ResetSerialJob(JobInfo jobInfo) {
        deleteJob(jobInfo);
        addSerialJob(jobInfo);
    }

    /**
     * Pauses and then removes the job from the scheduler. Failures are logged, not thrown.
     *
     * @param jobInfo job definition whose id identifies the scheduled job
     */
    public void deleteJob(JobInfo jobInfo) {
        try {
            JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
            scheduler.pauseJob(jobKey);
            scheduler.deleteJob(jobKey);
        } catch (Exception e) {
            log.error("删除定时任务失败", e);
        }
    }

    /**
     * Resumes the job; if no trigger is registered for it, the job is scheduled first.
     * Failures are logged, not thrown.
     *
     * @param jobInfo job definition whose id identifies the scheduled job
     */
    public void resumeJob(JobInfo jobInfo) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + jobInfo.getId());
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (trigger == null) {
                addSerialJob(jobInfo);
            }
            JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
            scheduler.resumeJob(jobKey);
        } catch (Exception e) {
            log.error("恢复定时任务失败", e);
        }
    }

    /**
     * Pauses the job. Failures are logged, not thrown.
     *
     * @param jobInfo job definition whose id identifies the scheduled job
     */
    public void pauseJob(JobInfo jobInfo) {
        try {
            JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
            scheduler.pauseJob(jobKey);
        } catch (Exception e) {
            log.error("定时任务暂停失败", e);
        }
    }
}
任务调度表 服务类
import cn.com.wasec.b3.model.entity.JobInfo;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
/**
 * Service contract for the job definition table ({@code JobInfo}): generic CRUD inherited
 * from {@code IService}, plus operations that change a job's lifecycle.
 */
public interface IJobInfoService extends IService<JobInfo> {
/** Starts the job with the given id. */
void startJob(String id);
/** Re-schedules the job with the given id. */
void resetJob(String id);
/** Pauses the job with the given id. */
void pauseJob(String id);
/** Resumes the job with the given id. */
void resumeJob(String id);
/** Stops the job with the given id; see {@link #deleteById(String)} for deleting a job by id. */
void deleteJob(String id);
/**
 * Updates the enable state of the given job and returns the resulting job.
 * Declared transactional with SERIALIZABLE isolation, rolling back on any exception.
 *
 * @throws Exception if the state cannot be updated
 */
@Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
JobInfo updateState(JobInfo jobInfo) throws Exception;
/** Creates a new job and returns it. */
JobInfo insert(JobInfo jobInfo);
/**
 * Updates an existing job and returns the resulting job.
 * Declared transactional with SERIALIZABLE isolation, rolling back on any exception.
 */
@Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
JobInfo update(JobInfo jobInfo);
/** Deletes the job with the given id; the boolean reports the outcome. */
boolean deleteById(String id);
}
实现类
import cn.com.wasec.b3.mapper.JobInfoMapper;
import cn.com.wasec.b3.model.entity.JobInfo;
import cn.com.wasec.b3.service.IJobInfoService;
import cn.com.wasec.b3.service.QuartzJobService;
import cn.com.wasec.b3.utils.SpringContextHolder;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronExpression;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
@Service
@Slf4j
public class JobInfoServiceImpl extends ServiceImpl<JobInfoMapper, JobInfo> implements IJobInfoService {
@Resource
private JobInfoMapper jobInfoDao;
@Resource
private QuartzJobService quartzJobService;
@Override
public void startJob(String id) {
JobInfo jobInfo = jobInfoDao.selectById(id);
if(jobInfo==null) {
log.error("任务不存在");
return;
}
if("1".equals(jobInfo.getEnable())){
quartzJobService.ResetSerialJob(jobInfo);
return;
}
JobInfo updateJob = new JobInfo();
updateJob.setId(id);
updateJob.setEnable("1");
Boolean update = this.saveOrUpdate(updateJob);
if(!update) {
log.error("更新任务失败");
return;
}
jobInfo.setEnable("1");
quartzJobService.addSerialJob(jobInfo);
}
@Override
public void resetJob(String id) {
JobInfo jobInfo = jobInfoDao.selectById(id);
if(jobInfo==null) {
log.error("任务不存在");
return;
}
quartzJobService.ResetSerialJob(jobInfo);
}
@Override
public void pauseJob(String id) {
JobInfo jobInfo = jobInfoDao.selectById(id);
if(jobInfo==null) {
log.error("任务不存在");
return;
}
JobInfo updateJob = new JobInfo();
updateJob.setId(id);
updateJob.setEnable("0");
this.saveOrUpdate(updateJob);
quartzJobService.pauseJob(jobInfo);
}
@Override
public void resumeJob(String id) {
JobInfo jobInfo = jobInfoDao.selectById(id);
if(jobInfo==null) {
log.error("任务不存在");
return;
}
if(!"1".equals(jobInfo.getEnable())){
JobInfo updateJob = new JobInfo();
updateJob.setId(id);
updateJob.setEnable("1");
this.saveOrUpdate(updateJob);
}
quartzJobService.resumeJob(jobInfo);
}
@Override
public void deleteJob(String id) {
JobInfo jobInfo = jobInfoDao.selectById(id);
if(jobInfo==null) {
log.error("任务不存在");
return;
}
if("1".equals(jobInfo.getEnable())){
JobInfo updateJob = new JobInfo();
updateJob.setId(id);
updateJob.setEnable("0");
this.saveOrUpdate(updateJob);
}
quartzJobService.deleteJob(jobInfo);
}
@Override
@Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
public JobInfo updateState(JobInfo jobInfo) throws Exception {
JobInfo updateJob = new JobInfo();
updateJob.setId(jobInfo.getId());
updateJob.setEnable(jobInfo.getEnable());
updateJob.setUpdateTime(new Date());
Boolean count = saveOrUpdate(updateJob);
if(!count) {
log.error("更新数据失败");
throw new Exception("更新数据失败");
}
JobInfo result = this.getById(jobInfo.getId());
if("1".equals(jobInfo.getEnable())){
this.quartzJobService.ResetSerialJob(result);
}else {
this.quartzJobService.deleteJob(jobInfo);
}
return result;
}
@Override
public JobInfo insert(JobInfo jobInfo) {
if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){
log.error("cron表达式格式错误");
return null;
}
Object bean = SpringContextHolder.getBean(jobInfo.getBeanName());
log.info("新增定时任务,bean ->"+bean+" method->"+jobInfo.getMethodName());
jobInfo.setId(String.valueOf(IdUtil.getSnowflake(1,1).nextId()));
jobInfo.setCreateTime(new Date());
this.jobInfoDao.insert(jobInfo);
if("1".equals(jobInfo.getEnable())){
this.quartzJobService.addSerialJob(jobInfo);
}
return jobInfo;
}
@Override
@Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
public JobInfo update(JobInfo jobInfo) {
if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){
log.error("cron表达式格式错误");
return null;
}
Object bean = SpringContextHolder.getBean(jobInfo.getBeanName());
log.info("更新定时任务,bean ->"+bean+" method->"+jobInfo.getMethodName());
jobInfo.setUpdateTime(new Date());
this.saveOrUpdate(jobInfo);
JobInfo result = this.getById(jobInfo.getId());
if("1".equals(jobInfo.getEnable())){
this.quartzJobService.ResetSerialJob(result);
}
return result;
}
@Override
public boolean deleteById(String id) {
JobInfo jobInfo = this.jobInfoDao.selectById(id);
if(jobInfo==null) {
log.error("任务不存在");
return false;
}
this.quartzJobService.deleteJob(jobInfo);
return this.jobInfoDao.deleteById(id) > 0;
}
}
创建任务例子
try {
    // Build an enabled job that calls workBenchService.detectionTimeout with the
    // application number as its parameter.
    JobInfo jobInfo = new JobInfo();
    String appNumber = dto.getAppNumber();
    JSONObject jsonObject = JSONUtil.createObj();
    jsonObject.set("appNumber", appNumber);
    // Cron expression for the current time offset by SysCont.DETECTION_TIMEOUT (via DateUtil.offsetMinute).
    String cron = CronUtil.getCron(DateUtil.offsetMinute(new Date(), SysCont.DETECTION_TIMEOUT).toJdkDate());
    jobInfo.setCronExpression(cron)
            .setBeanName("workBenchService")
            .setJobName("上传检查超时")
            .setMethodName("detectionTimeout")
            .setParams(jsonObject.toString())
            .setEnable("1");
    jobInfoService.insert(jobInfo);
} catch (Exception e) {
    // Log through the logger with the exception attached so the stack trace lands in the application log.
    log.error("创建定时任务失败!", e);
}
其中BeanName是Spring容器中的bean名称(不是类名),MethodName是要执行的方法名;由于任务执行时总会传入一个JSON参数对象,该方法需要声明一个Object类型的参数
Java日期转Cron表达式(带年份,适用于只执行一次的任务)
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Converts a point in time into a cron expression that matches exactly that
 * second, which suits jobs that should fire once.
 */
public class CronUtil {
    /** second minute hour day-of-month month ? year */
    private static final String CRON_DATE_PATTERN = "ss mm HH dd MM ? yyyy";

    /**
     * Formats the date as a seven-field cron expression in the JVM's default time zone.
     *
     * <p>The formatter is pinned to {@code Locale.ROOT}: with the default locale, a JVM
     * running under e.g. th-TH would use the Buddhist calendar and emit a wrong year.
     *
     * @param date the instant the cron expression should match; may be {@code null}
     * @return the cron expression, or {@code null} when {@code date} is {@code null}
     */
    public static String getCron(Date date) {
        if (date == null) {
            return null;
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        return new SimpleDateFormat(CRON_DATE_PATTERN, java.util.Locale.ROOT).format(date);
    }
}
|