一:大概内容
QuartzSchedulerThread
二:线程的创建和启动
这里主要回顾一下QuartzSchedulerThread是在什么时候创建的,又是在什么时候start的。
大概流程如下图
- StdSchedulerFactory.instantiate():生产StdScheduler过程中会new一个QuartzScheduler实例
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
- 在QuartzScheduler的构造器方法里面可以看到创建QuartzSchedulerThread的代码逻辑,并通过QuartzSchedulerResources对象获取ThreadExecutor对象,最后execute新建的QuartzSchedulerThread。
// QuartzSchedulerThread创建和启动的逻辑
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
- DefaultThreadExecutor是ThreadExecutor接口的唯一实现类,传入指定的Thread对象,便启动该线程。到这里,QuartzSchedulerThread启动了。
public class DefaultThreadExecutor implements ThreadExecutor {
public void initialize() {
}
public void execute(Thread thread) {
thread.start();
}
}
三:线程run()方法逻辑
上一篇章里面有解析到scheduler.start()方法,最终调用了QuartzSchedulerThread.togglePause()方法,发出了唤醒线程的信号。
线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。
唤醒线程QuartzSchedulerThread,执行的就是run方法,run()方法的大致流程:
?接下来到具体的代码看看具体的逻辑。
public void run() {
int acquiresFailed = 0;
// 只有调用了halt()方法,才会退出这个死循环
while (!halted.get()) {
try {
// Part A:如果是暂停状态,那么循环超时等待1000毫秒
// wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..
// blockForAvailableThreads的语义:阻塞直到有空闲的线程可用,然后返回其数量
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) {
// Part B:获取acquire状态的Trigger列表
if (triggers != null && !triggers.isEmpty()) {
// Part C:获取List第一个Trigger的下次触发时刻
// Part D:设置Triggers为'executing'状态
// Part E:执行Job
}
} else {
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue;
}
// Part F:
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
}
qs = null;
qsRsrcs = null;
}
四:创建JobRunShell
JobRunShell实例在initialize()方法就会把包含业务逻辑类的JobDetailImpl设置为它的成员属性,为后面执行业务逻辑代码做准备。 执行业务逻辑代码在runInThread(shell)方法里面。
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
五:ThreadPool.runInThread()
ThreadPool的具体实例是SimpleThreadPool,这个类维护了3个列表:
// 保存所有WorkerThread的集合
private List<WorkerThread> workers;
// 空闲的WorkerThread集合
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
// 忙碌的WorkerThread集合
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
/**
* 维护workers、availWorkers和busyWorkers三个列表数据
* 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add()
* 然后调用WorkThread.run(runnable)方法
*/
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
六:WorkerThread.run(runnable)
WorkerThread是在SimpleThreadPool的内部类。 WorkerThread.run(runnable)主要是赋值并唤醒lock对象的等待线程队列。
public void run(Runnable newRunnable) {
synchronized(lock) {
if(runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}
runnable = newRunnable;
lock.notifyAll();
}
}
七:WorkerThread.run()
上面方法执行lock.notifyAll()后,对应的WorkerThread就会来到run()方法了。 在这里,我们终于来到了执行业务的execute()方法的倒数第二步。 runnable对象是一个JobRunShell对象,下面在看JobRunShell.run()方法。
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
// todo: 这里不需要创建一个Thread对象来执行业务逻辑,使用Thread另起一个执行流,不方便知道执行的结果
// 既然不需要启动一个执行流,为什么需要一个Runnable对象?
runnable.run();
}
}
} catch (InterruptedException unblock) {
// do nothing (loop will terminate if shutdown() was called
try {
getLog().error("Worker thread was interrupt()'ed.", unblock);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} catch (Throwable exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} finally {
synchronized(lock) {
runnable = null;
}
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
run.set(false);
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;
makeAvailable(this);
}
}
}
//if (log.isDebugEnabled())
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
}
八:JobRunShell.run()
这个方法里面有很多通知Listener的代码逻辑,这不是我们目前的关注重点,下面的代码省略这些代码。 经过前面这么多跋山涉水,我们终于看到了调用业务的execute()方法的代码逻辑,就是那一行"job.execute(jec)"。
public void run() {
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
// 其他代码
long startTime = System.currentTimeMillis();
long endTime = startTime;
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getKey() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getKey() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getKey()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}
jec.setJobRunTime(endTime - startTime);
// 其他代码
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
}
九:WorkerThread是什么时候被初始化的
回过头来看下WorkerThread的初始化代码逻辑。
- StdSchedulerFactory.instantiate()创建了ThreadPool tp
- tp.initialize()里面有初始化WorkerThread的逻辑
// create the worker threads and start them
Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = workerThreads.next();
wt.start();
availWorkers.add(wt);
}
|