1、线程、协程与线程池
1.1、什么是线程?
线程是CPU调度资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有一个对应的线程。 Java线程有以下几种形态: 1、NEW,新建。 2、RUNNABLE,运行。 3、BLOCKED,阻塞。 4、WAITING,等待。 5、TIMED_WAITING,超时等待。 6、TERMINATED,终结。 在Thread类中有一个静态类就封装着状态的枚举,代码如下所示: 看源码的话会有很多注释,这里我删掉了注释
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
Java线程状态切换如下图所示:
1.2、什么是协程
协程 (纤程,用户级线程),目的是为了追求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程线程不需要上下文切换)。 Java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar。 自己理解:其实就相当于ULT模型,不用OS去管理线程,所以线程的操作就不需要切换到内核,也就是不用上下文切换。但是会有一个问题就是,不能发挥cpu的多核性能。 下面给一张KLT和ULT模型的对比图。
1.3、线程池
1.3.1、什么是线程池?
“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、 调优和监控。 自己理解:因为线程的创建以及状态的切换是一个非常重的操作(涉及到用户态到内核态的切换),有的时候创建一个线程的时间可能需要10ms,但是这个线程执行任务的时间却只有0.1ms,如果不去重用这个线程,将会是一个极大的浪费。
1.3.2、什么时候使用线程池?
1、单个任务处理时间比较短。 2、需要处理的任务数量很大。
1.3.3、线程池优势
1、重用存在的线程,减少线程创建,消亡的开销,提高性能。 2、提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。 3、提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
1.3.4、线程池的参数
corePoolSize:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。 maximumPoolSize:就是核心线程加上非核心线程的数量。 keepAliveTime:当没有任务可做的时候,线程会阻塞,直至超过了这个时间,就会被销毁掉。 unit:keepAliveTime的单位。 workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,详情请看1.3.6。 threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。 handler:当线程都有任务且阻塞队列也满了的时候就会执行拒绝策略,详情请看1.3.7。
1.3.5、线程池的大致运行过程
1、线程池刚创建的时候里面是没有线程的,当这时候进来一个任务时,Jvm会先创建一个线程去执行这个任务,如果再进来一个任务,这个任务不会直接抛给刚才那个线程去执行,而是会再创建一个线程,将任务交给那个线程去执行。 2、当核心线程数创建满之后,再进来任务时,任务会先放到阻塞队列,核心线程会自己去拿,直到阻塞队列放满。 3、当阻塞队列也放满的时候,Jvm会再次创建线程去执行任务,这次创建的线程称之为非核心线程,再进来第二个任务也是一样的,创建非核心线程执行任务,直至非核心线程数也创建满。 4、当阻塞队列满,核心线程与非核心线程都满时,再进来任务的话,就会执行拒绝策略。 总结:核心线程= =>阻塞队列= =>非核心线程= =>拒绝策略。 下面是ThreadPoolExecutor类的源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
1.3.6、线程池的阻塞队列
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene。 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene。 4、priorityBlockingQuene:具有优先级的无界阻塞队列。
1.3.7、线程池的拒绝策略
jdk自带了四种拒绝策略,如下: 1、AbortPolicy 直接抛出异常,继承RuntimeException,一般不建议使用。
2、CallerRunsPolicy 如果当前线程池未关闭,将直接将此任务交给提交该任务的线程去处理。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3、DiscardOledestPolicy 丢弃队列中最老的任务,再次尝试添加新任务。
4、DiscardPolicy 什么都不干,意思就是默默的丢弃掉该任务。
1.3.8、线程池父类Executor以及线程池状态切换方法
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。 下面是它的继承与实现,如下图所示:
从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为: 1、execute:履行Ruannable类型的任务。 2、submit:可用来提交Callable或Runnable任务,并返回代表此任务的 Future 对象。 3、shutdown:在完成已提交的任务后封闭办事,不再接管新任务。 4、shutdownNow:停止所有正在履行的任务并封闭办事。 5、isTerminated:测试是否所有任务都履行完毕了。 6、isShutdown:测试是否该ExecutorService已被关闭。
1.3.9、线程池的状态
1、RUNNING: 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
2、SHUTDOWN: 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP: 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING: 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在 ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5、TERMINATED: 状态说明:线程池彻底终止,就变成TERMINATED状态。 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING - > TERMINATED。 进入TERMINATED的条件如下: 1、线程池不是RUNNING状态。 2、线程池状态不是TIDYING状态或TERMINATED状态。 3、如果线程池状态是SHUTDOWN并且workerQueue为空。 4、workerCount为0。 5、设置TIDYING状态成功。 如下图所示:
2、线程池重点方法详解
2.1、addWorker
该方法的主要作用是创建线程并给它一个初始任务,比如在execute()方法里调用。 1、判断线程池的状态,如果不是running状态的话就不允许做后面的操作。 2、创建线程并加锁。 3、放入队列,前面加锁的原因就是能保证一定放到队列。 4、解锁。 5、启动线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.2、Worker构造方法
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
下面这个就是Worker自己的run方法
public void run() {
runWorker(this);
}
2.3、runWorker
这个方法主要就是线程执行任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
2.4、getTask
这个方法的主要作用是从阻塞队列中拿出一个任务并执行 这块说的剔除线程是指 1、工作的线程大于核心线程数,所以就存在非核心线程,当没有任务的时候,非核心线程是要被剔除掉的。 2、allowCoreThreadTimeOut 是配置的允许核心线程被剔除的时间,当没有任务时,如果核心线程空闲时间超过了它的设置,也将会被剔除。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
2.5、processWorkerExit
主要作用是当线程出现问题之后,将会重新创建一个线程去顶替他,但是任务会丢失,在runWorker中调用。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
|