我们在ThreadPoolExecutor源码分析中分析了ThreadPoolExecutor的一些关键字段和方法,本文主要对ThreadPoolExecutor的子类ScheduledThreadPoolExecutor进行一些源码分析,本文基于JDK1.8.0_202。
JavaDoc介绍
首先介绍下ScheduledThreadPoolExecutor的javadoc中值得关注的点。
- ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,提供延迟执行或者周期执行任务的能力。
- 当任务在执行前被取消,任务不会执行,但是在默认情况下,任务直到过期才会从队列中移除。如果想让任务在取消时就从队列中移除,需要设置setRemoveOnCancelPolicy为true。如果我们要监控ScheduledThreadPoolExecutor的队列长度,需要额外关注下是否需要调用setRemoveOnCancelPolicy方法。
- scheduleAtFixedRate或者scheduleWithFixedDelay中任务的连续执行是不会相互重叠的,不同的执行可能在不同的线程中。
- 关于ThreadPoolExecutor的参数
- 在ScheduledThreadPoolExecutor中,由于使用的是无界队列,ThreadPoolExecutor中maximumPoolSize参数是无效的(因为线程数到达corePoolSize后,新增的任务会放到队列中)。
- 最好不要将corePoolSize设置成0、最好不要调用**allowCoreThreadTimeOut(true)**方法,因为这样设置的话,线程池中的线程数量可能会是0,这样有任务需要运行时需要新建线程,影响性能。
- 关于扩展:ScheduledThreadPoolExecutor提供decorateTask方法来定制化具体执行命令的任务,如下所示。
public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
static class CustomTask<V> implements RunnableScheduledFuture<V> {
// ...
}
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable r, RunnableScheduledFuture<V> task) {
return new CustomTask<V>(r, task);
}
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> c, RunnableScheduledFuture<V> task) {
return new CustomTask<V>(c, task);
} // ... add constructors, etc.
}
ScheduledThreadPoolExecutor的继承体系
ScheduledThreadPoolExecutor的继承体系如下所示,继承了ThreadPoolExecutor,实现了ScheduledExecutorService接口。
ScheduledExecutorService接口是对ExecutorService的扩展,提供延迟执行任务、固定间隔执行任务、固定延迟执行任务的能力。其中固定间隔和固定延迟执行都是周期性地执行任务,延迟执行任务是一次性的。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
内部类分析
ScheduledThreadPoolExecutor的内部类包括ScheduledFutureTask和DelayedWorkQueue,ScheduledFutureTask是FutureTask的子类,是ScheduledThreadPoolExecutor默认执行的任务。DelayedWorkQueue是自定义的基于小根堆的延迟队列。
ScheduledFutureTask
ScheduledFutureTask的继承关系如下图所示。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** 序列号,当time相同时序列号小的排在前面*/
private final long sequenceNumber;
/** 任务应该执行的时间,单位是纳秒*/
private long time;
/**
* period的单位是纳秒。
* 如果period等于0,表示一次性任务。
* 如果period大于0,表示固定频率的任务。
* 如果period小于0,表示固定延迟的任务。
*/
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
/**
* Index into delay queue, to support faster cancellation.
* 任务进入DelayedWorkQueue时,在数组中的索引,主要是在任务取消时使用,提高效率
*/
int heapIndex;
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
// 调用FutureTask的构造方法
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* Creates a periodic action with given nano time and period.
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/** 获取任务的延迟时间,单位是参数unit。也就是任务应该运行的时间减去当前时间 */
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
/** 实现Comparable接口 */
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
// 如果需要比较的任务是ScheduledFutureTask,则通过time和sequenceNumber进行比较
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// diff 等于0则序列号小的更小
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 如果需要比较的任务不是ScheduledFutureTask,则通过getDelay进行比较
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
/**
* Returns {@code true} if this is a periodic (not a one-shot) action.
*
* @return {@code true} if periodic
*/
public boolean isPeriodic() {
// period等于0是一次性任务,大于0是固定频率任务,小于0是固定延迟任务
return period != 0;
}
/**
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
// p小于0表示固定延迟任务
time = triggerTime(-p);
}
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用FutureTask的cancel
boolean cancelled = super.cancel(mayInterruptIfRunning);
// cancelled是true并且cancel时需要从队列中移除任务并且任务仍在队列中
if (cancelled && removeOnCancel && heapIndex >= 0)
// 从队列里移除任务
remove(this);
return cancelled;
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
// 线程池不是running状态或已经关闭了,则取消任务
cancel(false);
else if (!periodic)
// 使用FutureTask的run方法执行一次性任务
ScheduledFutureTask.super.run();
// 使用FutureTask的runAndReset执行周期性任务
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次运行时间
setNextRunTime();
// 将任务添加到队列中准备再次运行
reExecutePeriodic(outerTask);
}
}
}
DelayedWorkQueue
关于堆的简单介绍
堆是一棵完全二叉树,堆中某个结点的值总是不大于或不小于其父结点的值。将根结点最大的堆叫做最大堆或大根堆,根结点最小的堆叫做最小堆或小根堆。如图是一个小根堆。堆通常使用数组来保存。
- 堆中元素的索引。对于index是i的元素,它的父节点的index是(i - 1) / 2,它的左孩子index是2i + 1,右孩子的索引是2i + 2。
- 堆的主要操作。堆的两个核心方法是siftUp和siftDown,siftUp方法用于添加节点时的上溯过程,而siftDown方法用于删除节点时的下溯过程。这两个方法的目的就是在往堆中添加元素或者从堆中删除元素时,需要调整数组结构,使其重新成为一个堆。
DelayedWorkQueue的继承关系
DelayedWorkQueue是ScheduledThreadPoolExecutor中使用的队列,为了符合ThreadPoolExecutor的规范,DelayedWorkQueue也实现了BlockingQueue接口,尽管DelayedWorkQueue中只保存RunnableScheduledFuture。 与其他的DelayQueue和PriorityQueue一样,DelayedWorkQueue也是基于堆实现的,
- 当DelayedWorkQueue存储ScheduledFutureTask时,由于ScheduledFutureTask的headIndex字段可以记录任务在数组中的索引,因此在取消任务时,可以立刻找到相应的任务。在siftUp和siftDown方法中需要修改ScheduledFutureTask的heapIndex。当移除任务时,要将heapIndex设置为-1。
- 当DelayedWorkQueue存储RunnableScheduledFuture时,由于没有索引,因此在取消任务时需要遍历数组。
DelayedWorkQueue的字段如下所示,DelayedWorkQueue中的方法我们会在下文分析。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
/** 队列初始长度是16 */
private static final int INITIAL_CAPACITY = 16;
/** 内部使用RunnableScheduledFuture数组保存堆中的元素 */
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
/** 操作队列元素时使用的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 堆的大小 */
private int size = 0;
/**
* Thread designated to wait for the task at the head of the
* queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with a
* task with an earlier expiration time, the leader field is
* invalidated by being reset to null, and some waiting
* thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
* 这里使用了Leader-Follower多线程模式,目的是最小化不必要的线程等待。
* leader线程用于等待队列头部的任务。当一个线程成为leader时,它等待队列头部任务出队列,
* 其他线程则是在无限等待。leader线程在task或者poll返回前要通知其他线程。
* 当队列中加入新的过期时间更早的任务时,leader线程需要被置为null,waiting线程被通知。
* 因此waiting线程需要在等待时能够获取和失去leadership。
*/
private Thread leader = null;
}
scheduledAtFixedRate分析
scheduledAtFixedRate方法主要有三步
- 检查用户传入的参数是否合法
- 将Runnable包装成ScheduledFutureTask,进一步包装成RunnableScheduledFuture
- 使用delayedExecute执行RunnableScheduledFuture
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 参数检查
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 将任务封装成ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// decorateTask提供扩展能力,默认返回sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 执行RunnableScheduledFuture
delayedExecute(t);
return t;
}
构造ScheduledFutureTask
构造出ScheduledFutureTask的代码如下,主要是使用triggerTime设置一下任务应该在什么时间执行,这里考虑了传入的delay过大时的溢出情况。
/**
* Returns the trigger time of a delayed action.
*/
private long triggerTime(long delay, TimeUnit unit) {
// 将triggerTime接收的参数变成大于等于0的值
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
// delay小于Long.MAX_VALUE的一半则直接加在now上,否则需要处理下溢出
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* Constrains the values of all delays in the queue to be within
* Long.MAX_VALUE of each other, to avoid overflow in compareTo.
* This may occur if a task is eligible to be dequeued, but has
* not yet been, while some other task is added with a delay of
* Long.MAX_VALUE.
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
// 当前延迟队列的头元素的delay小于0,说明到达延迟时间了但是还没出队列,
// delay - headDelay < 0说明溢出了
if (headDelay < 0 && (delay - headDelay < 0))
// 此时让delay变成一个正数,使其不能比队里头部的元素小,也就是队列头部的元素要最先出队列
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
// 序列号,delayTime相同时用序列号判断
this.sequenceNumber = sequencer.getAndIncrement();
}
delayedExecute
RunnableScheduledFuture的实际执行在delayedExecute方法中
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 调用父类ThreadPoolExecutor的isShutdown方法,线程池不是RUNNING状态则执行拒绝策略
if (isShutdown())
reject(task);
else {
// 将任务添加到DelayedWorkQueue
super.getQueue().add(task);
// 添加任务后可能线程池状态发生了变化,这里要再判断下是否能够执行任务,如果不能执行需要把刚刚添加的任务从延迟队列移除
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
// 线程池关闭了并且不能运行任务并且移除任务成功,则取消任务的执行
task.cancel(false);
else
// 走到这里说明线程池可以正常执行任务,调用ThreadPoolExecutor的 ensurePrestart方法看下是否需要增加线程
ensurePrestart();
}
}
判断线程池状态的一些方法
// ThreadPoolExecutor的isShutdown方法
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
// ThreadPoolExecutor的isRunning方法,ctl小于SHUTDOWN即为RUNNING状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// periodic不等于0表示周期性任务(包括fixRate和fixDelay),否则为只执行一次的延迟任务
// continueExistingPeriodicTasksAfterShutdown默认是false,也就是线程池是SHUTDOWN状态时不能继续执行周期性任务,可以通过setContinueExistingPeriodicTasksAfterShutdownPolicy方法进行设置
// executeExistingDelayedTasksAfterShutdown默认是true,也就是线程池是SHUTDOWN状态时可以继续执行一次性的延迟任务,可以通过setExecuteExistingDelayedTasksAfterShutdownPolicy方法进行设置
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
// ThreadPoolExecutor的isRunningOrShutdown方法
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
DelayedWorkQueue的add和remove方法
add方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 给DelayedWorkQueue加锁,这里final ReentrantLock lock = this.lock;仍是DougLea的极端优化手段,我们在ThreadPoolExecutor中说过
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果添加元素时超过队列长度,则扩容50%
if (i >= queue.length)
grow();
// 因为添加了1个元素,所以size增加1
size = i + 1;
if (i == 0) {
// 队列原来是空,则直接将新增元素放在index为0的位置
queue[0] = e;
setIndex(e, 0);
} else {
// 加e添加到数组末尾,需要执行上溯操作,维护堆的结构
siftUp(i, e);
}
if (queue[0] == e) {
// 如果新增的元素是延迟最小的,则不管有没有leader线程,都将其置为null
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
/**
* Resizes the heap array. Call only when holding lock.
*/
private void grow() {
int oldCapacity = queue.length;
// 扩容50%
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
// 如果扩容后越界了,则将newCapacity设置为Integer.MAX_VALUE
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
// 拷贝原数组
queue = Arrays.copyOf(queue, newCapacity);
}
/**
* Sets f's heapIndex if it is a ScheduledFutureTask.
*/
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
// 只有ScheduledFutureTask能够设置heapIndex
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
/**
* Sifts element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
* 从index k开始,小根堆向上调整的操作
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 当k<=0的时候说明已经上溯到根节点了
while (k > 0) {
// 找到当前节点的parent
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 新增元素比parent大,则直接跳出循环,不需要调整堆结构
if (key.compareTo(e) >= 0)
break;
// 新增元素比parent小,需要调整k和parent的内容。这里把parent的内容设置到位置k
queue[k] = e;
// 设置heapIndex,这个方法要是在Spring中大概会被命名为setIndexIfNecessary
setIndex(e, k);
// k走到parent的位置继续循环
k = parent;
}
// 新增元素后,已经调整完小顶堆的结构了,把key设置到k位置
queue[k] = key;
// 设置k的index
setIndex(key, k);
}
remove方法
public boolean remove(Object x) {
// 同样要加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列中不存在x则直接返回false
int i = indexOf(x);
if (i < 0)
return false;
// 删除时将heapIndex设置为-1
setIndex(queue[i], -1);
int s = --size;
// 得到堆中最后一个元素,相当于把最后一个元素放到i位置然后调整堆
RunnableScheduledFuture<?> replacement = queue[s];
// 将堆中最后一个元素置为null
queue[s] = null;
// 如果要删除的元素就是堆中最后一个元素,则不需要调整堆结构
if (s != i) {
// 要删除的元素不是堆中最后一个元素,则需要从i位置进行向下调整的堆操作
siftDown(i, replacement);
// 如果调整后,i位置的元素和replacement相等,则要进行堆的向上调整操作
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
/**
* Finds index of given object, or -1 if absent.
*/
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
// 如果x是ScheduledFutureTask,则获取heapIndex
int i = ((ScheduledFutureTask) x).heapIndex;
// Sanity check; x could conceivably be a
// ScheduledFutureTask from some other pool.
// 完备性校验,确保x还在队列中
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
// x不是ScheduledFutureTask,则遍历数组
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
/**
* Sifts element added at top down to its heap-ordered spot.
* Call only when holding lock.
* 从index k开始,小顶堆向下调整的操作,在删除k位置的元素时使用。
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
// k>=half说明到达叶子节点了,不需要再调整堆结构了
while (k < half) {
// 这块的整体逻辑是说,如果k比它的左孩子和右孩子中最小的要大,则需要交换k和最小值的位置
// k的左孩子
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
// k的右孩子
int right = child + 1;
// 右孩子存在并且左孩子比右孩子大
if (right < size && c.compareTo(queue[right]) > 0)
// child为右孩子,c也是右孩子的值
c = queue[child = right];
// 走到这里,c是左右孩子中的较小值,child是左右孩子较小值对应的索引
// 如果key比左右孩子最小值小,则不需要调整堆结构了
if (key.compareTo(c) <= 0)
break;
// 否则,k位置保存左右孩子的较小值,也就是把左右孩子的较小值上移到了k位置
queue[k] = c;
// 设置heapIndex
setIndex(c, k);
// k往下移动到左右孩子的较小值处继续循环
k = child;
}
// 堆结构调整完毕,把key设置到k位置,并设置heapIndex
queue[k] = key;
setIndex(key, k);
}
cancel方法
ScheduledFutureTask的cancel方法如下
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用父类FutureTask的方法
boolean cancelled = super.cancel(mayInterruptIfRunning);
// 取消成功并且允许删除节点,并且当前节点存在于小顶堆中,则删除
// removeOnCancel默认时false,可以通过setRemoveOnCancelPolicy进行设置
if (cancelled && removeOnCancel && heapIndex >= 0)
// 从延迟队列中移除任务
remove(this);
return cancelled;
}
ensurePrestart方法
ensurePrestart定义在ThreadPoolExecutor中
/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 当前线程数量小于corePoolSize则增加一个核心线程
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
// 线程数量等于0,增加一个非核心线程
addWorker(null, false);
}
获取任务
ThreadPoolExecutor中的方法
在ThreadPoolExecutor中我们说过,addWorker方法成功增加线程后,Worker线程启动,执行ThreadPoolExecutor的runWorker方法,runWorker方法有两步,分别是获取任务和执行任务。
// Worker的run方法
public void run() {
runWorker(this);
}
// ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) {
Runnable task = w.firstTask;
...
while (task != null || (task = getTask()) != null) {
...
task.run();
...
}
}
// getTask获取任务,在ScheduledThreadPoolExecutor中allowCoreThreadTimeOut通常设置成false,并且DelayWorkQueue是无界队列,因此这里的timed通常是false,会走到队列的take方法
private Runnable getTask() {
...
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
...
DelayedWorkQueue的take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 在ThreadPoolExecutor的getTask方法中会调用take方法,这里要能够响应中断(ThreadPoolExecutor的getTask方法会处理中断),所以使用了lock.lockInterruptibly();
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
// 队列为空,进行阻塞(先释放掉锁,再阻塞)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// delay<=0说明已经到了延迟时间,可以出队列了
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
// leader不是null,那么当前线程需要变成follower线程,进行阻塞
available.await();
else {
// leader是null,则当前线程变成leader线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超时阻塞。如果被唤醒的话可能是因为到达了延迟时间从而醒来;也有可能是被别的线程signal唤醒了;还有可能是中断被唤醒。正常情况下是等到达了延迟时间后,这里会醒来并进入到下一次循环中的finishPoll方法中,剔除队头节点并最终返回
available.awaitNanos(delay);
} finally {
// awaitNano唤醒之后,如果当前线程是leader线程,则将leader置为null,也就是当前线程不再是leader线程了
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 在退出本方法之前,如果leader线程为null并且删除队头后的延迟队列仍然不为空的话,说明此时有其他的延迟任务
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// s是最后一个元素的索引,这里要把队列最后一个元素和要删除的元素f进行交换,然后调整堆结构
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
// 将最后一个元素置为null
queue[s] = null;
if (s != 0)
// s不是0,也就是队列中还有其他元素,要向下调整堆结构
siftDown(0, x);
// 因为f从堆中移除了,要把index设为-1
setIndex(f, -1);
return f;
}
执行任务
ThreadPoolExecutor的runWorker方法最终会执行到ScheduledFutureTask的run方法,如下所示。
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
// 线程池不是running状态或已经关闭了并且不能执行任务了,则取消任务
cancel(false);
else if (!periodic)
// 使用FutureTask的run方法执行一次性任务
ScheduledFutureTask.super.run();
// 使用FutureTask的runAndReset执行周期性任务
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次运行时间
setNextRunTime();
// 将任务添加到队列中准备再次运行
reExecutePeriodic(outerTask);
}
}
public boolean isPeriodic() {
// period等于0是一次性任务,大于0是固定频率任务,小于0是固定延迟任务
return period != 0;
}
/**
* 设置周期性任务下次运行的时间
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
// 固定频率的任务,也就是scheduleAtFixedRate,下次运行时间是当前延迟时间time加上间隔,也就是第一次在initialDelay时执行,第二次在initialDelay+period时执行,第三次在(initialDelay+period)+period时执行。
// 如果任务的运行时间大于period,则前一次运行结束后下一次会马上执行,因为前一次执行结束后,调用setNextRunTime方法设置的运行时间已经比现在小了,getDelay会返回小于等于0的值,也就是任务应该马上出队列了。
time += p;
else
// p小于0表示固定延迟任务,也就是scheduleWithFixedDelay。triggerTime我们之前分析过,是在当前时间now()加上间隔
time = triggerTime(-p);
}
/**
* Returns the trigger time of a delayed action.
*/
private long triggerTime(long delay, TimeUnit unit) {
// 将triggerTime接收的参数变成大于等于0的值
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
// delay小于Long.MAX_VALUE的一半则直接加在now上,否则需要处理下溢出
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 和delayedExecute类似,除了delayedExecute可能会执行拒绝策略
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 当前线程池的状态运行执行任务
if (canRunInCurrentRunState(true)) {
// 下面的操作和delayedExecute类似
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
scheduleWithFixedDelay分析
scheduleWithFixedDelay方法和scheduleAtFixedRate方法中的绝大部分代码是相同的,只有unit.toNanos(-delay)这一处不同。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
// 这里的period是-delay,也就是小于0的值
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
在ScheduledFutureTask的run方法中,scheduleWithFixedDelay方法和scheduleAtFixedRate使用的也是相同的run方法。只有setNextRunTime方法会根据period的正负来决定下次运行时间。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
|