很多情况下任务并非需要立即执行,而是需要在指定时间或指定频率执行,这不可能人工去操作,所以定时任务就出现了。
定时任务四种实现方案
-
Timer:这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让你的程序按照某一个频度执行,但不能在指定时间运行。一般用的较少。 -
ScheduledExecutorService:也jdk自带的一个类;是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。 -
Spring Task:Spring3.0以后自带的task,可以将它看成一个轻量级的Quartz,而且使用起来比Quartz简单许多。 -
Quartz:这是一个功能比较强大的的调度器,可以让你的程序在指定时间执行,也可以按照某一个频度执行,配置起来稍显复杂。 -
为什么不选用quartz? 虽然quartz性能更强,但spring Task比较简单,写了一个demo可以满足需求,就选用这个。当时项目赶进度,就使用这个了。 先有后优
本文主要介绍Spring Task的使用和实现原理。
Spring Task特点
SprngTask没有专门的包,是Spring 3.0自带的定时任务,其核心类位于spring-context包中,所以引入spring的核心包此功能即可使用。可以将它看作成一个轻量级的Quartz,功能虽然没有Quartz那样强大,但是使用起来非常简单,无需增加额外的依赖,可直接上手使用。它具备如下特点
-
默认单线程同步执行 SpringTask 默认是单线程的,不同定时任务使用的都是同一个线程;当存在多个定时任务时,若当前任务执行时间过长则可能导致下一个任务无法执行。 在实际开发中,不希望所有的任务都运行在一个线程中。可在配置类中使用ScheduledTaskRegistrar#setTaskScheduler(TaskScheduler taskScheduler),SpringTask提供一个基于多线程的TaskScheduler接口的实现类(譬如ThreadPoolTaskScheduler类),Spring默认使用ConcurrentTaskScheduler。 -
对异常的处理 在SpringTask中,一旦某个任务在执行过程中抛出异常,则整个定时器生命周期就结束,以后永远不会再执行定时器任务。需要手动处理异常 -
默认不适用分布式环境 Spring Task 并不是为分布式环境设计的,在分布式环境下,这种定时任务是不支持集群配置的,如果部署到多个节点上,各个节点之间并没有任何协调通讯机制,集群的节点之间是不会共享任务信息的,每个节点上的任务都会按时执行,导致任务的重复执行。 我们可以使用支持分布式的定时任务调度框架,比如 Quartz、XXL-Job、Elastic Job。 当然你可以借助 zookeeper 、 redis 等实现分布式锁来处理各个节点的协调问题。 或者把所有的定时任务抽成单独的服务单独部署。 本项目即是需要使用分布式锁
在spring boot中Spring Task的三种使用方式
基于@Scheduled注解(静态调度,单线程)
使用该方式只需完成两步操作
-
使用@EnableScheduling 注解开启定时任务,该注解放在启动类上。 @SpringBootApplication
@EnableScheduling
public class ScheduledDemoApplication
{
public static void main(String[] args)
{
SpringApplication.run(ScheduledDemoApplication.class, args);
}
}
-
使用@Scheduled 注解定义定时任务 public class SaticScheduleTask {
@Scheduled(cron = "0/5 * * * * ?")
private void configureTasks() {
System.err.println("执行静态定时任务时间: " + LocalDateTime.now());
}
}
其中,使用这个注解的方法必需无参,无返回值,若有返回值将被忽略。 @Scheduled 注解的参数有以下四种,分别对应四种任务调度策略
-
cron表达式 “秒 分 时 天 月 周”,其他语法参照资料 cron表达式配置了在哪一刻执行任务,会在配置的任务开始时间判断任务是否可以执行,如果能则执行,不能则会跳过本次执行;每次任务不一定都会执行。 强调在某时某分某刻执行定时任务 -
fixedDelay (固定的延迟,结束-开始的间隔) 设定上一个任务结束后多久执行下一个任务,即上一任务的结束时间和下一任务的开始时间 每次任务都会执行 -
fixedRate (固定的频率,开始-开始的间隔) 设定上一个任务开始后多久执行下一个任务,即上一个任务的开始时间到下一个任务开始时间的间隔 特别地,若到达任务的开始执行时间,但上一个任务却没有完成时,spring会等待上一个任务执行完,并立即开始执行本次任务。 每次任务都会执行 -
initialDelay(初始化延迟) 需要配合fixedDelay或fixedRate使用 设定延迟多长时间后开始执行第一次定时任务,其后按照fixedDelay或fixedRate的逻辑执行。
基于SchedulingConfigurer接口(动态调度,单线程)
在基于基于@Scheduled注解(静态方式)中,每个一个定时任务都在系统启动前定义完成,启动之后无法修改。而基于SchedulingConfigurer接口的方式可以实现系统可在启动之后,从数据库中读取定时任务调度策略,实现动态调度定时任务。
包含以下操作:
-
配置环境 引入maven依赖、配置数据库连接、插入cron表达式 -
实现SchedulingConfigurer接口,重写configureTasks,并使用@EnableScheduling注解开启定时任务 @Configuration
@EnableScheduling
public class DynamicScheduleTask implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addTriggerTask(
() -> System.out.println("执行动态定时任务: " + LocalDateTime.now().toLocalTime()),
triggerContext -> {
String cron = cronMapper.getCron();
if (StringUtils.isEmpty(cron)) {
}
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
}
}
基于SchedulingConfigurer接口(动态调度,多线程)
需要注意的是,SchedulingConfigurer 默认使用的也是单线程的方式,如果需要配置多线程,则需要指定线程池,有以下两种方式
-
在SchedulingConfigurer #configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar)方法中,使用ScheduledTaskRegistrar.setTaskScheduler(TaskScheduler taskScheduler)设置线程池。 线程池使用ThreadPoolTaskScheduler类型。
@Configuration
@EnableScheduling
public class AsyncTaskConfig implements SchedulingConfigurer{
private int corePoolSize = 5;
@Bean
public ThreadPoolTaskScheduler taskScheduler()
{
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
scheduler.setPoolSize(corePoolSize);
return scheduler;
}
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
}
}
@Component
public class ChooseCourseTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
@Scheduled(cron="0/3 * * * * *")
public void task1(){
LOGGER.info(Thread.currentThread().getName()+"===task run");
LOGGER.info("===============测试定时任务1开始===============");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("===============测试定时任务1结束===============");
}
@Scheduled(cron="0/3 * * * * *")
public void task2(){
LOGGER.info(Thread.currentThread().getName()+"===task run");
LOGGER.info("===============测试定时任务2开始===============");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("===============测试定时任务2结束===============");
}
2022-06-15 14:08:02.007 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1结束===============
2022-06-15 14:08:02.007 [taskScheduler-8] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2结束===============
2022-06-15 14:08:03.000 [taskScheduler-6] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-6===task run
2022-06-15 14:08:03.000 [taskScheduler-6] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2开始===============
2022-06-15 14:08:03.000 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-4===task run
2022-06-15 14:08:03.000 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1开始===============
2022-06-15 14:08:08.013 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1结束===============
2022-06-15 14:08:08.013 [taskScheduler-6] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2结束===============
2022-06-15 14:08:09.009 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-4===task run
2022-06-15 14:08:09.009 [taskScheduler-7] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-7===task run
2022-06-15 14:08:09.009 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1开始===============
2022-06-15 14:08:09.009 [taskScheduler-7] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2开始===============
-
@Async异步+线程池的两种方式 这种方式需要实现AsyncConfigurer、SchedulingConfigurer接口,在实现类中配置线程池信息。同时在调度任务处使用@Async注解 @Async注解可用在类或方法上。
@Configuration
@EnableScheduling
public class AsTest implements AsyncConfigurer {
@Bean
public ThreadPoolTaskScheduler taskScheduler()
{
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
scheduler.setPoolSize(10);
return scheduler;
}
@Override
public Executor getAsyncExecutor() {
Executor executor = taskScheduler();
return executor;
}
@Component
public class ChooseCourseTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
@Scheduled(cron="0/3 * * * * *")
@Async
public void task1(){
LOGGER.info(Thread.currentThread().getName()+"===task run");
LOGGER.info("===============测试定时任务1开始===============");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("===============测试定时任务1结束===============");
}
@Scheduled(cron="0/3 * * * * *")
@Async
public void task2(){
LOGGER.info(Thread.currentThread().getName()+"===task run");
LOGGER.info("===============测试定时任务2开始===============");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("===============测试定时任务2结束===============");
}
2022-06-15 14:12:21.007 [taskScheduler-2] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-2===task run
2022-06-15 14:12:21.007 [taskScheduler-1] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-1===task run
2022-06-15 14:12:21.007 [taskScheduler-2] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2开始===============
2022-06-15 14:12:21.007 [taskScheduler-1] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1开始===============
2022-06-15 14:12:26.013 [taskScheduler-1] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1结束===============
2022-06-15 14:12:26.013 [taskScheduler-2] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2结束===============
2022-06-15 14:12:27.009 [taskScheduler-3] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-3===task run
2022-06-15 14:12:27.009 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - taskScheduler-4===task run
2022-06-15 14:12:27.009 [taskScheduler-3] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2开始===============
2022-06-15 14:12:27.009 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1开始===============
2022-06-15 14:12:32.011 [taskScheduler-4] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务1结束===============
2022-06-15 14:12:32.011 [taskScheduler-3] INFO c.guoj.order.mq.ChooseCourseTask - ===============测试定时任务2结束===============
实现原理
Spring Task定时任务的基本使用中,主要使用了@EnableScheduling 和@Scheduled ,前者用户开启定时任务,后者用于标记定时任务的执行逻辑,即先开启后使用。因此先分析@EnableScheduling 注解。
@EnableScheduling注解
该注解的代码源码很简单,其核心在于引入了一个SchedulingConfiguration.class 配置类
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class})
@Documented
public @interface EnableScheduling {
}
SchedulingConfiguration.class 配置类的源码也很简单,只是创建了一个ScheduledAnnotationBeanPostProcessor 实例。从名字上看,该实例一种BeanPostProcessor (后置处理器)。
后置处理器作用是在Bean对象在实例化和依赖注入完毕后,在显示调用初始化方法的前后添加我们自己的逻辑。注意是Bean实例化完毕后及依赖注入完成后触发的。
@Configuration
@Role(2)
public class SchedulingConfiguration {
public SchedulingConfiguration() {
}
@Bean(
name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"}
)
@Role(2)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
综上,@EnableScheduling 注解是通过创建一个ScheduledAnnotationBeanPostProcessor 实例实现开启定时任务的。
ScheduledAnnotationBeanPostProcessor类
查看源码可知,该类实现一堆接口,此处不再逐个介绍这些接口,主要分析该类实现开启定时任务的实现逻辑。
public class ScheduledAnnotationBeanPostProcessor implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor, Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware, SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
....
}
postProcessAfterInitialization方法
该方法位于ScheduledAnnotationBeanPostProcessor类中,主要作用是找出全部被@Scheduled 注解标记的方法,并调用processScheduled 方法进行下一步处理。
public Object postProcessAfterInitialization(Object bean, String beanName) {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (method) -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
return !scheduledMethods.isEmpty() ? scheduledMethods : null;
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (this.logger.isTraceEnabled()) {
this.logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
}
} else {
annotatedMethods.forEach((method, scheduledMethods) -> {
scheduledMethods.forEach((scheduled) -> {
this.processScheduled(scheduled, method, bean);
});
});
if (this.logger.isDebugEnabled()) {
this.logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods);
}
}
}
return bean;
}
processScheduled方法
该方法位于ScheduledAnnotationBeanPostProcessor类中,该类主要作用是
- 1)检测被@Scheduled注解是否有参数
- 2)被@Scheduled注解标注的方法是否是无参且无返回值
- 3)使用ScheduledTaskRegistrar注册定时任务,后加入任务列表
public class ScheduledAnnotationBeanPostProcessor implements xxx{
private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
boolean processedSchedule = false;
String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet(4);
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0L) {
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0L) {
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
((Set)registeredTasks).addAll(tasks);
}
}
ScheduledTaskRegistrar类
该类实现了三个接口,其中
- InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。
- DisposableBean接口和InitializingBean接口一样,为bean提供了释放资源方法的方式,它只包括destroy方法,凡是继承该接口的类,在bean被销毁之前都会执行该方法。
- ScheduledTaskHolder接口定义返回当前实例的任务列表。
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {
@Nullable
private TaskScheduler taskScheduler;
@Nullable
private ScheduledExecutorService localExecutor;
@Nullable
private List<TriggerTask> triggerTasks;
@Nullable
private List<CronTask> cronTasks;
@Nullable
private List<IntervalTask> fixedRateTasks;
@Nullable
private List<IntervalTask> fixedDelayTasks;
private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<>(16);
private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(16);
public void setTaskScheduler(TaskScheduler taskScheduler) {
Assert.notNull(taskScheduler, "TaskScheduler must not be null");
this.taskScheduler = taskScheduler;
}
public void setFixedDelayTasksList(List<IntervalTask> fixedDelayTasks) {
this.fixedDelayTasks = fixedDelayTasks;
}
public List<IntervalTask> getFixedDelayTaskList() {
return (this.fixedDelayTasks != null ? Collections.unmodifiableList(this.fixedDelayTasks) :
Collections.emptyList());
}
public void addCronTask(CronTask task) {
if (this.cronTasks == null) {
this.cronTasks = new ArrayList<>();
}
this.cronTasks.add(task);
}
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
@SuppressWarnings("deprecation")
protected void scheduleTasks() {
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
@Deprecated
@Nullable
public ScheduledTask scheduleFixedRateTask(IntervalTask task) {
FixedRateTask taskToUse = (task instanceof FixedRateTask ? (FixedRateTask) task :
new FixedRateTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));
return scheduleFixedRateTask(taskToUse);
}
@Override
public Set<ScheduledTask> getScheduledTasks() {
return Collections.unmodifiableSet(this.scheduledTasks);
}
@Override
public void destroy() {
for (ScheduledTask task : this.scheduledTasks) {
task.cancel();
}
if (this.localExecutor != null) {
this.localExecutor.shutdownNow();
}
}
}
总结
- spring启动过程中读取到EnableScheduling注解,然后执行解析封装等操作,解析完成之后,就创建了ScheduledAnnotationBeanPostProcessor实例
- ScheduledAnnotationBeanPostProcessor#postProcessAfterInitialization()方法拦截被@Scheduled修饰方法
- ScheduledAnnotationBeanPostProcessor#processScheduled()方法解析@Scheduled注解的参数,加入任务列表,注册定时任务
- ScheduledTaskRegistrar在spring初始化Bean时触发定时任务的执行,具体执行由ConcurrentTaskScheduler实例完成
参考资料
|