一、摘要
- springboot + quartz + postgres实现持久化分布式调度
- 集群环境任务调度测试
二、Quartz 集群架构
Quartz 是 Java 领域最著名的开源任务调度工具。
在上篇文章中,我们详细地介绍了 Quartz 的单体应用实践。如果只在单体环境中应用,Quartz 未必是最好的选择,例如 Spring Scheduled 同样可以实现任务调度,并且与 SpringBoot 无缝集成,支持注解配置,非常简单;但是它有个缺点:在集群环境下,会导致任务被重复调度!
而与之对应的 Quartz 提供了极为实用的特性,如任务持久化、集群部署和分布式调度任务等等,正因如此,基于 Quartz 的任务调度功能在系统开发中应用极为广泛!
在集群环境下,Quartz 集群中的每个节点是一个独立的 Quartz 应用,没有负责集中管理的节点,而是通过数据库表来感知另一个应用,利用数据库锁的方式来实现集群环境下进行并发控制,每个任务当前运行的有效节点有且只有一个!
特别需要注意的是:分布式部署时需要保证各个节点的系统时间一致!
三、数据表初始化
数据库表结构去官网下载,地址:quartz官网地址
这里下载 2.3.0 版本。
选择对应的数据库脚本,我们选择的是 postgres。
-- Thanks to Patrick Lightbody for submitting this...
--
-- In your Quartz properties file, you'll need to set
-- org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate

