前言
上次本人是基于java1.8新特性编写了一个简单的线程池,使用了原子变量、阻塞队列、可重入锁等等新特性,内容全部收录在下面两篇文章中
这次本人闲的无聊,想通过原生的java方式实现一个简单的线程池
一.简易版实现流程
包含功能:
- 创建线程池,销毁线程池,添加新任务
- 没有任务进入等待,有任务则处理掉
- 动态伸缩,扩容
- 拒绝策略
因为自己实现的简易版本所以不建议生产中使用,生产中使用java.util.concurrent.ThreadPoolExecutor 会更加健壮和优雅
二.实现步骤
以下线程池相关代码均在SimpleThreadPoolExecutor.java 中,由于为了便于解读因此以代码块的形式呈现
维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.
private enum TaskState {
FREE, RUNNABLE, BLOCKED, TERMINATED;
}
定义拒绝策略接口,以及默认实现
interface DiscardPolicy {
void discard() throws DiscardException;
}
static class DiscardException extends RuntimeException {
private static final long serialVersionUID = 8827362380544575914L;
DiscardException(String message) {
super(message);
}
}
工作线程具体实现
- 继承
Thread ,重写run方法。 this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty() 如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权- 如果有任务就去执行
FIFO(先进先出) 策略 - 定义
close方法 ,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt()方法
public static class WorkerTask extends Thread {
private static int threadInitNumber;
private TaskState taskState;
WorkerTask() {
super(THREAD_GROUP, nextThreadName());
}
private static synchronized String nextThreadName() {
return THREAD_NAME_PREFIX + (++threadInitNumber);
}
@Override
public void run() {
Runnable target;
OUTER:
while (this.taskState != TaskState.TERMINATED) {
synchronized (TASK_QUEUE) {
while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {
try {
this.taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
break OUTER;
}
}
target = TASK_QUEUE.removeFirst();
}
if (target != null) {
this.taskState = TaskState.RUNNABLE;
target.run();
this.taskState = TaskState.FREE;
}
}
}
void close() {
this.taskState = TaskState.TERMINATED;
this.interrupt();
}
}
三.完整代码
简易版线程池,主要就是维护了一个任务队列 和工作线程集 ,为了动态扩容,自己也继承了Thread 去做监听操作,对外提供submit()提交执行任务 、shutdown()等待所有任务工作完毕,关闭线程池
public class SimpleThreadPoolExecutor extends Thread {
private final static int DEFAULT_MIN_THREAD_SIZE = 2;
private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;
private final static int DEFAULT_MAX_THREAD_SIZE = 10;
private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;
private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";
private final static String THREAD_POOL_NAME = "SIMPLE-POOL";
private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);
private final static List<WorkerTask> WORKER_TASKS = new ArrayList<>();
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
throw new DiscardException("[拒绝执行] - [任务队列溢出...]");
};
private int threadPoolSize;
private int queueSize;
private DiscardPolicy discardPolicy;
private volatile boolean destroy = false;
private int minSize;
private int maxSize;
private int activeSize;
SimpleThreadPoolExecutor() {
this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
}
SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy) {
this.minSize = minSize;
this.activeSize = activeSize;
this.maxSize = maxSize;
this.queueSize = queueSize;
this.discardPolicy = discardPolicy;
initPool();
}
private void initPool() {
for (int i = 0; i < this.minSize; i++) {
this.createWorkerTask();
}
this.threadPoolSize = minSize;
this.start();
}
private void createWorkerTask() {
WorkerTask task = new WorkerTask();
task.taskState = TaskState.FREE;
WORKER_TASKS.add(task);
task.start();
}
void submit(Runnable runnable) {
if (destroy) {
throw new IllegalStateException("线程池已销毁...");
}
synchronized (TASK_QUEUE) {
if (TASK_QUEUE.size() > queueSize) {
discardPolicy.discard();
}
TASK_QUEUE.addLast(runnable);
TASK_QUEUE.notifyAll();
}
}
void shutdown() throws InterruptedException {
int activeCount = THREAD_GROUP.activeCount();
while (!TASK_QUEUE.isEmpty() && activeCount > 0) {
Thread.sleep(100);
}
int intVal = WORKER_TASKS.size();
while (intVal > 0) {
for (WorkerTask task : WORKER_TASKS) {
if (task.taskState == TaskState.BLOCKED) {
task.close();
intVal--;
}
else {
Thread.sleep(50);
}
}
}
this.destroy = true;
TASK_QUEUE.clear();
WORKER_TASKS.clear();
this.interrupt();
System.out.println("线程关闭");
}
@Override
public void run() {
while (!destroy) {
try {
Thread.sleep(5_000L);
if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) {
for (int i = threadPoolSize; i < activeSize; i++) {
createWorkerTask();
}
this.threadPoolSize = activeSize;
System.out.println("[初次扩充] - [" + toString() + "]");
}
else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {
System.out.println();
for (int i = threadPoolSize; i < maxSize; i++) {
createWorkerTask();
}
this.threadPoolSize = maxSize;
System.out.println("[再次扩充] - [" + toString() + "]");
}
else {
synchronized (WORKER_TASKS) {
int releaseSize = threadPoolSize - activeSize;
Iterator<WorkerTask> iterator = WORKER_TASKS.iterator();
while (iterator.hasNext()) {
if (releaseSize <= 0) {
break;
}
WorkerTask task = iterator.next();
if (task.taskState == TaskState.FREE) {
task.close();
iterator.remove();
releaseSize--;
}
}
System.out.println("[资源回收] - [" + toString() + "]");
}
threadPoolSize = activeSize;
}
} catch (InterruptedException e) {
System.out.println("资源释放");
}
}
}
@Override
public String toString() {
return "SimpleThreadPoolExecutor{" +
"threadPoolSize=" + threadPoolSize +
", taskQueueSize=" + TASK_QUEUE.size() +
", minSize=" + minSize +
", maxSize=" + maxSize +
", activeSize=" + activeSize +
'}';
}
private enum TaskState {
FREE, RUNNABLE, BLOCKED, TERMINATED;
}
interface DiscardPolicy {
void discard() throws DiscardException;
}
static class DiscardException extends RuntimeException {
private static final long serialVersionUID = 8827362380544575914L;
DiscardException(String message) {
super(message);
}
}
public static class WorkerTask extends Thread {
private static int threadInitNumber;
private TaskState taskState;
WorkerTask() {
super(THREAD_GROUP, nextThreadName());
}
private static synchronized String nextThreadName() {
return THREAD_NAME_PREFIX + (++threadInitNumber);
}
@Override
public void run() {
Runnable target;
OUTER:
while (this.taskState != TaskState.TERMINATED) {
synchronized (TASK_QUEUE) {
while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {
try {
this.taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
break OUTER;
}
}
target = TASK_QUEUE.removeFirst();
}
if (target != null) {
this.taskState = TaskState.RUNNABLE;
target.run();
this.taskState = TaskState.FREE;
}
}
}
void close() {
this.taskState = TaskState.TERMINATED;
this.interrupt();
}
}
}
四.测试
public class Client {
public static void main(String[] args) throws InterruptedException {
SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor();
IntStream.range(0, 30).forEach(i ->
executor.submit(() -> {
System.out.printf("[线程] - [%s] 开始工作...\n", Thread.currentThread().getName());
try {
Thread.sleep(2_000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("[线程] - [%s] 工作完毕...\n", Thread.currentThread().getName());
})
);
}
}
执行结果
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[初次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-3] 开始工作...
...
[线程] - [MY-THREAD-NAME-6] 开始工作...
[线程] - [MY-THREAD-NAME-7] 开始工作...
[再次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-10] 开始工作...
...
[线程] - [MY-THREAD-NAME-5] 开始工作...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-4] 工作完毕...
...
[线程] - [MY-THREAD-NAME-7] 工作完毕...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
从日志中可以看到,初始化线程池有2个工作线程 ,执行速度较为缓慢,当经过第1次扩容 后,会观察到线程池里线程个数增加了,执行任务的速度就越来越快了,本文一共扩容了2次,第一次是扩容到activeSize的大小,第二次是扩容到maxSize ,如果任务队列没有可执行的任务时,就会进行线程回收,使工作线程数回到activeSize 。
一份针对于新手的多线程实践 一份针对于新手的多线程实践–进阶篇 一个线程罢工的诡异事件 线程池不容错过的细节 JAVA并发编程J.U.C学习总结 ReentrantLock 实现原理 Synchronize 关键字原理
|