IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 【Java多线程】自己实现一个简单的线程池(三) -> 正文阅读

[Java知识库]【Java多线程】自己实现一个简单的线程池(三)

前言

上次本人是基于java1.8新特性编写了一个简单的线程池,使用了原子变量、阻塞队列、可重入锁等等新特性,内容全部收录在下面两篇文章中

这次本人闲的无聊,想通过原生的java方式实现一个简单的线程池

一.简易版实现流程

在这里插入图片描述

包含功能:

  1. 创建线程池,销毁线程池,添加新任务
  2. 没有任务进入等待,有任务则处理掉
  3. 动态伸缩,扩容
  4. 拒绝策略

因为自己实现的简易版本所以不建议生产中使用,生产中使用java.util.concurrent.ThreadPoolExecutor会更加健壮和优雅

二.实现步骤

以下线程池相关代码均在SimpleThreadPoolExecutor.java中,由于为了便于解读因此以代码块的形式呈现

维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.

    /**
     * SimpleThreadPoolExecutor内部类-维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.
     */
    private enum TaskState {
        //空闲、运行、阻塞、死亡
        FREE, RUNNABLE, BLOCKED, TERMINATED;
    }

定义拒绝策略接口,以及默认实现

    /**
     * SimpleThreadPoolExecutor内部接口-拒绝策略接口
     */
    interface DiscardPolicy {
        void discard() throws DiscardException;
    }
    /**
     * SimpleThreadPoolExecutor静态内部类-定义拒绝策略接口,以及默认实现
     */
    static class DiscardException extends RuntimeException {
        private static final long serialVersionUID = 8827362380544575914L;

        DiscardException(String message) {
            super(message);
        }
    }

工作线程具体实现

  1. 继承Thread,重写run方法。
  2. this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权
  3. 如果有任务就去执行FIFO(先进先出)策略
  4. 定义close方法,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt()方法
    /**
     * SimpleThreadPoolExecutor内部类-工作线程
     */
    public static class WorkerTask extends Thread {
        // 线程编号(自增)
        private static int threadInitNumber;
        // 线程状态
        private TaskState taskState;

        /**
         * 初始化工作线程,绑定对应线程组以及线程名
         */
        WorkerTask() {
            super(THREAD_GROUP, nextThreadName());
        }

        /**
         * 生成线程名,参考Thread.nextThreadNum();
         *
         * @return
         */
        private static synchronized String nextThreadName() {
            return THREAD_NAME_PREFIX + (++threadInitNumber);
        }

        /**
         * 启动工作线程
         */
        @Override
        public void run() {
            //目前任务
            Runnable target;
            //说明该工作线程处于空闲状态
            OUTER:
            while (this.taskState != TaskState.TERMINATED) {
                synchronized (TASK_QUEUE) {
                    //如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权
                    while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {
                        try {
                            this.taskState = TaskState.BLOCKED;//此处标记当前工作线程为阻塞状态
                            //没有任务就wait住,让出CPU执行权
                            TASK_QUEUE.wait();
                            //如果被打断说明当前线程执行了 shutdown() 方法  线程状态为 TERMINATED 直接跳到 while 便于退出
                        } catch (InterruptedException e) {
                            break OUTER;
                        }
                    }

                    //遵循FIFO策略,获取任务执行
                    target = TASK_QUEUE.removeFirst();
                }

                if (target != null) {
                    //设置当前工作线程状态为"运行中"
                    this.taskState = TaskState.RUNNABLE;
                    //TODO:启动目标任务
                    target.run();
                    //目标任务执行外设置当前工作线程状态为"空闲"
                    this.taskState = TaskState.FREE;
                }
            }
        }

        void close() {//优雅关闭线程
            this.taskState = TaskState.TERMINATED;
            this.interrupt();
        }
    }

三.完整代码

简易版线程池,主要就是维护了一个任务队列工作线程集,为了动态扩容,自己也继承了Thread去做监听操作,对外提供submit()提交执行任务shutdown()等待所有任务工作完毕,关闭线程池

/**
 * 简易版线程池
 */
