可以直接看java线程池使用最全详解我这下面的是做笔记用的,可能会忽略很多细节
0、ThreadPoolExecutor构造函数中参数的说明
因为不管是java提供现成的四种线程池还是用户自己自定义线程池,都是通过构造ThreadPoolExecutor来生成线程池的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
看完上面的每一个参数的解释,记一下,后面会用到,当然也可以再回来看
1、java提供的四种线程池如何使用
这里不细说线程池底层实现,只是看如何去初始化,四种线程池的区别
先准备一个获得线程的工厂类,给四种线程池初始化用, 当然也可以不指定,这里主要是为了区分线程池中每一个线程
class threadPoolFactory implements ThreadFactory {
private AtomicInteger threadIdx = new AtomicInteger(0);
private String threadNamePrefix;
public threadPoolFactory(String Prefix) {
threadNamePrefix = Prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
System.out.println("创建了一个新线程:"+threadNamePrefix+":"+threadIdx);
thread.setName(threadNamePrefix + "--" + threadIdx.getAndIncrement());
return thread;
}
}
(1)newCachedThreadPool(无核心线程,空闲线程60s销毁,无上限数量)
核心线程数为0,最大为Integer.MAX_VALUE,线程空闲超过60s则销毁此线程
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool(new threadPoolFactory("cachedThread"));
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
}
);
}
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
结果图 这里需要注意的是所有的输出都结束了,任务也没有停止,直到1分钟任务才停止,正好和ThreadPoolExecutor 构造参数的corePoolSize 、keepAliveTime 、unit 对应上,这里没测最大线程数,看构造参数中maximumPoolSize=Integer.MAX_VALUE ,所以实际项目上不建议用这个,可能会出现oom 的情况
(2)newFixedThreadPool(指定核心线程数)
核心线程数和最大线程数有一样,这样就不会创建超过核心线程数的线程,创建后线程不会销毁,
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5, new threadPoolFactory("fixedThreadPool"));
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
}
);
}
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
结果图 这里需要注意的是任务不会自己停止,除非你加了 executorService.shutdown(); ,根据ThreadPoolExecutor的参数,核心线程数和最大线程数都是一个,再多的任务都是这这些线程跑。多了就只能等待,
(3)newSingleThreadExecutor(单线程的线程池)
单线程的线程池,并且把不需要的方法屏蔽掉
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor(new threadPoolFactory("singleThreadPool"));
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
}
);
}
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
结果图 这里就是单线程的跑,并且不会运行完停止, 又和newFixedThreadPool(1, threadFactory) 有区别,因为有一层new FinalizableDelegatedExecutorService
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
实际上FinalizableDelegatedExecutorService这个类就是对ExecutorService进行了一个包装,防止暴露出不该被暴露的方法,然后加上了finalize方法保证线程池的关闭。
(4) newScheduledThreadPool(核心线程数指定,可以周期性执行)
注意:(1)如果初始化指定了核心线程数大于0,就算当所有核心线程数都繁忙时,线程池也不会额外再创建新的线程 (2)当指定的核心线程数为0时,则只会创建一个线程运行,并且结束后会销毁,ThreadPoolExecutor构造器中的maximumPoolSize=Integer.MAX_VALUE 不会生效 解析说明在下面 **2)**的后半部分
0)newScheduledThreadPool构造方法
先看这个是因为后面的使用newScheduledThreadPool 会有多种场景,所以先简单看一下它的构造方法
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
super也就是下面这个
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
所以最后还是ThreadPoolExecutor 再看下面这个
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
DelegatedScheduledExecutorService 和FinalizableDelegatedExecutorService 是兄弟关系,都是再次封装了DelegatedExecutorService ,并且额外实现了ScheduledExecutorService 接口用于周期性执行任务
1) 延迟一段时间后执行
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5, new threadPoolFactory("scheduledThread"));
executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", delay 1s");
}
}, 1, TimeUnit.SECONDS);
}
结果图 因为这里核心线程数是5,所以任务没有停止,如果设置成0,线程执行完成后就会销毁,任务停止
2)延迟两秒后,三秒一个周期执行一次
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5, new threadPoolFactory("scheduledThread"));
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", every 3s");
}
}, 2, 3, TimeUnit.SECONDS);
}
结果图
这里会有一个问题? 虽然我指定了核心线程数为5,但是根据构造方法得到空闲线程数应该是Integer.MAX_VALUE ,这里为什么不额外创建新线程呢?
看下面的测试用例1(设置2个核心线程,跑10个周期任务)
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2, new threadPoolFactory("scheduledThread"));
for (int i = 0; i < 10; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", every 3s");
Thread.sleep(20000);
}
}, 2, 1, TimeUnit.MILLISECONDS);
}
}
结果图 你会发现线程卡在这里了,相当于两个核心线程休眠了,就算有新的任务也不会创建新的线程去执行新的周期任务,
下面看测试用例2(0个核心线程数,跑10个任务)
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0, new threadPoolFactory("scheduledThread"));
for (int i = 0; i < 10; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", every 3s");
Thread.sleep(20000);
}
}, 2, 1, TimeUnit.MILLISECONDS);
}
}
结果图 这里线程池只创建了一个线程去跑这个任务,所以设置核心线程数等于0,那相当于允许让线程池创建一个线程,并且还是用完就销毁的线程
(5)四种java提供的线程池比较
构造方法 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue | newFixedThreadPool | 用户指定 | 和corePoolSize数值一致 | 0 | LinkedBlockingQueue | newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | newScheduledThreadPool | 用户指定 | Integer.MAX_VALUE(虽然入参是这个,但是如果corePoolSize=0,则这个实际是1;如果corePoolSize>0,则这个实际是0) | 0 | DelayedWorkQueue |
后面研究一下这些队列
3、自定义创建线程池
看完上面的其实也就知道如何自己创建线程池了
ExecutorService executorService =new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new threadPoolFactory("cachedThread"));
使用方法和上面四种线程池是一样的
|