-- Remove any previous copy of the schema so the script can be re-run.
-- Tables without outgoing references and the trigger sub-type tables go first.
DROP TABLE IF EXISTS qrtz_fired_triggers;
DROP TABLE IF EXISTS qrtz_paused_trigger_grps;
DROP TABLE IF EXISTS qrtz_scheduler_state;
DROP TABLE IF EXISTS qrtz_locks;
DROP TABLE IF EXISTS qrtz_simple_triggers;
DROP TABLE IF EXISTS qrtz_cron_triggers;
DROP TABLE IF EXISTS qrtz_simprop_triggers;
DROP TABLE IF EXISTS qrtz_blob_triggers;
-- qrtz_triggers is referenced by the four *_triggers tables dropped above,
-- and itself references qrtz_job_details, so it is dropped before that table.
DROP TABLE IF EXISTS qrtz_triggers;
DROP TABLE IF EXISTS qrtz_job_details;
DROP TABLE IF EXISTS qrtz_calendars;
-- Job definitions, identified by (sched_name, job_name, job_group).
-- job_data holds an opaque binary payload.
CREATE TABLE qrtz_job_details (
    sched_name        VARCHAR(120) NOT NULL,
    job_name          VARCHAR(200) NOT NULL,
    job_group         VARCHAR(200) NOT NULL,
    description       VARCHAR(250),
    job_class_name    VARCHAR(250) NOT NULL,
    is_durable        BOOLEAN      NOT NULL,
    is_nonconcurrent  BOOLEAN      NOT NULL,
    is_update_data    BOOLEAN      NOT NULL,
    requests_recovery BOOLEAN      NOT NULL,
    job_data          BYTEA,
    PRIMARY KEY (sched_name, job_name, job_group)
);
-- Trigger definitions, identified by (sched_name, trigger_name, trigger_group).
-- Every trigger must point at an existing row in qrtz_job_details.
CREATE TABLE qrtz_triggers (
    sched_name     VARCHAR(120) NOT NULL,
    trigger_name   VARCHAR(200) NOT NULL,
    trigger_group  VARCHAR(200) NOT NULL,
    job_name       VARCHAR(200) NOT NULL,
    job_group      VARCHAR(200) NOT NULL,
    description    VARCHAR(250),
    next_fire_time BIGINT,
    prev_fire_time BIGINT,
    priority       INTEGER,
    trigger_state  VARCHAR(16)  NOT NULL,
    trigger_type   VARCHAR(8)   NOT NULL,
    start_time     BIGINT       NOT NULL,
    end_time       BIGINT,
    calendar_name  VARCHAR(200),
    misfire_instr  SMALLINT,
    job_data       BYTEA,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, job_name, job_group)
        REFERENCES qrtz_job_details (sched_name, job_name, job_group)
);
-- Repeat settings for a trigger; shares its key with, and references, qrtz_triggers.
CREATE TABLE qrtz_simple_triggers (
    sched_name      VARCHAR(120) NOT NULL,
    trigger_name    VARCHAR(200) NOT NULL,
    trigger_group   VARCHAR(200) NOT NULL,
    repeat_count    BIGINT       NOT NULL,
    repeat_interval BIGINT       NOT NULL,
    times_triggered BIGINT       NOT NULL,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES qrtz_triggers (sched_name, trigger_name, trigger_group)
);
-- Cron expression (and optional time zone id) for a trigger;
-- shares its key with, and references, qrtz_triggers.
CREATE TABLE qrtz_cron_triggers (
    sched_name      VARCHAR(120) NOT NULL,
    trigger_name    VARCHAR(200) NOT NULL,
    trigger_group   VARCHAR(200) NOT NULL,
    cron_expression VARCHAR(120) NOT NULL,
    time_zone_id    VARCHAR(80),
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES qrtz_triggers (sched_name, trigger_name, trigger_group)
);
-- Generic typed property slots (string / int / long / decimal / boolean) for a trigger;
-- shares its key with, and references, qrtz_triggers.
CREATE TABLE qrtz_simprop_triggers (
    sched_name    VARCHAR(120)  NOT NULL,
    trigger_name  VARCHAR(200)  NOT NULL,
    trigger_group VARCHAR(200)  NOT NULL,
    str_prop_1    VARCHAR(512),
    str_prop_2    VARCHAR(512),
    str_prop_3    VARCHAR(512),
    int_prop_1    INTEGER,
    int_prop_2    INTEGER,
    long_prop_1   BIGINT,
    long_prop_2   BIGINT,
    dec_prop_1    NUMERIC(13, 4),
    dec_prop_2    NUMERIC(13, 4),
    bool_prop_1   BOOLEAN,
    bool_prop_2   BOOLEAN,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES qrtz_triggers (sched_name, trigger_name, trigger_group)
);
-- Opaque binary payload for a trigger; shares its key with, and references, qrtz_triggers.
CREATE TABLE qrtz_blob_triggers (
    sched_name    VARCHAR(120) NOT NULL,
    trigger_name  VARCHAR(200) NOT NULL,
    trigger_group VARCHAR(200) NOT NULL,
    blob_data     BYTEA,
    PRIMARY KEY (sched_name, trigger_name, trigger_group),
    FOREIGN KEY (sched_name, trigger_name, trigger_group)
        REFERENCES qrtz_triggers (sched_name, trigger_name, trigger_group)
);
-- Named calendars per scheduler; the calendar itself is stored as binary data.
CREATE TABLE qrtz_calendars (
    sched_name    VARCHAR(120) NOT NULL,
    calendar_name VARCHAR(200) NOT NULL,
    calendar      BYTEA        NOT NULL,
    PRIMARY KEY (sched_name, calendar_name)
);
-- One row per (scheduler, trigger group) pair.
CREATE TABLE qrtz_paused_trigger_grps (
    sched_name    VARCHAR(120) NOT NULL,
    trigger_group VARCHAR(200) NOT NULL,
    PRIMARY KEY (sched_name, trigger_group)
);
-- Fired-trigger entries, identified by (sched_name, entry_id).
-- Carries trigger and job names but declares no foreign keys.
CREATE TABLE qrtz_fired_triggers (
    sched_name        VARCHAR(120) NOT NULL,
    entry_id          VARCHAR(95)  NOT NULL,
    trigger_name      VARCHAR(200) NOT NULL,
    trigger_group     VARCHAR(200) NOT NULL,
    instance_name     VARCHAR(200) NOT NULL,
    fired_time        BIGINT       NOT NULL,
    sched_time        BIGINT       NOT NULL,
    priority          INTEGER      NOT NULL,
    state             VARCHAR(16)  NOT NULL,
    job_name          VARCHAR(200),
    job_group         VARCHAR(200),
    is_nonconcurrent  BOOLEAN,
    requests_recovery BOOLEAN,
    PRIMARY KEY (sched_name, entry_id)
);
-- One row per scheduler instance, with its last check-in time and check-in interval.
CREATE TABLE qrtz_scheduler_state (
    sched_name        VARCHAR(120) NOT NULL,
    instance_name     VARCHAR(200) NOT NULL,
    last_checkin_time BIGINT       NOT NULL,
    checkin_interval  BIGINT       NOT NULL,
    PRIMARY KEY (sched_name, instance_name)
);
-- Named lock rows per scheduler.
CREATE TABLE qrtz_locks (
    sched_name VARCHAR(120) NOT NULL,
    lock_name  VARCHAR(40)  NOT NULL,
    PRIMARY KEY (sched_name, lock_name)
);
-- Secondary indexes on qrtz_job_details.
CREATE INDEX idx_qrtz_j_req_recovery ON qrtz_job_details (sched_name, requests_recovery);
CREATE INDEX idx_qrtz_j_grp ON qrtz_job_details (sched_name, job_group);

