线程池
什么是线程池
线程池一种线程使用模式,线程池会维护多个线程,等待着分配可并发执行的任务,当有任务需要线程执行时,从线程池中分配线程给该任务而不用主动的创建线程。
线程池的好处
如果在我们平时如果需要用到线程时,我们一般是这样做的:创建线程(T1),使用创建的线程来执行任务(T2),任务执行完成后销毁当前线程(T3),这三个阶段是必须要有的。
而如果使用线程池呢?
线程池会预先创建好一定数量的线程,需要的时候申请使用,在一个任务执行完后也不需要将该线程销毁,很明显的节省了T1和T3这两阶段的时间。
同时我们的线程由线程池来统一进行管理,这样也提高了线程的可管理性。
手写一个自己的线程池
现在我们可以简单的理解为线程池实际上就是存放多个线程的数组,在程序启动是预先实例化一定的线程实例,当有任务需要时分配出去。现在我们先来写一个自己的线程池来理解一下线程池基本的工作过程。
线程池需要些什么?
首先线程池肯定需要一定数量的线程,所以首先需要一个线程数组,当然也可以是一个集合。
线程数组是用来进行存放线程实例的,要使用这些线程就需要有任务提交过来。当任务量过大时,我们是不可能在同一时刻给所有的任务分配一个线程的,所以我们还需要一个用于存放任务的容器。
这里的预先初始化线程实例的数量也需要我们来根据业务确定。
同时线程实例的数量也不能随意的定义,所以我们还需要设置一个最大线程数。
//线程池中允许的最大线程数
private static int MAXTHREDNUM = Integer.MAX_VALUE;
//当用户没有指定时默认的线程数
private int threadNum = 6;
//线程队列,存放线程任务
private List<Runnable> queue;
private WorkerThread[] workerThreads;
线程池工作
线程池的线程一般需要预先进行实例化,这里我们通过构造函数来模拟这个过程。
public MyThreadPool(int threadNum) {
this.threadNum = threadNum;
if(threadNum > MAXTHREDNUM)
threadNum = MAXTHREDNUM;
this.queue = new LinkedList<>();
this.workerThreads = new WorkerThread[threadNum];
init();
}
//初始化线程池中的线程
private void init(){
for(int i=0;i<threadNum;i++){
workerThreads[i] = new WorkerThread();
workerThreads[i].start();
}
}
在线程池准备好了后,我们需要像线程池中提交工作任务,任务统一提交到队列中,当有任务时,自动分发线程。
//提交任务
public void execute(Runnable task){
synchronized (queue){
queue.add(task);
//提交任务后唤醒等待在队列的线程
queue.notifyAll();
}
}
我们的工作线程为了获取任务,需要一直监听任务队列,当队列中有任务时就由一个线程去执行,这里我们用到了前面提到的安全中断。
private class WorkerThread extends Thread {
private volatile boolean on = true;
@Override
public void run() {
Runnable task = null;
//判断是否可以取任务
try {
while(on&&!isInterrupted()){
synchronized (queue){
while (on && !isInterrupted() && queue.isEmpty()) {
//这里如果使用阻塞队列来获取在执行时就不会报错
//报错是因为退出时销毁了所有的线程资源,不影响使用
queue.wait(1000);
}
if (on && !isInterrupted() && !queue.isEmpty()) {
task = queue.remove(0);
}
if(task !=null){
//取到任务后执行
task.run();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
task = null;//任务结束后手动置空,加速回收
}
public void cancel(){
on = false;
interrupt();
}
}
当然退出时还需要对线程池中的线程等进行销毁。
//销毁线程池
public void shutdown(){
for(int i=0;i<threadNum;i++){
workerThreads[i].cancel();
workerThreads[i] = null;
}
queue.clear();
}
好了,到这里我们的一个简易版的线程池就完成了,功能虽然不多但是线程池运行的基本原理差不多实现了,实际上非常简单,我们来写个程序测试一下:
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
// 创建3个线程的线程池
MyThreadPool t = new MyThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(5);
t.execute(new MyTask(countDownLatch, "testA"));
t.execute(new MyTask(countDownLatch, "testB"));
t.execute(new MyTask(countDownLatch, "testC"));
t.execute(new MyTask(countDownLatch, "testD"));
t.execute(new MyTask(countDownLatch, "testE"));
countDownLatch.await();
Thread.sleep(500);
t.shutdown();// 所有线程都执行完成才destory
System.out.println("finished...");
}
// 任务类
static class MyTask implements Runnable {
private CountDownLatch countDownLatch;
private String name;
private Random r = new Random();
public MyTask(CountDownLatch countDownLatch, String name) {
this.countDownLatch = countDownLatch;
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {// 执行任务
try {
countDownLatch.countDown();
Thread.sleep(r.nextInt(1000));
System.out.println("任务 " + name + " 完成");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
+Thread.currentThread().isInterrupted());
}
}
}
}
result:
任务 testA 完成
任务 testB 完成
任务 testC 完成
任务 testD 完成
任务 testE 完成
finished...
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at com.learn.threadpool.MyThreadPool$WorkerThread.run(MyThreadPool.java:75)
...
从结果可以看到我们提交的任务都被执行了,当所有任务执行完成后,我们强制销毁了所有线程,所以会抛出异常。
|