IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> java提供的四种线程池和自定义创建线程池 -> 正文阅读

[移动开发]java提供的四种线程池和自定义创建线程池

可以直接看java线程池使用最全详解我这下面的是做笔记用的,可能会忽略很多细节

0、ThreadPoolExecutor构造函数中参数的说明

因为不管是java提供现成的四种线程池还是用户自己自定义线程池,都是通过构造ThreadPoolExecutor来生成线程池的

 /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * 		  保留在池中的线??程数,即使它们是空闲的,除非设置allowCoreThreadTimeOut参数
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool 池中允许的最大线程数
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * 		  当线程数大于corePoolSize时,这是多余的空闲线程,在终止前等待新任务的最长时间
     * @param unit the time unit for the {@code keepAliveTime} argument
     * 		 keepAliveTime 参数的时间单位
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * 执行前用于保存任务的队列。该队列将仅保存 execute方法提交的 Runnable任务
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * 			执行器创建新线程时使用的工厂
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

看完上面的每一个参数的解释,记一下,后面会用到,当然也可以再回来看

1、java提供的四种线程池如何使用

这里不细说线程池底层实现,只是看如何去初始化,四种线程池的区别

先准备一个获得线程的工厂类,给四种线程池初始化用,
当然也可以不指定,这里主要是为了区分线程池中每一个线程

class threadPoolFactory implements ThreadFactory {
    private AtomicInteger threadIdx = new AtomicInteger(0);
    private String threadNamePrefix;

    public threadPoolFactory(String Prefix) {
        threadNamePrefix = Prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        System.out.println("创建了一个新线程:"+threadNamePrefix+":"+threadIdx);
        thread.setName(threadNamePrefix + "--" + threadIdx.getAndIncrement());
        return thread;
    }
}

(1)newCachedThreadPool(无核心线程,空闲线程60s销毁,无上限数量)

核心线程数为0,最大为Integer.MAX_VALUE,线程空闲超过60s则销毁此线程

 public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool(new threadPoolFactory("cachedThread"));
        for (int i = 0; i < 10; i++) {
            executorService.submit(() -> {
                        System.out.println(Thread.currentThread().getName());
                    }
            );
        }

    }
  /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * 创建一个线程池,根据需要创建新线程,但会在之前构建的线程可用时重用它们,
     * 并在需要时使用提供的 ThreadFactory 来创建新线程
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

结果图
在这里插入图片描述
这里需要注意的是所有的输出都结束了,任务也没有停止,直到1分钟任务才停止,正好和ThreadPoolExecutor构造参数的corePoolSizekeepAliveTimeunit对应上,这里没测最大线程数,看构造参数中maximumPoolSize=Integer.MAX_VALUE,所以实际项目上不建议用这个,可能会出现oom的情况

(2)newFixedThreadPool(指定核心线程数)

核心线程数和最大线程数有一样,这样就不会创建超过核心线程数的线程,创建后线程不会销毁,

 public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5, new threadPoolFactory("fixedThreadPool"));
        for (int i = 0; i < 10; i++) {
            executorService.submit(() -> {
                        System.out.println(Thread.currentThread().getName());
                    }
            );
        }

    }
 /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue, using the provided
     * ThreadFactory to create new threads when needed.  At any point,
     * at most {@code nThreads} threads will be active processing
     * tasks.  If additional tasks are submitted when all threads are
     * active, they will wait in the queue until a thread is
     * available.  If any thread terminates due to a failure during
     * execution prior to shutdown, a new one will take its place if
     * needed to execute subsequent tasks.  The threads in the pool will
     * exist until it is explicitly {@link ExecutorService#shutdown
     * shutdown}.
     *建一个线程池,该线程池重用固定数量的线程操作共享无界队列,使用提供的 ThreadFactory 在需要时创建新线程。
     在任何时候,至多nThreads 线程将主动处理任务。
     如果在所有线程都处于活动状态时提交了额外的任务,它们将在队列中等待,直到有线程可用。
     如果任何线程在关机前的执行过程中因失败而终止,如果需要执行后续任务,则新的线程将取代它。
     池中的线程将存在,直到它明确地执行 ExecutorService接口中的shutdown
     * @param nThreads the number of threads in the pool
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

结果图
在这里插入图片描述
这里需要注意的是任务不会自己停止,除非你加了 executorService.shutdown();,根据ThreadPoolExecutor的参数,核心线程数和最大线程数都是一个,再多的任务都是这这些线程跑。多了就只能等待,

(3)newSingleThreadExecutor(单线程的线程池)

