基础
概念
进程、线程、并发、并行:
- 进程:进程是资源分配的最小单位,是进入到内存中的程序
- 线程:线程是CPU调度的最小单位,是进程中的一个执行单元,负责当前进程中程序的执行,一个进程中至少有一个线程(单线程程序),一个进程中是可以有多个线程的,这个应用程序也可以称之为多线程程序。
- 并发:交替执行
- 并行:同时执行
线程的调度:
线程的状态
-
新建(NEW):至今尚未启动的线程处于这种状态。 -
运行(RUNNABLE):正在Java虚拟机中执行的线程处于这种状态。 -
阻塞(BLOCKED):受阻塞并等待某个监视器锁的线程处于这种状态。 -
无限等待(WAITING):无限期地等待另一个线程来执行某一特定操作的线程处于这种状态。 -
睡眠(计时等待)(TIMED_WAITING):等待另一个线程来执行取决于指定等待时间的操作的线程处于这种状态。 -
死亡(TERMINATED):已退出的线程处于这种状态。
Object类中等待与唤醒线程的方法:
java.lang.Object 类:是祖宗类,里边的方法,任意的一个类都可以使用
void wait()
void notify()
void notifyAll()
实现多线程的方式
主要有继承 Thread 类和实现 Runnable 接口两种方式。不过我们实际开发中一般都是通过线程池方式来获取空闲线程。
java.lang.Thread类 implements Runnable接口
Thread类的构造方法:
Thread(Runnable target)
Thread(Runnable target, String name)
实现步骤:
? 1.创建一个实现类,实现Runnable接口
? 2.在实现类中重写Runnable接口中的run方法(设置线程任务)
? 3.创建Runnable接口的实现类对象
? 4.创建Thread类对象,在构造方法中传递Runnable接口的实现类对象
? 5.调用Thread类中的方法start,开启新的线程,执行run方法
实现步骤:
? 1.创建一个类继承Thread类
? 2.在Thread类的子类中,重写Thread类中的run方法(设置线程任务)
? 3.创建Thread类的子类对象
? 4.调用继承自Thread类中的start方法,开启新的线程执行run方法
两种方式的比较
Thread类常用方法
static Thread currentThread()
static void sleep(long millis)
void start()
String getName()
void setName(String name)
锁的相关概念
可重入锁
如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。
class MyClass {
public synchronized void method1() {
method2();
}
public synchronized void method2() {
}
}
上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。
而由于synchronized和Lock都具备可重入性,所以不会发生上述现象。
可中断锁
可中断锁:就是可以响应中断的锁。
在Java中,synchronized就不是可中断锁,而Lock是可中断锁。
如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。
在前面演示lockInterruptibly()的用法时已经体现了Lock的可中断性。
公平锁
公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。
非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。
在Java中synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。
而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。
在ReentrantLock中定义了2个静态内部类,一个是NotFairSync,一个是FairSync,分别用来实现非公平锁和公平锁。
我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性:
ReentrantLock lock = new ReentrantLock( true );
另外在ReentrantLock类中定义了很多方法,比如:
boolean isFair()
boolean isLocked()
boolean isHeldByCurrentThread()
boolean hasQueuedThreads()
在ReentrantReadWriteLock中也有类似的方法,同样也可以设置为公平锁和非公平锁。不过要记住,ReentrantReadWriteLock并未实现Lock接口,它实现的是ReadWriteLock接口。
读写锁
读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。
正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。
ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。
可以通过readLock()获取读锁,通过writeLock()获取写锁。
死锁
概念:线程获取不到锁对象,从而进不去同步中执行
前提:
- 必须出现同步代码块嵌套
- 必须有两个线程
- 必须有两个锁对象
原因:
- 出了两个同步代码块的嵌套
- 线程一拿着线程二的锁,线程二拿着线程一的锁
- 两个线程都处于阻塞状态,都不会继续执行
线程安全
高并发的线程安全问题
-
可见性:指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值 拓展:所有的共享变量(成员变量、静态成员变量)都存储于主内存。每一个线程还存在自己的工作内存,线程的工作内存保留了被线程使用的变量的工作副本。线程对变量的所有的操作(读,取)都必须在工作内存中完成,而不能直接读写主内存中的变量,不同线程之间也不能直接访问 -
有序性:即程序执行的顺序按照代码的先后顺序执行 -
原子性:即一个操作或者多个操作要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行
volatile 关键字
volatile关键字的作用:解决变量的可见性、有序性,但不能解决变量的原子性
synchronized 关键字
synchronized是java中的一个关键字,也就是说是Java语言内置的特性。
如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:
- 获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
- 线程执行发生异常,此时JVM会让线程自动释放锁。
如果这个获取锁的线程由于要等待IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,会非常影响程序的执行效率。
格式一(方法内同步代码块):
synchronized(锁对象){
访问了共享数据的代码(产生了线程安全问题的代码)
}
格式二(同步方法):
权限修饰符 synchronized 返回值类型 方法名(参数列表){
访问了共享数据的代码(可能产生线程安全问题的代码)
}
并发包
juc:java.util.concurrent
主要内容:
? 一、并发集合类(CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentHashMap) 底层是CAS机制 乐观锁
? 二、原子类操作(AtomInteger、AtomicLong、AtomicBoolean) 底层是CAS(比较并替换)机制 乐观锁
? 三、锁操作(Lock、ReadWriteLock)
? 四、线程池(Callable、Future、Executor)
? 五、信号量(CountDownLatch、CyclicBarrier、Semapherre、Exchanger)
Hashtable:Java类库中从1.0版本提供的另一个线程安全的Map。
Hashtable和ConcurrentHashMap有什么区别:
-
Hashtable:采用的synchronized——悲观锁,效率更低。 Hashtable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下Hashtable的效率非常低下。因为其锁定的是整个哈希表,一个操作正在进行时,其他操作也同时锁定;当一个线程正在访问Hashtable的同步方法,其他线程也访问Hashtable的同步方法时,会进入阻塞状态;例如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。 -
ConcurrentHashMap:采用CAS + 局部(synchronized)锁定。CAS机制——乐观锁,效率更高。 局部(synchronized)锁定:只锁定 “桶” 。仅对当前元素锁定,其他 ”桶“ 里的元素不锁定。
原子类
概述:java从JDK1.5开始提供了 java.util.concurrent.atomic 包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。
属于乐观锁,只能解决一个变量的原子性,底层是CAS机制,反复比较,只有内存中的值和预期的值一样,才会进行修改,否则就会循环重新获取值。
常用原子类:AtomicInteger、AtomicLong、AtomicBoolean、AtomicIntegerArray
- AtomicInteger原子型Integer,可以实现原子更新操作
public AtomicInteger():
public AtomicInteger(int initialValue)
int get():
int getAndIncrement():
int incrementAndGet():
int addAndGet(int data):
int getAndSet(int value):
- AtomicIntegerArray:可以用原子方式更新其元素的
int 数组:可以保证数组的原子性
AtomicIntegerArray(int length)
AtomicIntegerArray(int[] array)
int addAndGet(int i, int delta)
int get(int i)
Lock 锁
参考:https://www.wangt.cc/2020/11/%E9%94%81%EF%BC%88%E4%BA%94%EF%BC%89-lock%E6%8E%A5%E5%8F%A3%E5%B8%B8%E7%94%A8%E6%96%B9%E6%B3%95/
java.util.concurrent.locks.ReentrantLock类 implements Lock接口
格式:
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
} catch (Exception ex){
} finally {
lock.unlock();
}
if (lock.tryLock()) {
try {
} catch (Exception ex){
} finally {
lock.unlock();
}
} else {
}
常用方法:
void lock();
void unlock();
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void lockInterruptibly() throws InterruptedException;
信号量
多线程协作:CountDownLatch 允许一个或多个线程等待其他线程完成操作
多线程协作:CyclicBarrier 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
并发数量控制:Semaphore 主要作用是控制线程的并发数量
线程信息交互:Exchanger 进行线程间的数据交换
CountDownLatch
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。
CountDownLatch是通过一个计数器来实现的,每当一个线程完成了自己的任务后,可以调用countDown()方法让计数器-1,当计数器到达0时,调用CountDownLatch的await()方法的线程阻塞状态解除,继续执行。
CountDownLatch(int count)
void await()
void countDown()
示例:
public class MyThreadAC extends Thread{
private CountDownLatch countDownLatch;
public MyThreadAC(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("A");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("C");
}
}
public class MyThreadB extends Thread {
private CountDownLatch countDownLatch;
public MyThreadB(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("B");
countDownLatch.countDown();
}
}
public class Demo01Thread {
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(1);
new MyThreadAC(cdl).start();
Thread.sleep(1000);
new MyThreadB(cdl).start();
}
}
CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。即设置屏障,一个线程等待其他多个线程全部执行完毕,再执行
使用场景:CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
需求:使用两个线程读取2个文件中的数据,当两个文件中的数据都读取完毕以后,进行数据的汇总操作。
构造方法:
CyclicBarrier(int parties, Runnable barrierAction)
常用方法:
int await()
示例:
public class PersonThread extends Thread {
private CyclicBarrier cyclicBarrier;
public PersonThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
int r = (int)(Math.random()*1000);
Thread.sleep(r);
System.out.println(Thread.currentThread().getName()+"线程花了"+r+"毫秒来到了会议的现场!");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class MeetingThread extends Thread {
@Override
public void run() {
System.out.println("人齐了,开始开会了!");
}
}
public class Demo01CyclicBarrier {
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(5,new MeetingThread());
PersonThread p1 = new PersonThread(cb);
PersonThread p2 = new PersonThread(cb);
PersonThread p3 = new PersonThread(cb);
PersonThread p4 = new PersonThread(cb);
PersonThread p5 = new PersonThread(cb);
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
}
}
Semaphore
主要作用是控制线程的并发数量,设置同时允许几个线程执行。
构造方法:
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
常用方法:
void acquire()
void release()
示例:同时允许2个线程同时执行
public class ClassRoom {
Semaphore semaphore = new Semaphore(2);
public void intoClassRoom() throws InterruptedException {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"...进入到教室参观!");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"...离开了教室!");
semaphore.release();
}
}
public class StudentThread extends Thread {
private ClassRoom classRoom;
public StudentThread(ClassRoom classRoom) {
this.classRoom = classRoom;
}
@Override
public void run() {
try {
classRoom.intoClassRoom();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Demo01Semaphore {
public static void main(String[] args) {
ClassRoom cr = new ClassRoom();
for (int i = 0; i < 5; i++) {
new StudentThread(cr).start();
}
}
}
Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。
两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
注意:
使用场景:可以做数据校对工作
需求:比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水。为了避免错误,采用AB岗两人进行录入,录入到两个文件中,系统需要加载这两个文件,
并对两个文件数据进行校对,看看是否录入一致
构造方法:
public Exchanger()
常用方法:
V exchange(V x)
V exchange(V x, long timeout, TimeUnit unit)
示例:
public class ThreadA extends Thread {
private Exchanger<String> exchanger;
public ThreadA(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
System.out.println("线程A开始执行");
System.out.println("线程A给线程B100元,并从线程B得到一张火车票!");
String result = null;
try {
result = exchanger.exchange("100元");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程A得到的东西:"+result);
}
}
public class ThreadB extends Thread{
private Exchanger<String> exchanger;
public ThreadB(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
System.out.println("线程B开始执行");
System.out.println("线程B给线程A一张火车票,并从线程A得到100元!");
String result = null;
try {
result = exchanger.exchange("一张火车票");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程B得到的东西:"+result);
}
}
public class Demo01Exchanger {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new ThreadA(exchanger).start();
new ThreadB(exchanger).start();
}
}
各种不同锁机制及差别
常见的锁机制有synchronized、ReentrantLock、Atomic、Semaphore
synchronized:java关键字,属于悲观锁,通过在方法、类或代码块中加入该关键字给共享资源上锁,只有拿到锁的那个线程才可以访问共享资源,其他线程在拿到锁之前会阻塞;
**Atomic原子类:**属于乐观锁,只能解决一个变量的原子性,底层是CAS机制,反复比较,只有内存中的值和预期的值一样,才会进行修改,否则就会循环重新获取值
Lock(实现类 ReentrantLock):java类,属于悲观锁,在可能出现线程安全问题的代码前后进行加锁和解锁操作,只让一个线程进入到两个方法中间执行;
Semaphore:基本能完成ReentrantLock的所有工作,使用方法也与之类似,但它可以设置同时允许几个线程执行
volatile 关键字和 synchronized 关键字的区别:
- volatile关键字:只能修饰变量,可以解决变量的可见性,有序性,不能解决原子性
- synchronized关键字:不能修饰变量,可以修饰方法,代码块,使用的范围比volatile广,可以解决:可见性,有序性,原子性
synchronized和lock锁的区别:
- Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
- synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
- Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
- 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
- Lock可以提高多个线程进行读操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
CAS(乐观锁) 和 Synchronized 都可以保证多线程环境下共享数据的安全性。那么他们两者有什么区别?
-
Synchronized 是从悲观的角度出发:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。 共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。因此Synchronized我们也将其称之为悲观锁。jdk中的ReentrantLock也是一种悲观锁。 -
CAS(乐观锁)是从乐观的角度出发:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。
线程池
参考:https://www.cnblogs.com/dolphin0520/p/3932921.html
? https://ifeve.com/java-threadpool/
概述
基础概念
线程池:线程池就是创建大量空闲的线程并存入一个容器中;程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,任务执行完成后,该线程又返回线程池中成为空闲线程,等待执行下一个任务。
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
线程池的特点:线程复用;控制最大并发数;管理线程
**线程池的作用:**避免频繁的创建线程和销毁线程,提高程序的效率
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
ThreadPoolExecutor(线程池)执行顺序
- 当线程数小于核心线程数时,会一直创建线程直到线程数等于核心线程数;
- 当线程数等于核心线程数时,新加入的任务会被放到任务队列等待执行;
- 当任务队列已满,又有新的任务时,会创建线程直到线程数量等于最大线程数;
- 当线程数等于最大线程数,且任务队列已满时,新加入任务会被拒绝。
线程池的主要参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
-
corePoolSize :核心线程数,也可以理解为最小线程数 ? (1)核心线程会一直存在,即使没有任务执行 ? (2)当线程数小于核心线程数的时候,即使有空闲线程,也会一直创建线程直到达到核心线程数 ? (3)设置 allowCoreThreadTimeout=true(默认 false)时,核心线程会超时关闭 -
maxPoolSize:最大线程数 ? (1)线程池里允许存在的最大线程数量 ? (2)当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务 ? (3)线程池里允许存在的最大线程数量。当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务 -
keepAliveTime :线程空闲时间 ? (1)当线程空闲时间达到 keepAliveTime 时,线程会退出(关闭),直到线程数等于核心线程数 ? (2)如果设置了 allowCoreThreadTimeout=true,则线程会退出直到线程数等于零 -
(BlockingQueue)workQueue:任务队列,用于传输和保存等待执行任务的阻塞队列。阻塞队列有以下几种选择:
-
LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE -
synchronousQueue:不保存提交的任务,而是将直接新建一个线程来执行新来的任务 -
ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小 -
PriorityBlockingQueue:一个具有优先级的无限阻塞队列 -
threadFactory:线程工厂,用于创建新线程。 threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号) -
(RejectedExecutionHandler)handler:线程饱和策略/任务拒绝处理器,当线程池和队列都满了,再加入线程会执行此策略
- ThreadPoolExecutor.AbortPolicy
简单粗暴,丢弃任务并抛出RejectedExecutionException异常,这也是默认的拒绝策略。
- ThreadPoolExecutor.CallerRunsPolicy
如果线程池未关闭,则会在调用者线程中直接执行新任务,这会导致主线程提交线程性能变慢。
- ThreadPoolExecutor.DiscardPolicy
丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy
抛弃最老的任务,就是从队列取出最老的任务然后放入新的任务进行执行。
-
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性: TimeUnit.DAYS;
TimeUnit.HOURS;
TimeUnit.MINUTES;
TimeUnit.SECONDS;
TimeUnit.MILLISECONDS;
TimeUnit.MICROSECONDS;
TimeUnit.NANOSECONDS;
排队有三种通用策略:
**直接提交。**工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
**无界队列。**使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
**有界队列。**当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
常见线程池
常见线程池(实现ExecutorService的四种连接池,使用工厂类 Executors 的静态方法创建):
固定线程池,核心线程数和最大线程数固定相等,而空闲存活时间为0毫秒,说明此参数也无意义,工作队列为最大为Integer.MAX_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略。
带缓冲线程池,从构造看核心线程数为0,最大线程数为Integer最大值大小,超过0个的空闲线程在60秒后销毁,SynchronousQueue这是一个直接提交的队列,意味着每个新任务都会有线程来执行,如果线程池有可用线程则执行任务,没有的话就创建一个来执行,线程池中的线程数不确定,一般建议执行速度较快的线程,不然这个最大线程池边界过大容易造成内存溢出。
单线程线程池,核心线程数和最大线程数均为1,空闲线程存活0毫秒同样无意思,意味着每次只执行一个线程,多余的先存储到工作队列,一个一个执行,保证了线程的顺序执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
调度线程池,大小无限制的线程池,支持定时和周期性的执行线程
线程池的合理配置分析
如何配置线程池,可以从以下几个角度来进行分析:
- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- 任务的优先级:高,中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
(1)任务性质不同的任务可以用不同规模的线程池分开处理
CPU密集型任务配置尽可能少的线程数量,如配置 Ncpu+1 个线程的线程池。
IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。
混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。
可以通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的CPU个数。
(2)优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
(3)执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
(4)依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
(5)建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。
有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。
线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
- taskCount:线程池需要执行的任务数量。
- completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
- largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
- getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
- getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }`
线程池接口的架构
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
比较重要的几个类:
类 | 描述 |
---|
ExecutorService | 真正的线程池接口 | ThreadPoolExecutor | ExecutorService的默认实现 | ScheduledExecutorService | 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题 | ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。 |
常用方法
java.util.concurrent.Executors:是一个创建线程池的工具类(工厂类),专门用来生产线程池,里边的方法都是静态的
常用方法:
static ExecutorService newFixedThreadPool(int nThreads)
void shutdown()
void shutdownNow()
java.util.concurrent.ExecutorService:描述线程池的接口
常用方法:
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
一些比较重要成员变量:
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile int poolSize;
private volatile RejectedExecutionHandler handler;
private volatile ThreadFactory threadFactory;
private int largestPoolSize;
private long completedTaskCount;
java.util.concurrent.Future:表示异步计算的结果。用来接收call方法的返回值
接口中的方法:
V get()
submit和execute区别:
方法名 | 返回值 | 任务接口 | 向外层调用者抛出异常 |
---|
execute | void | Runnable接口 | 无法抛出异常 | submit | Future | Callable接口和Runnable接口 | 能抛出异常,通过Future.get捕获抛出的异常 |
补充:
-
方法execute()中的是Runnable接口的实现,所以只能使用try、catch来捕获Checked Exception,通过实现UncaughtExceptionHande接口处理UncheckedException, 即和普通线程的处理方式完全一致 方法submit()中抛出异常不管提交的是Runnable还是Callable类型的任务,如果不对返回值Future调用get()方法,都会吃掉异常 -
方法execute()提交的未执行的任务可以通过remove(Runnable)方法删除,而submit()提交的任务即使还未执行也不能通过remove(Runnable)方法删除
使用示例
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
定时器
定时器:可以设置线程在某个时间执行某件事情,或者某个时间开始,每间隔指定的时间反复的做某件事情
java.util.Timer类:是一个描述定时器的类。一种工具,线程用其安排以后在后台线程中执行的任务。可安排任务执行一次,或者定期重复执行。
构造方法:
public Timer()
成员方法:
void schedule(TimerTask task, long delay)
void schedule(TimerTask task, long delay, long period)
void schedule(TimerTask task, Date time)
void schedule(TimerTask task, Date firstTime, long period)
void cancel()
java.util.TimerTask类 implements Runnable接口:由 Timer 安排为一次执行或重复执行的任务。
TimerTask类是一个抽象类,无法直接创建
void run()
|