-- Secondary indexes on qrtz_triggers.
CREATE INDEX idx_qrtz_t_j ON qrtz_triggers (sched_name, job_name, job_group);
CREATE INDEX idx_qrtz_t_jg ON qrtz_triggers (sched_name, job_group);
CREATE INDEX idx_qrtz_t_c ON qrtz_triggers (sched_name, calendar_name);
CREATE INDEX idx_qrtz_t_g ON qrtz_triggers (sched_name, trigger_group);
CREATE INDEX idx_qrtz_t_state ON qrtz_triggers (sched_name, trigger_state);
CREATE INDEX idx_qrtz_t_n_state ON qrtz_triggers (sched_name, trigger_name, trigger_group, trigger_state);
CREATE INDEX idx_qrtz_t_n_g_state ON qrtz_triggers (sched_name, trigger_group, trigger_state);
CREATE INDEX idx_qrtz_t_next_fire_time ON qrtz_triggers (sched_name, next_fire_time);
CREATE INDEX idx_qrtz_t_nft_st ON qrtz_triggers (sched_name, trigger_state, next_fire_time);
CREATE INDEX idx_qrtz_t_nft_misfire ON qrtz_triggers (sched_name, misfire_instr, next_fire_time);
CREATE INDEX idx_qrtz_t_nft_st_misfire ON qrtz_triggers (sched_name, misfire_instr, next_fire_time, trigger_state);
CREATE INDEX idx_qrtz_t_nft_st_misfire_grp ON qrtz_triggers (sched_name, misfire_instr, next_fire_time, trigger_group, trigger_state);

-- Secondary indexes on qrtz_fired_triggers.
CREATE INDEX idx_qrtz_ft_trig_inst_name ON qrtz_fired_triggers (sched_name, instance_name);
CREATE INDEX idx_qrtz_ft_inst_job_req_rcvry ON qrtz_fired_triggers (sched_name, instance_name, requests_recovery);
CREATE INDEX idx_qrtz_ft_j_g ON qrtz_fired_triggers (sched_name, job_name, job_group);
CREATE INDEX idx_qrtz_ft_jg ON qrtz_fired_triggers (sched_name, job_group);
CREATE INDEX idx_qrtz_ft_t_g ON qrtz_fired_triggers (sched_name, trigger_name, trigger_group);
CREATE INDEX idx_qrtz_ft_tg ON qrtz_fired_triggers (sched_name, trigger_group);

COMMIT;
其中,QRTZ_LOCKS 就是 Quartz 集群实现同步机制的行锁表!
四、Quartz + springboot 代码实现
4.1、创建springboot项目,导入maven依赖包
<!-- Spring Boot starter for Quartz; no version element is given here. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
4.2、注册 Quartz 任务工厂
/**
 * Quartz job factory that runs Spring autowiring on every job instance it creates,
 * so that services injected into a job are not left null.
 *
 * @author haylee
 * @version 1.0
 * @since 2021/4/13 15:50
 */