public class SimpleThreadPoolExecutor extends Thread {
    private final static int DEFAULT_MIN_THREAD_SIZE = 2;// 默认最小线程数
    private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;// 活跃线程
    private final static int DEFAULT_MAX_THREAD_SIZE = 10;// 最大线程
    private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;// 最多执行多少任务
    private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";//线程名前缀
    private final static String THREAD_POOL_NAME = "SIMPLE-POOL";//线程组的名称
    private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);//线程组
    private final static List<WorkerTask> WORKER_TASKS = new ArrayList<>();// 线程容器
    // 任务队列容器,也可以用Queue<Runnable> 遵循 FIFO 规则
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
    // 拒绝策略
    private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
        throw new DiscardException("[拒绝执行] - [任务队列溢出...]");
    };


    private int threadPoolSize;// 线程池大小
    private int queueSize;// 最大接收任务
    private DiscardPolicy discardPolicy;   // 拒绝策略
    private volatile boolean destroy = false;// 是否被销毁
    private int minSize;//最小线程
    private int maxSize;//最大线程
    private int activeSize;//活跃线程

    /**
     * 默认线程池
     */
    SimpleThreadPoolExecutor() {
        this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
    }


    /**
     * 初始化线程
     *
     * @param minSize       最小线程数
     * @param activeSize    工作线程数
     * @param maxSize       最大线程数
     * @param queueSize     队列大小
     * @param discardPolicy 拒绝策略
     */
    SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy) {
        this.minSize = minSize;
        this.activeSize = activeSize;
        this.maxSize = maxSize;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        //初始化线程池
        initPool();
    }

    /**
     * 初始化操作
     */
    private void initPool() {
        //根据最新线程数初始化工作线程
        for (int i = 0; i < this.minSize; i++) {
            //创建工作线程
            this.createWorkerTask();
        }
        //线程池大小=最小线程数
        this.threadPoolSize = minSize;
        //自己启动自己
        this.start();
    }

    /**
     * 初始化工作线程并加入任务队列中
     */
    private void createWorkerTask() {
        WorkerTask task = new WorkerTask();
        //刚创建出来的线程应该是未使用的
        task.taskState = TaskState.FREE;
        //保存工作线程到任务队列容器
        WORKER_TASKS.add(task);
        //启动工作线程
        task.start();
    }


    /**
     * 提交任务
     *
     * @param runnable
     */
    void submit(Runnable runnable) {
        if (destroy) {
            throw new IllegalStateException("线程池已销毁...");
        }

        synchronized (TASK_QUEUE) {
            if (TASK_QUEUE.size() > queueSize) {//如果当前任务队列超出队列限制,后续任务执行拒绝策略
                discardPolicy.discard();
            }
            // 1.将任务添加到任务队列
            TASK_QUEUE.addLast(runnable);
            // 2.唤醒等待的工作线程去执行任务
            TASK_QUEUE.notifyAll();
        }
    }

    /**
     * 关闭线程池
     *
     * @throws InterruptedException
     */
    void shutdown() throws InterruptedException {
        //获取活跃线程数
        int activeCount = THREAD_GROUP.activeCount();
        while (!TASK_QUEUE.isEmpty() && activeCount > 0) {
            // 如果还有任务,那就休息一会
            Thread.sleep(100);
        }
        int intVal = WORKER_TASKS.size();//如果线程池中没有工作线程,那就不用关了
        while (intVal > 0) {
            for (WorkerTask task : WORKER_TASKS) {
                //当任务队列为空的时候,线程状态才会为 BLOCKED ,所以可以打断掉,相反等任务执行完在关闭
                if (task.taskState == TaskState.BLOCKED) {
                    task.close();
                    intVal--;
                }
                else {
                    Thread.sleep(50);
                }
            }
        }
        this.destroy = true;
        //资源回收
        TASK_QUEUE.clear();
        WORKER_TASKS.clear();
        this.interrupt();
        System.out.println("线程关闭");
    }


    /**
     * 启动线程池
     */
    @Override
    public void run() {
        while (!destroy) {
            try {
                Thread.sleep(5_000L);
                //任务队列个数 大于 工作线程数 && 线程池大小小于活跃线程
                if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { // 第一次扩容到 activeSize 大小
                    for (int i = threadPoolSize; i < activeSize; i++) {
                        createWorkerTask();
                    }
                    this.threadPoolSize = activeSize;
                    System.out.println("[初次扩充] - [" + toString() + "]");
                }
                //任务队列个数 大于 最大线程数 && 线程池大小小于最大线程数
                else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {// 第二次扩容到最大线程
                    System.out.println();
                    for (int i = threadPoolSize; i < maxSize; i++) {
                        createWorkerTask();
                    }
                    this.threadPoolSize = maxSize;
                    System.out.println("[再次扩充] - [" + toString() + "]");
                }
                else {
                    //防止线程在submit的时候,其他线程获取到锁干坏事
                    synchronized (WORKER_TASKS) {
                        int releaseSize = threadPoolSize - activeSize;
                        Iterator<WorkerTask> iterator = WORKER_TASKS.iterator();// List不允许在for中删除集合元素,所以这里需要使用迭代器
                        while (iterator.hasNext()) {
                            if (releaseSize <= 0) {
                                break;
                            }
                            WorkerTask task = iterator.next();
                            //不能回收正在运行的线程,只回收空闲线程
                            if (task.taskState == TaskState.FREE) {
                                task.close();
                                iterator.remove();
                                releaseSize--;
                            }
                        }
                        System.out.println("[资源回收] - [" + toString() + "]");
                    }
                    threadPoolSize = activeSize;
                }
            } catch (InterruptedException e) {
                System.out.println("资源释放");
            }
        }
    }

    @Override
    public String toString() {
        return "SimpleThreadPoolExecutor{" +
                "threadPoolSize=" + threadPoolSize +
                ", taskQueueSize=" + TASK_QUEUE.size() +
                ", minSize=" + minSize +
                ", maxSize=" + maxSize +
                ", activeSize=" + activeSize +
                '}';
    }

    /**
     * 内部类-维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.
     */
    private enum TaskState {
        //空闲、运行、阻塞、死亡
        FREE, RUNNABLE, BLOCKED, TERMINATED;
    }

    /**
     * 内部接口-拒绝策略接口
     */
    interface DiscardPolicy {
        void discard() throws DiscardException;
    }


    /**
     * 静态内部类-定义拒绝策略接口,以及默认实现
     */
    static class DiscardException extends RuntimeException {
        private static final long serialVersionUID = 8827362380544575914L;

        DiscardException(String message) {
            super(message);
        }
    }

    /**
     * 工作线程
     * <p>
     * 1.继承Thread,重写run方法。
     * 2.this.taskState == TaskState.FREE&& TASK_QUEUE.isEmpty() 如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权
     * 3.如果有任务就去执行FIFO(先进先出)策略
     * 4.定义close方法,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt
     */
    public static class WorkerTask extends Thread {
        // 线程编号(自增)
        private static int threadInitNumber;
        // 线程状态
        private TaskState taskState;

        /**
         * 初始化工作线程,绑定对应线程组以及线程名
         */
        WorkerTask() {
            super(THREAD_GROUP, nextThreadName());
        }

        /**
         * 生成线程名,参考Thread.nextThreadNum();
         *
         * @return
         */
        private static synchronized String nextThreadName() {
            return THREAD_NAME_PREFIX + (++threadInitNumber);
        }

        /**
         * 启动工作线程
         */
        @Override
        public void run() {
            //目前任务
            Runnable target;
            //说明该工作线程处于空闲状态
            OUTER:
            while (this.taskState != TaskState.TERMINATED) {
                synchronized (TASK_QUEUE) {
                    //如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权
                    while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {
                        try {
                            this.taskState = TaskState.BLOCKED;//此处标记当前工作线程为阻塞状态
                            //没有任务就wait住,让出CPU执行权
                            TASK_QUEUE.wait();
                            //如果被打断说明当前线程执行了 shutdown() 方法  线程状态为 TERMINATED 直接跳到 while 便于退出
                        } catch (InterruptedException e) {
                            break OUTER;
                        }
                    }

                    //遵循FIFO策略,获取任务执行
                    target = TASK_QUEUE.removeFirst();
                }

                if (target != null) {
                    //设置当前工作线程状态为"运行中"
                    this.taskState = TaskState.RUNNABLE;
                    //TODO:启动目标任务
                    target.run();
                    //目标任务执行外设置当前工作线程状态为"空闲"
                    this.taskState = TaskState.FREE;
                }
            }
        }

        void close() {//优雅关闭线程
            this.taskState = TaskState.TERMINATED;
            this.interrupt();
        }
    }
}