单线程的线程池,并且把不需要的方法屏蔽掉

 public static void main(String[] args) {
        // 创建仅有单个线程的线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor(new threadPoolFactory("singleThreadPool"));
        for (int i = 0; i < 10; i++) {
            executorService.submit(() -> {
                        System.out.println(Thread.currentThread().getName());
                    }
            );
        }


    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue, and uses the provided ThreadFactory to
     * create a new thread when needed. Unlike the otherwise
     * equivalent {@code newFixedThreadPool(1, threadFactory)} the
     * returned executor is guaranteed not to be reconfigurable to use
     * additional threads.
     *创建一个 Executor,它使用单个工作线程操作无界队列,
     并使用提供的 ThreadFactory在需要时创建一个新线程。
     与其他等价的newFixedThreadPool(1, threadFactory)不同,
     返回的执行程序保证不可重新配置以使用附加线程
     * @param threadFactory the factory to use when creating new
     * threads
     *
     * @return the newly created single-threaded Executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

结果图
在这里插入图片描述
这里就是单线程的跑,并且不会运行完停止,
又和newFixedThreadPool(1, threadFactory) 有区别,因为有一层new FinalizableDelegatedExecutorService

/**
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     * 仅公开 ExecutorService 实现的 ExecutorService 方法的包装类
     */
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
	//FinalizableDelegatedExecutorService继承了DelegatedExecutorService 
    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

实际上FinalizableDelegatedExecutorService这个类就是对ExecutorService进行了一个包装,防止暴露出不该被暴露的方法,然后加上了finalize方法保证线程池的关闭。

(4) newScheduledThreadPool(核心线程数指定,可以周期性执行)

注意:(1)如果初始化指定了核心线程数大于0,就算当所有核心线程数都繁忙时,线程池也不会额外再创建新的线程
(2)当指定的核心线程数为0时,则只会创建一个线程运行,并且结束后会销毁,ThreadPoolExecutor构造器中的maximumPoolSize=Integer.MAX_VALUE不会生效
解析说明在下面 **2)**的后半部分

0)newScheduledThreadPool构造方法

先看这个是因为后面的使用newScheduledThreadPool会有多种场景,所以先简单看一下它的构造方法

 /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param threadFactory the factory to use when the executor
     * creates a new thread
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

super也就是下面这个

 /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

所以最后还是ThreadPoolExecutor
再看下面这个

 /**
     * A wrapper class that exposes only the ScheduledExecutorService
     * methods of a ScheduledExecutorService implementation.
     */
    static class DelegatedScheduledExecutorService
            extends DelegatedExecutorService
            implements ScheduledExecutorService {
        private final ScheduledExecutorService e;
        DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
        }
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return e.schedule(command, delay, unit);
        }
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return e.schedule(callable, delay, unit);
        }
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            return e.scheduleAtFixedRate(command, initialDelay, period, unit);
        }
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
            return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    }

DelegatedScheduledExecutorServiceFinalizableDelegatedExecutorService是兄弟关系,都是再次封装了DelegatedExecutorService,并且额外实现了ScheduledExecutorService 接口用于周期性执行任务

1) 延迟一段时间后执行

 public static void main(String[] args) {
        // 创建指定核心线程数5
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5, new threadPoolFactory("scheduledThread"));

        // 定时执行一次的任务,延迟1s后执行
        executorService.schedule(new Runnable() {
            @Override
            public void run() {

                System.out.println(Thread.currentThread().getName() + ", delay 1s");
            }
        }, 1, TimeUnit.SECONDS);
}        

结果图
在这里插入图片描述
因为这里核心线程数是5,所以任务没有停止,如果设置成0,线程执行完成后就会销毁,任务停止

2)延迟两秒后,三秒一个周期执行一次

 public static void main(String[] args) {
        // 创建指定核心线程数5
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5, new threadPoolFactory("scheduledThread"));
        // 周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + ", every 3s");
            }
        }, 2, 3, TimeUnit.SECONDS);
}        

结果图
在这里插入图片描述

这里会有一个问题?
虽然我指定了核心线程数为5,但是根据构造方法得到空闲线程数应该是Integer.MAX_VALUE,这里为什么不额外创建新线程呢?

看下面的测试用例1(设置2个核心线程,跑10个周期任务)

public static void main(String[] args) {
        // 创建指定核心线程数2
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2, new threadPoolFactory("scheduledThread"));

        // 10个周期性地执行任务,延迟2s后,每1毫秒一次地周期性执行任务
        for (int i = 0; i < 10; i++) {
            executorService.scheduleAtFixedRate(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ", every 3s");
                    //休眠此线程
                    Thread.sleep(20000);
                }
            }, 2, 1, TimeUnit.MILLISECONDS);
        }
}        

结果图
在这里插入图片描述
你会发现线程卡在这里了,相当于两个核心线程休眠了,就算有新的任务也不会创建新的线程去执行新的周期任务,

下面看测试用例2(0个核心线程数,跑10个任务)

  public static void main(String[] args) {
        // 创建指定核心线程数0
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0, new threadPoolFactory("scheduledThread"));

        // 周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
        for (int i = 0; i < 10; i++) {
            executorService.scheduleAtFixedRate(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ", every 3s");
                    Thread.sleep(20000);
                }
            }, 2, 1, TimeUnit.MILLISECONDS);
        }
    }

结果图
在这里插入图片描述
这里线程池只创建了一个线程去跑这个任务,所以设置核心线程数等于0,那相当于允许让线程池创建一个线程,并且还是用完就销毁的线程

(5)四种java提供的线程池比较

构造方法corePoolSizemaximumPoolSizekeepAliveTimeworkQueue
newCachedThreadPool0Integer.MAX_VALUE60sSynchronousQueue
newFixedThreadPool用户指定和corePoolSize数值一致0LinkedBlockingQueue
newSingleThreadExecutor110LinkedBlockingQueue
newScheduledThreadPool用户指定Integer.MAX_VALUE(虽然入参是这个,但是如果corePoolSize=0,则这个实际是1;如果corePoolSize>0,则这个实际是0)0DelayedWorkQueue

后面研究一下这些队列

3、自定义创建线程池

看完上面的其实也就知道如何自己创建线程池了

        // 自定义创建线程池
        ExecutorService executorService =new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new threadPoolFactory("cachedThread"));           

使用方法和上面四种线程池是一样的

  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-05-07 11:18:35  更:2022-05-07 11:19:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 0:29:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码