@Component
public class SpringJobFactory extends AdaptableJobFactory {

    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    /**
     * Lets the parent factory instantiate the job, then autowires the new object.
     *
     * @param bundle the fired-trigger bundle handed over by Quartz
     * @return the job instance after autowiring
     * @throws Exception if the parent factory cannot create the instance
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        // Populate @Autowired members on the freshly created job.
        capableBeanFactory.autowireBean(job);
        return job;
    }
}
4.3、注册调度工厂
这里我们使用spring默认的数据源,网上基本都是独立数据源,很少人想到怎么使用spring数据源。因为spring数据源datasource已经被spring容器管理,我们只需要在实例化SchedulerFactoryBean时注入就可以了。
/**
 * Spring configuration that builds a clustered, JDBC-backed Quartz scheduler
 * on top of the application's own {@link DataSource}.
 *
 * @author haylee
 * @version 1.0
 * @since 2021/4/13 15:50
 */
@Configuration
public class QuartzConfig {

    /** Custom job factory that autowires Spring beans into job instances. */
    @Autowired
    private SpringJobFactory springJobFactory;

    /**
     * Creates the scheduler factory, configured for a PostgreSQL job store in cluster mode.
     *
     * @param dataSource the Spring-managed data source that holds the QRTZ_ tables
     * @return the configured factory bean
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);

        Properties prop = new Properties();
        // Scheduler identity; AUTO lets each node pick its own instance id.
        prop.setProperty("org.quartz.scheduler.instanceName", "safeScheduler");
        prop.setProperty("org.quartz.scheduler.instanceId", "AUTO");

        // Worker thread pool.
        prop.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.setProperty("org.quartz.threadPool.threadCount", "20");
        prop.setProperty("org.quartz.threadPool.threadPriority", "5");

        // JDBC job store.
        prop.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");

        // Cluster settings.
        prop.setProperty("org.quartz.jobStore.isClustered", "true");
        prop.setProperty("org.quartz.jobStore.clusterCheckinInterval", "15000");
        prop.setProperty("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        prop.setProperty("org.quartz.jobStore.misfireThreshold", "12000");
        prop.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");

        // org.quartz.jobStore.selectWithLockSQL is deliberately NOT overridden.
        // The previous value ("SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?") used the
        // SQL Server table hint UPDLOCK. PostgreSQL parses that word as a table alias, so the
        // statement ran without taking a row lock and the cluster nodes were not synchronised.
        // Quartz's built-in default (a SELECT ... FOR UPDATE on the LOCKS table) is used instead.

        // Delegate required for PostgreSQL.
        prop.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");

        factory.setQuartzProperties(prop);
        factory.setSchedulerName("safeScheduler");
        // Delay before the scheduler is started after initialization.
        factory.setStartupDelay(30);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // Optional: overwrite already-persisted jobs on startup, so stale rows in
        // qrtz_job_details do not have to be deleted by hand after a job definition changes.
        factory.setOverwriteExistingJobs(true);
        // Start automatically (this is also the default).
        factory.setAutoStartup(true);
        factory.setJobFactory(springJobFactory);
        return factory;
    }

    /**
     * Exposes the {@link Scheduler} produced by {@link #schedulerFactoryBean(DataSource)}
     * as a bean of its own.
     *
     * @param dataSource the Spring-managed data source
     * @return the scheduler obtained from the factory bean
     */
    @Bean
    public Scheduler scheduler(DataSource dataSource) {
        return schedulerFactoryBean(dataSource).getScheduler();
    }
}
4.4、编写quartz任务实现类
/**
* @Author haylee
* @Date 2021/4/13 15:50
* @Version 1.0
* @Description TODO
*/
@Service
public class QuartzJobService {
@Autowired
private Scheduler scheduler;
/**
* 创建定时任务Simple
* quartzBean.getInterval()==null表示单次提醒,
* 否则循环提醒(quartzBean.getEndTime()!=null)
* @param quartzBean
*/
public void createScheduleJobSimple(QuartzBean quartzBean) throws Exception{
//获取到定时任务的执行类 必须是类的绝对路径名称
//定时任务类需要是job类的具体实现 QuartzJobBean是job的抽象类。
Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(quartzBean.getJobClass());
// 构建定时任务信息
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.withIdentity(quartzBean.getJobName(), ObjectUtils.isNotEmpty(quartzBean.getJobGroup()) ?quartzBean.getJobGroup():null)
.setJobData(quartzBean.getJobDataMap())
.build();
// 设置定时任务执行方式
SimpleScheduleBuilder simpleScheduleBuilder = null;
if (quartzBean.getInterval() == null) { //单次
simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule();
} else { //循环
simpleScheduleBuilder = SimpleScheduleBuilder.repeatMinutelyForever(quartzBean.getInterval());
}
// 构建触发器trigger
Trigger trigger = null;
if (quartzBean.getInterval() == null) { //单次
trigger = TriggerBuilder.newTrigger()
.withIdentity(quartzBean.getJobName(),ObjectUtils.isNotEmpty(quartzBean.getJobGroup()) ?quartzBean.getJobGroup():null)
.withSchedule(simpleScheduleBuilder)
.startAt(quartzBean.getStartTime())
.build();
} else { //循环
trigger = TriggerBuilder.newTrigger()
.withIdentity(quartzBean.getJobName(),ObjectUtils.isNotEmpty(quartzBean.getJobGroup()) ?quartzBean.getJobGroup():null)
.withSchedule(simpleScheduleBuilder)
.startAt(quartzBean.getStartTime())
.endAt(quartzBean.getEndTime())
.build();
}
scheduler.scheduleJob(jobDetail, trigger);
}
/**
* 创建定时任务Cron
* 定时任务创建之后默认启动状态
* @param quartzBean 定时任务信息类
* @throws Exception
*/
public void createScheduleJobCron(QuartzBean quartzBean) throws Exception{
//获取到定时任务的执行类 必须是类的绝对路径名称
//定时任务类需要是job类的具体实现 QuartzJobBean是job的抽象类。
Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(quartzBean.getJobClass());
// 构建定时任务信息
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(quartzBean.getJobName(),ObjectUtils.isNotEmpty(quartzBean.getJobGroup()) ?quartzBean.getJobGroup():null).setJobData(quartzBean.getJobDataMap()).build();
// 设置定时任务执行方式
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzBean.getCronExpression());
// 构建触发器trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(quartzBean.getJobName()).withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
}
/**
* 根据任务名称暂停定时任务
* @param jobName 定时任务名称
* @param jobGroup 任务组(没有分组传值null)
* @throws Exception
*/
public void pauseScheduleJob(String jobName,String jobGroup) throws Exception{
JobKey jobKey = JobKey.jobKey(jobName,ObjectUtils.isNotEmpty(jobGroup) ?jobGroup:null);
scheduler.pauseJob(jobKey);
}
/**
* 根据任务名称恢复定时任务
* @param jobName 定时任务名
* @param jobGroup 任务组(没有分组传值null)
* @throws SchedulerException
*/
public void resumeScheduleJob(String jobName,String jobGroup) throws Exception {
JobKey jobKey = JobKey.jobKey(jobName,ObjectUtils.isNotEmpty(jobGroup) ?jobGroup:null);
scheduler.resumeJob(jobKey);
}
/**
* 根据任务名称立即运行一次定时任务
* @param jobName 定时任务名称
* @param jobGroup 任务组(没有分组传值null)
* @throws SchedulerException
*/
public void runOnce(String jobName,String jobGroup) throws Exception{
JobKey jobKey = JobKey.jobKey(jobName,ObjectUtils.isNotEmpty(jobGroup) ?jobGroup:null);
scheduler.triggerJob(jobKey);
}
/**
* 更新定时任务Simple
* @param quartzBean 定时任务信息类
* @throws SchedulerException
*/
public void updateScheduleJobSimple(QuartzBean quartzBean) throws Exception {
//获取到对应任务的触发器
TriggerKey triggerKey = TriggerKey.triggerKey(quartzBean.getJobName(), ObjectUtils.isNotEmpty(quartzBean.getJobGroup()) ?quartzBean.getJobGroup():null);
// 设置定时任务执行方式
SimpleScheduleBuilder simpleScheduleBuilder = null;
if (quartzBean.getInterval() == null) { //单次
simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule();
} else { //循环
simpleScheduleBuilder = SimpleScheduleBuilder.repeatMinutelyForever(quartzBean.getInterval());
}
// 构建触发器trigger
Trigger trigger = null;
if (quartzBean.getInterval() == null) { //单次
trigger = TriggerBuilder.newTrigger()
.withIdentity(quartzBean.getJobName(), ObjectUtils.isNotEmpty(quartzBean.getJobGroup()) ?quartzBean.getJobGroup():null)
.withSchedule(simpleScheduleBuilder)
.startAt(quartzBean.getStartTime())
.build();
} else { //循环
TriggerBuilder.newTrigger()
.withIdentity(quartzBean.getJobName(), ObjectUtils.isNotEmpty(quartzBean.getJobGroup()) ?quartzBean.getJobGroup():null)
.withSchedule(simpleScheduleBuilder)
.startAt(quartzBean.getStartTime())
.endAt(quartzBean.getEndTime())
.build();
}
//重置对应的job
scheduler.rescheduleJob(triggerKey, trigger);
}
/**
* 更新定时任务Cron
* @param quartzBean 定时任务信息类
* @throws SchedulerException
*/
public void updateScheduleJobCron(QuartzBean quartzBean) throws Exception {
//获取到对应任务的触发器
TriggerKey triggerKey = TriggerKey.triggerKey(quartzBean.getJobName());
//设置定时任务执行方式
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzBean.getCronExpression());
//重新构建任务的触发器trigger
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
//重置对应的job
scheduler.rescheduleJob(triggerKey, trigger);
}
/**
* 根据定时任务名称从调度器当中删除定时任务
* @param jobName 定时任务名称
* @param jobGroup 任务组(没有分组传值null)
* @throws SchedulerException
*/
public void deleteScheduleJob(String jobName,String jobGroup) throws Exception {
JobKey jobKey = JobKey.jobKey(jobName,ObjectUtils.isNotEmpty(jobGroup) ?jobGroup:null);
scheduler.deleteJob(jobKey);
}
/**
* 获取任务状态
* @param jobName
* @param jobGroup 任务组(没有分组传值null)
* @return
* (" BLOCKED ", " 阻塞 ");
* ("COMPLETE", "完成");
* ("ERROR", "出错");
* ("NONE", "不存在");
* ("NORMAL", "正常");
* ("PAUSED", "暂停");
*/
public String getScheduleJobStatus(String jobName,String jobGroup) throws Exception {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName,ObjectUtils.isNotEmpty(jobGroup) ?jobGroup:null);
Trigger.TriggerState state = scheduler.getTriggerState(triggerKey);
return state.name();
}
/**
* 根据定时任务名称来判断任务是否存在
* @param jobName 定时任务名称
* @param jobGroup 任务组(没有分组传值null)
* @throws SchedulerException
*/
public Boolean checkExistsScheduleJob(String jobName,String jobGroup) throws Exception {
JobKey jobKey = JobKey.jobKey(jobName, ObjectUtils.isNotEmpty(jobGroup) ?jobGroup:null);
return scheduler.checkExists(jobKey);
}
/**
* 根据任务組刪除定時任务
* @param jobGroup 任务组
* @throws SchedulerException
*/
public Boolean deleteGroupJob(String jobGroup) throws Exception {
GroupMatcher<JobKey> matcher = GroupMatcher.groupEquals(jobGroup);
Set<JobKey> jobkeySet = scheduler.getJobKeys(matcher);
List<JobKey> jobkeyList = new ArrayList<JobKey>();
jobkeyList.addAll(jobkeySet);
return scheduler.deleteJobs(jobkeyList);
}
/**
* 根据任务組批量刪除定時任务
* @param jobkeyList
* @throws SchedulerException
*/
public Boolean batchDeleteGroupJob(List<JobKey> jobkeyList) throws Exception {
return scheduler.deleteJobs(jobkeyList);
}
/**
* 根据任务組批量查询出jobkey
* @param jobGroup 任务组
* @throws SchedulerException
*/
public void batchQueryGroupJob(List<JobKey> jobkeyList,String jobGroup) throws Exception {
GroupMatcher<JobKey> matcher = GroupMatcher.groupEquals(jobGroup);
Set<JobKey> jobkeySet = scheduler.getJobKeys(matcher);
jobkeyList.addAll(jobkeySet);
}
/**
* 根据jobkey查询job详情
* @param jobKey
* @return
*/
public JobDetail queryJobDetail(JobKey jobKey){
JobDetail jobDetail = null;
try {
jobDetail = scheduler.getJobDetail(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
}
return jobDetail;
}
}
4.5、编写quartz任务实体类
/**
 * Plain data holder describing a Quartz job and how it should be scheduled.
 *
 * @author haylee
 * @version 1.0
 * @since 2021/4/13 15:50
 */
