《Java 并发编程》专栏索引 👉 《Java 并发编程》进程与线程 👉《Java 并发编程》共享模型之管程 👉《Java 并发编程》共享模型之内存 👉《Java 并发编程》共享模型之无锁 👉《Java 并发编程》共享模型之不可变 👉《Java 并发编程》线程池
🚀1. 自定义线程池
- 阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
- 主线程类似于生产者,产生任务并放入阻塞队列中
- 线程池类似于消费者,得到阻塞队列中已有的任务并执行
自定义线程池
class ThreadPool {
private BlockingQueue<Runnable> blockingQueue;
private int coreSize;
private HashSet<Worker> workers = new HashSet<>();
private TimeUnit timeUnit;
private long timeout;
private class Worker extends Thread {
Runnable task;
public Worker(Runnable task) {
System.out.println("初始化任务");
this.task = task;
}
@Override
public void run() {
while (task != null || (task = blockingQueue.take()) != null) {
try {
System.out.println("执行任务");
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("任务执行完毕");
task = null;
}
}
synchronized (workers) {
System.out.println("移除任务");
workers.remove(this);
}
}
}
public ThreadPool(int coreSize, TimeUnit timeUnit, long timeout, int capacity) {
this.coreSize = coreSize;
this.timeUnit = timeUnit;
blockingQueue = new BlockingQueue<>(capacity);
this.timeout = timeout;
}
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
System.out.println("线程池中线程已用完,请稍等");
blockingQueue.put(task);
}
}
}
}
自定义阻塞队列
class BlockingQueue<T> {
private Deque<T> blockingQueue;
private int capacity;
private ReentrantLock lock;
private Condition fullQueue;
private Condition emptyQueue;
public BlockingQueue(int capacity) {
blockingQueue = new ArrayDeque<>(capacity);
lock = new ReentrantLock();
fullQueue = lock.newCondition();
emptyQueue = lock.newCondition();
this.capacity = capacity;
}
public T take() {
lock.lock();
try {
while (blockingQueue.isEmpty()) {
try {
emptyQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = blockingQueue.removeFirst();
fullQueue.signalAll();
return task;
} finally {
lock.unlock();
}
}
public T takeNanos(long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (blockingQueue.isEmpty()) {
try {
nanos = emptyQueue.awaitNanos(nanos);
if (nanos < 0) {
return null;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = blockingQueue.removeFirst();
fullQueue.signalAll();
return task;
} finally {
lock.unlock();
}
}
public void put(T task) {
lock.lock();
try {
while (blockingQueue.size() == capacity) {
try {
System.out.println("阻塞队列已满");
fullQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
blockingQueue.add(task);
emptyQueue.signalAll();
} finally {
lock.unlock();
}
}
public int getSize() {
lock.lock();
try {
return blockingQueue.size();
} finally {
lock.unlock();
}
}
}
调用
public class Test {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, TimeUnit.SECONDS, 1, 4);
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
try {
TimeUnit.SECONDS.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务正在执行!");
});
}
}
}
🚀2. ThreadPoolExecutor
ThreadPoolExecutor 的继承关系图如下图所示
🚁2.1 线程池状态
?ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
状态名称 | 高3位的值 | 描述 |
---|
RUNNING | 111 | 接收新任务,同时处理任务队列中的任务 | SHUTDOWN | 000 | 不接受新任务,但是处理任务队列中的任务 | STOP | 001 | 中断正在执行的任务,同时抛弃阻塞队列中的任务 | TIDYING | 010 | 任务执行完毕,活动线程为0时,即将进入终结阶段 | TERMINATED | 011 | 终结状态 |
?线程池状态和线程池中线程的数量由一个原子整型变量 ctl 保存,可以通过一次 CAS 同时更改两个属性的值。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1
获取线程池状态、线程数量以及合并两个值的操作
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程属性
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
...
}
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
🚁2.2 构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
?参数解释
- corePoolSize:核心线程数
- maxiumPoolSize:最大线程数,maximumPoolSize - corePoolSize = 救急线程数
- keepAliveTime:救急线程空闲时的最大生存时间
- unit:时间单位(针对救急线程)
- workQueue:阻塞队列(存放任务)
有界阻塞队列 ArrayBlockingQueue 无界阻塞队列 LinkedBlockingQueue 最多只有一个同步元素的 SynchronousQueue 优先队列 PriorityBlockingQueue - threadFactory:线程工厂(可以为线程创建时起名字)
- handler:拒绝策略
工作方式
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排队,直到有空闲的线程
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maxiumPoolSize-corePoolSize 数目的线程来救急
- 如果线程达到 maxiumPoolSize 仍然有新任务这时会执行拒绝策略,
- 当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束来节省资源,这个时间由 keepAliveTime 和 unit 来控制
对于拒绝策略 JDK 提供了 4 种实现:
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者执行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
一些著名框架,也提供了具体的实现:
- Dubbo 的实现,在抛出 RejectedExecution 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
使用
public class Test {
static AtomicInteger threadId = new AtomicInteger(0);
public static void main(String[] args) {
ArrayBlockingQueue<Runnable> runnable = new ArrayBlockingQueue<>(10);
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "working_thread"+threadId.getAndIncrement());
return thread;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 7, 10, TimeUnit.SECONDS, runnable, threadFactory);
for (int i = 0; i < 20; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread());
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
🚁2.3 newFiexedThreadPool
内部调用的构造方法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
特点:
- 核心线程数=最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
适用于任务量已知,相对耗时的任务。
使用如下:
public class TestFixedThreadPool {
public static void main(String[] args) {
ThreadFactory factory = new ThreadFactory() {
AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "myThread_"+atomicInteger.getAndIncrement());
}
};
ExecutorService executorService = Executors.newFixedThreadPool(2,factory);
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
System.out.println("this is fixedThreadPool");
}
};
executorService.execute(runnable);
}
}
🚁2.4 newCachedThreadPool
内部调用的构造方法
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
)
}
特点:
- 核心线程数是 0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着,全部都是救急线程(60s 后可以回收),救急线程可以无限创建。
- 阻塞队列采用了 SynchronousQueue,实现特点是,它没有容量,没有线程来取是放不进去的,只有当线程取任务时,才会将任务放入该阻塞队列中。
🚁2.5 newSingleThreadExecutor
内部构造方法
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1,1,0L, TimeUnit.MILLSECONDS,
new LinkedBlockingQueue<Runnable>()));
}
?内部调用了 new ThreadPoolExecutor 的构造方法,传入的 corePoolSize 和 maximumPoolSize 都为1。然后将该对象传给了 FinalizableDelegatedExecutorService。该类修饰了 ThreadPoolExecutor,让外部无法调用 ThreadPoolExecutor 内部的某些方法来修改所创建的线程池的大小。
注意点:
SingleThread 和自己创建一个线程来运行多个任务的区别:
- 当线程正在执行的任务发生错误时,如果是自己创建的线程,该任务和剩余的任务就无法再继续运行下去。而 SingleThread 会创建一个新线程,继续执行任务队列中剩余的任务
SingleThread 和 newFixedThreadPool(1) 的区别:
- newFixedThreadPool(1) 传值为 1,可以将 FixedThreadPool 强转为 ThreadPoolExecutor,然后通过 setCorePoolSize 改变核心线程数,而 SingleThread 无法修改核心线程数
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
threadPool.setCorePoolSize(2);
🚀3. 提交任务
execute() 方法,传入一个 Runnable 对象,执行其中的 run 方法
void execute(Runnable command);
submit() 方法,传入一个 Callable 对象,用 Future 来捕获返回值
使用
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "hello submit";
}
});
System.out.println(future.get());
🚀4. 关闭线程池
shutdown()
void shutdown();
public void shutdown() {
final ReentrantLock mainlock = this.mainLock;
mainlock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainlock.unlock();
}
tryTerminate();
}
shutdownNow
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock();
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
其他方法
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
🚀5. 任务调度线程池
?在【任务调度线程池】功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task 1");
sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
System.out.println("task 2");
}
};
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
使用 ScheduleExecutorService 改写
ScheduleExecutorService executor = Executors.newScheduledThreadPool(2);
executor.schedule(()->{
System.out.println("任务1,执行时间"+new Date());
try {
Thread.sleep(2000);
} catch(InterruptedException e) {
}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(()->{
System.out.println("任务2,执行时间:"+new Date());
}, 1000, TimeUnit.MILLISECONDS);
|