四.测试

public class Client {
    public static void main(String[] args) throws InterruptedException {

        SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor();

        IntStream.range(0, 30).forEach(i ->
                executor.submit(() -> {
                    System.out.printf("[线程] - [%s] 开始工作...\n", Thread.currentThread().getName());
                    try {
                        Thread.sleep(2_000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("[线程] - [%s] 工作完毕...\n", Thread.currentThread().getName());
                })
        );

        //executor.shutdown();如果放开注释即会执行完所有任务关闭线程池
    }
}

执行结果

[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[初次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-3] 开始工作...
...
[线程] - [MY-THREAD-NAME-6] 开始工作...
[线程] - [MY-THREAD-NAME-7] 开始工作...
[再次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-10] 开始工作...
...
[线程] - [MY-THREAD-NAME-5] 开始工作...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-4] 工作完毕...
...
[线程] - [MY-THREAD-NAME-7] 工作完毕...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]

从日志中可以看到,初始化线程池有2个工作线程,执行速度较为缓慢,当经过第1次扩容后,会观察到线程池里线程个数增加了,执行任务的速度就越来越快了,本文一共扩容了2次,第一次是扩容到activeSize的大小,第二次是扩容到maxSize,如果任务队列没有可执行的任务时,就会进行线程回收,使工作线程数回到activeSize


一份针对于新手的多线程实践
一份针对于新手的多线程实践–进阶篇
一个线程罢工的诡异事件
线程池不容错过的细节
JAVA并发编程J.U.C学习总结
ReentrantLock 实现原理
Synchronize 关键字原理

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-08-28 08:53:50  更:2021-08-28 08:53:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:31:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码