@Data
public class QuartzBean {

    /** Task id. */
    private String id;

    /** Job name. */
    private String jobName;

    /** Job group. */
    private String jobGroup;

    /** Name of the class that executes the job. */
    private String jobClass;

    /** Job status: started or paused. */
    private Integer status;

    /** Time at which the job starts. */
    private Date startTime;

    /** Repeat interval, in minutes. */
    private Integer interval;

    /** Time at which the job ends. */
    private Date endTime;

    /** Cron expression describing when the job runs. */
    private String cronExpression;

    /** Data map attached to the job. */
    private JobDataMap jobDataMap;
}
4.6、编写quartz任务类
/**
 * Sample job: prints its job key, the "userId" entry of the job data map
 * and the current time to standard output.
 */
@Component
public class MyTask extends QuartzJobBean {

    /**
     * Runs one execution of the job.
     *
     * @param context execution context supplied by Quartz
     * @throws JobExecutionException declared by the overridden method; not thrown here
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        final JobDetail detail = context.getJobDetail();
        final JobKey key = detail.getKey();
        final String userId = detail.getJobDataMap().getString("userId");
        final String line = String.format(
                "SimpleJob says: %s, userId: %s executing at %s", key, userId, new Date());
        System.out.println(line);
    }
}
4.7、编写quartz任务操作
/**
 * REST endpoints under {@code /api/quartz/} for creating, pausing, resuming, deleting
 * and inspecting Quartz jobs through {@link QuartzJobService}.
 *
 * <p>Every handler returns a plain text message. Failures are reported in the message
 * rather than as an error status.
 */
@RestController
@RequestMapping("/api/quartz/")
public class JobController {

