主要是按照ThreadPoolExecutor的执行流程实现的。 实现功能,
- 线程池中线程数 小于 corePoolSize 则直接创建一个线程执行任务。
- 大于则阻塞到阻塞队列。
- 阻塞队列满了 则创建非核心线程执行任务,此时会先执行阻塞队列放不下的那个任务,之后就是超时获取阻塞队列中的任务。
- 非核心线程的超时自动销毁。
package ThreeYue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Study38 implements Executor {
private final AtomicInteger ctl = new AtomicInteger();
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private BlockingQueue<Runnable> workQueue;
private volatile long keepAliveTime;
public Study38(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = keepAliveTime;
}
@Override
public void execute(Runnable command) {
int c = ctl.get();
if(c < corePoolSize) {
if(!addWorker(command)) {
reject();
}
return;
}
if(workQueue.offer(command)) {
System.out.println("此时阻塞队列中任务数量 " + workQueue.size());
System.out.println("加入阻塞队列成功");
if (c == 0) {
addWorker(null);
}
} else {
System.out.println("加入阻塞队列失败");
if(!addWorker(command)) {
reject();
}
}
}
private boolean addWorker(Runnable command) {
if (ctl.get() > maximumPoolSize){
return false;
}
if (command == null) {
System.out.println("非核心线程创建");
}
Worker worker = new Worker(command);
worker.thread.start();
System.out.println(worker.thread+" 启动了");
ctl.incrementAndGet();
return true;
}
private void reject() {
System.out.println("执行拒绝策略");
}
private final class Worker implements Runnable {
Runnable firstTask;
final Thread thread;
private Worker(Runnable task) {
this.thread = new Thread(this);
this.firstTask = task;
}
@Override
public void run() {
Runnable task = firstTask;
try{
while(task != null || (task = getTask()) != null) {
System.out.println(Thread.currentThread()+"获取了任务");
task.run();
if (ctl.get() > maximumPoolSize) {
break;
}
task = null;
}
} finally {
ctl.decrementAndGet();
}
}
}
private Runnable getTask() {
for ( ; ; ) {
try {
Runnable c = null;
if(ctl.get() > corePoolSize) {
c = workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);
} else {
c = workQueue.take();
}
return c;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
核心就是
- 一个exeute()方法去控制流程。通过原子类的AtomicInteger保证线程数量正确变化。
- 一个addWorker()方法去创建一个Worker类。这个Worker类封装了Runnable和Thread。
- Worker类既可以当作一个任务,也可以当作一个线程。确实启动的线程执行的是worker自己的run。但是也就是这样,能够加入对阻塞队列获取的判断。并且获取不到任务的线程,超时就会退出for循环。
- getTask()方法 获取阻塞队列中方法,超时获取和阻塞获取。核心线程是阻塞获取,非核心线程是超时获取。
- 可以看出,核心线程和非核心线程都是普通Thread。就是根据对线程池中的线程数的判断,来实现非核心和核心的概念。
这个对比ThreadPoolExecutor,是很简略的。比如说addWorker方法中,使用了ReentrantLock 保证新建的线程正确加入到线程set中,保证不被GC。 再说Worker类,通过AQS,实现了一个不可重入的锁。保证线程池可以优雅的关闭。也就是正在执行任务的线程不被中断。
可以简单跑
package ThreeYue;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
public static void main(String[] args) {
Study38 study38 = new Study38(2,4,new ArrayBlockingQueue<Runnable>(5),10);
for(int i = 0; i < 10; i++) {
int taskNum = i;
study38.execute(() -> {
try {
System.out.println("任务编号:" + taskNum);
System.out.println("此时线程池中有线程:"+Thread.currentThread());
System.out.println("开始执行任务 " + taskNum);
Thread.sleep(1000);
System.out.println(taskNum + " 任务执行结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
其实线程池还有其它功能,很多。比如可以将Runnable封装为Callable返回结果。待续
|