在Java 1.3中 发布了早期的定时器Timer(已经过时,存在设计问题)
TimerTask
public abstract class TimerTask implements Runnable {
/**
* This object is used to control access to the TimerTask internals.
*/
final Object lock = new Object();
int state = VIRGIN;
static final int VIRGIN = 0;
static final int SCHEDULED = 1;
static final int EXECUTED = 2;
static final int CANCELLED = 3;
}
TaskQueue
class TaskQueue {
private TimerTask[] queue = new TimerTask[128];
}
TimerThread
class TimerThread extends Thread {
boolean newTasksMayBeScheduled = true;
private TaskQueue queue;
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
}
Timer
public class Timer {
//定时器任务队列
private final TaskQueue queue = new TaskQueue();
//定时器线程
private final TimerThread thread = new TimerThread(queue);
}
由此可见Timer使用单线程(TimerThread)轮询任务队列(TimerQueue)的方式执行任务,并且由于在java1.3版本的时候,还没有Lock的实现,线程同步是通过Synchronized关键字实现,因此一旦某个任务失败,那么后续的任务都将受到影响。并且此时的TimerTask都是实现Runnable接口,因此任务执行后也不能返回对应的返回结果
总结:
- Timer 单线程调度
- 所有任务的执行不能返回执行结果
- 一旦某一任务执行失败,后续的任务都会受到影响
- 简单易用
在Java 1.5中 发布了ScheduledExecutorService来取代Timer,优化了诸多缺点
ScheduledExecutorService
/*
* @since 1.5
*/
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//调度方式1:在上一任务执行开始时延迟(initialDelay)时间再继续执行任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//调度方式2: 在上一任务执行结束后延迟(initialDelay)时间再继续执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {
//使用阻塞队列
private final BlockingQueue<Runnable> workQueue;
//使用重入锁
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
//使用线程工厂创建线程
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
//初始化线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//工作线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
}
ScheduledThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** The time the task is enabled to execute in nanoTime units */
private long time;
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
}
// 延迟的任务队列,初始化容量为16
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
.......
}
}
}
对于过时且存在设计缺陷的Timer, ScheduledThreadPoolExecutor通过设置线程池来实现多线程的执行,同时将任务放入阻塞队列中,使用ReentrantLock实现线程之间的同步(并且在任务执行异常后并不会对后续任务的执行造成影响),同时自Java1.5后,任务允许加入无返回结果的Runnable实现也允许加入存在返回结果的Callable的实现。
总结:
- ScheduledThreadPoolExecutor多线程调度
- 即允许执行存在返回值的任务(Callable) ,也允许执行无返回结果的任务(Runnable)
- 使用重入锁,任务执行异常不会对其他任务执行造成影响
- 使用线程池的概念,线程的利用率高
|