    @Autowired
    private QuartzJobService quartzJobService;

    /**
     * Creates and starts the hard-coded demo job "job1" with a simple trigger.
     *
     * @return "startJob Success!" when the job was scheduled, "startJob Failed!" otherwise
     */
    @GetMapping("startSimpleJob")
    public String startSimpleJob() throws SchedulerException, ClassNotFoundException, ParseException {
        QuartzBean quartzBean = new QuartzBean();
        quartzBean.setJobClass("com.quartz.demo.job.MyTask");
        quartzBean.setJobName("job1");

        JobDataMap map = new JobDataMap();
        map.put("userId", "123456");
        quartzBean.setJobDataMap(map);

        Date now = new Date();
        quartzBean.setStartTime(DateUtils.addSeconds(now, 10));
        // NOTE(review): QuartzJobService.createScheduleJobSimple treats the interval as minutes,
        // while the end time below is only one minute away - confirm these values are intended.
        quartzBean.setInterval(10);
        quartzBean.setEndTime(DateUtils.addMinutes(now, 1));

        try {
            quartzJobService.createScheduleJobSimple(quartzBean);
        } catch (Exception e) {
            e.printStackTrace();
            // Report the failure instead of falling through to the success message.
            return "startJob Failed!";
        }
        return "startJob Success!";
    }

    /**
     * Creates a cron job. Job class, job name and cron expression are hard-coded for
     * testing and overwrite whatever the request supplied.
     *
     * @param quartzBean job description bound from the request
     * @return a success or failure message
     */
    @RequestMapping("/createCronJob")
    public String createJob(QuartzBean quartzBean) {
        try {
            // Hard-coded on purpose for the demo.
            quartzBean.setJobClass("com.quartz.demo.job.MyTask");
            quartzBean.setJobName("job1");
            quartzBean.setCronExpression("*/5 * * * * ?");
            quartzJobService.createScheduleJobCron(quartzBean);
        } catch (Exception e) {
            return "创建失败";
        }
        return "创建成功";
    }

    /**
     * Pauses a job.
     *
     * @param jobName  job name
     * @param jobGroup optional job group
     * @return a success or failure message
     */
    @RequestMapping(value = {"/pauseJob/{jobName}", "/pauseJob/{jobName}/{jobGroup}"})
    public String pauseJob(@PathVariable("jobName") String jobName, @PathVariable(required = false) String jobGroup) {
        try {
            quartzJobService.pauseScheduleJob(jobName, groupOrNull(jobGroup));
        } catch (Exception e) {
            return "暂停失败";
        }
        return "暂停成功";
    }

    /**
     * Resumes a paused job.
     *
     * @param jobName  job name
     * @param jobGroup optional job group
     * @return a success or failure message
     */
    @RequestMapping(value = {"/resume/{jobName}", "/resume/{jobName}/{jobGroup}"})
    public String resume(@PathVariable("jobName") String jobName, @PathVariable(required = false) String jobGroup) {
        try {
            quartzJobService.resumeScheduleJob(jobName, groupOrNull(jobGroup));
        } catch (Exception e) {
            return "启动失败";
        }
        return "启动成功";
    }

    /**
     * Deletes a job.
     *
     * @param jobName  job name
     * @param jobGroup optional job group
     * @return a success or failure message
     */
    @RequestMapping(value = {"/delete/{jobName}", "/delete/{jobName}/{jobGroup}"})
    public String delete(@PathVariable("jobName") String jobName, @PathVariable(required = false) String jobGroup) {
        try {
            quartzJobService.deleteScheduleJob(jobName, groupOrNull(jobGroup));
        } catch (Exception e) {
            return "删除失败";
        }
        return "删除成功";
    }

    /**
     * Reports whether a job exists.
     *
     * @param jobName  job name
     * @param jobGroup optional job group
     * @return a message saying whether the job exists, or a failure message
     */
    @RequestMapping(value = {"/check/{jobName}", "/check/{jobName}/{jobGroup}"})
    public String check(@PathVariable("jobName") String jobName, @PathVariable(required = false) String jobGroup) {
        try {
            if (quartzJobService.checkExistsScheduleJob(jobName, groupOrNull(jobGroup))) {
                return "存在定时任务:" + jobName;
            }
            return "不存在定时任务:" + jobName;
        } catch (Exception e) {
            return "查询任务失败";
        }
    }

    /**
     * Returns the trigger state name of a job.
     *
     * @param jobName  job name
     * @param jobGroup optional job group
     * @return the state name, or a failure message
     */
    @RequestMapping(value = {"/status/{jobName}", "/status/{jobName}/{jobGroup}"})
    public String status(@PathVariable("jobName") String jobName, @PathVariable(required = false) String jobGroup) {
        try {
            return quartzJobService.getScheduleJobStatus(jobName, groupOrNull(jobGroup));
        } catch (Exception e) {
            return "获取状态失败";
        }
    }

    /** Maps an empty or missing path group to null. */
    private static String groupOrNull(String jobGroup) {
        return ObjectUtils.isNotEmpty(jobGroup) ? jobGroup : null;
    }
}
五、springboot集群部署测试
springboot服务集群部署时,各个节点的系统时间必须一致,并且必须连接同一个数据库,这样才能做到分布式任务调度,任务不重复执行。
六、XXL-JOB(在quartz基础上封装的一个开源调度平台,有兴趣的可以研究下)
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网地址:分布式任务调度平台XXL-JOB