一、倒计数器:CountDownLatch
CountDownLatch类位于java.util.concurrent包下,是一个同步工具类,允许一个或多个线程一直等待其他线程的操作执行完后再执行相关操作。
CountDownLatch基于线程计数器来实现并发访问控制,主要用于主线程等待其他子线程都执行完毕后执行相关操作
对于倒计数器,一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要对各项设备、仪器进行检查。只有等所有检查都完成后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使点火线程等待所有检查线程全部完工后再执行。CountDownLatch 的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。
其使用过程为:在主线程中定义CountDownLatch,并将线程计数器的初始值设置为子线程的个数,多个子线程并发执行,每个子线程在执行完毕后都会调用countDown函数将计数器的值减1,直到线程计数器为0,表示所有的子线程任务都已执行完毕,此时在CountDownLatch上等待的主线程将被唤醒并继续执行
我们利用CountDownLatch可以实现类似计数器的功能。比如有一个主任务,它要等待其他两个任务都执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能。具体实现如下
public class CountDownLatchDemo implements Runnable {
static final CountDownLatch end = new CountDownLatch(2);
static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println("check complete!");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
exec.submit(demo);
}
end.await();
System.out.println("Fire!");
exec.shutdown();
}
}
以上代码片段先定义了一个大小为2的CountDownLatch,然后定义了两个子线程并启动该子线程,子线程执行完业务代码后在执行latch.countDown() 时减少一个信号量,表示自己已经执行完成。
主线程调用end.await(); 阻塞等待,在所有线程都执行完成并调用了countDown函数时,表示所有线程均执行完成,这时程序会主动唤醒主线程并开始执行主线程的业务逻辑
二、循环栅栏:CyclicBarrier
CyclicBarrier(循环屏障)是一个同步工具,可以实现让一组线程等待至某个状态之后再全部同时执行。在所有等待线程都被释放之后,CyclicBarrier可以被重用
CyclicBarrier的运行状态叫作Barrier状态,在调用await方法后,线程就处于Barrier状态。
CyclicBarrier可以理解为循环栅栏。栅栏就是一种障碍物,比如,通常在私人宅邸的周围就可以围上一圈栅栏,阻止闲杂人等入内。这里当然就是用来阻止线程继续执行,要求线程在栅栏外等待。
前面Cyclic意为循环,也就是说这个计数器可以反复使用。比如,我们将计数器设置为10,那么凑齐第一批10个线程后,计数器就会归零,接着凑齐下一批10个线程,这就是循环栅栏内在的含义
CyclicBarrier 的使用场景也很丰富。比如,司令下达命令,要求10个士兵一起去完成一项任务。这时就会要求10个士兵先集合报到,接着,一起雄赳赳,气昂昂地去执行任务。当10个士兵把自己手上的任务都执行完了,那么司令才能对外宣布,任务完成!
CyclicBarrier中最重要的方法是await方法,它有两种实现。
- public int await():挂起当前线程直到所有线程都为Barrier状态再同时执行后续的任务。
- public int await(long timeout, TimeUnit unit):设置一个超时时间,在超时时间过后,如果还有线程未达到Barrier状态,则不再等待,让达到Barrier状态的线程继续执行后续的任务。
不再等待,让达到Barrier状态的线程继续执行后续的任务。
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclic;
Soldier(CyclicBarrier cyclic, String soldier) {
this.cyclic = cyclic;
this.soldier = soldier;
}
@Override
public void run() {
try {
cyclic.await();
doWork();
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
public void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + "任务完成");
}
}
public static class BarrierRun implements Runnable {
boolean flag;
int N;
BarrierRun(boolean flag, int N) {
this.flag = flag;
this.N = N;
}
@Override
public void run() {
if (flag) {
System.out.println("司令:【士兵" + N + "个,任务完成!】");
} else {
System.out.println("司令:【士兵" + N + "个,集合完毕!】");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 5;
Thread[] allSoldiers = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
System.out.println("集合队伍");
for (int i = 0; i < N; i++) {
System.out.println("士兵" + i + "报道");
allSoldiers[i] = new Thread(new Soldier(cyclic, "士兵" + i));
allSoldiers[i].start();
}
}
}
每一个士兵线程都会执行第13行定义的run()方法。
在第16行,每一个士兵线程都会等待,直到所有的士兵都集合完毕。 集合完毕意味着CyclicBarrier的一次计数完成,当再一次调用CyclicBarrier.await()方法时,会进行下一次计数。
第30行模拟了士兵的任务。当一个士兵任务执行完,他就会要求CyclicBarrier开始下一次计数,这次计数主要目的是监控是否所有的士兵都已经完成了任务。一旦任务全部完成,第40行定义的BarrierRun就会被调用,打印相关信息。
CyclicBarrier.await() 方法可能会抛出两个异常。
一个是InterruptedException ,也就是在等待过程中,线程被中断,应该说这是一个非常通用的异常。大部分迫使线程等待的方法都可能会抛出这个异常,使得线程在等待时依然可以响应外部紧急事件。
另外一个异常则是CyclicBarrier特有的BrokenBarrierException 。一旦遇到这个异常,则表示当前的CyclicBarrier已经破损了,可能系统已经没有办法等待所有线程到齐了。如果继续等待,可能就是徒劳无功的,因此,还是“打道回府”吧!上述代码第23~27行处理了这两种异常
三、信号量:Semaphore
Semaphore指信号量,用于控制同时访问某些资源的线程个数,具体做法为通过调用acquire() 获取一个许可,如果没有许可,则等待,在许可使用完毕后通过release() 释放该许可,以便其他线程使用。
Semaphore常被用于多个线程需要共享有限资源的情况,比如办公室有5打印机,但是有20个员工需要使用,一台打印机同时只能被一个员工使用,其他员工排队等候,且只有该打印机被使用完毕并释放后其他员工方可使用,这时就可以通过Semaphore来实现:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemapDemo implements Runnable {
final Semaphore semp = new Semaphore(5);
@Override
public void run() {
try {
semp.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + " done!");
semp.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String []args){
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemapDemo demo = new SemapDemo();
for(int i=0;i<20;i++){
exec.submit(demo);
}
}
}
同时开启20个线程。观察这段程序的输出,你就会发现系统以5个线程一组为单位,依次输出带有线程ID的提示文本。
在Semaphore类中有以下几个比较重要的方法:
- public void acquire():以阻塞的方式获取一个许可,在有可用许可时返回该许可,在没有可用许可时阻塞等待,直到获得许可
- public void acquire(int permits):同时获取permits个许可
- public void release():释放某个许可。
- public void release(int permits):释放permits个许可。
- public boolean tryAcquire():以非阻塞方式获取一个许可,在有可用许可时获取该许可并返回true,否则返回false,不会等待。
- public boolean tryAcquire(long timeout, TimeUnit unit):如果在指定的时间内获取到可用许可,则返回true,否则返回false。
- public boolean tryAcquire(int permits):如果成功获取permits个许可,则返回true,否则立即返回false。
- public boolean tryAcquire(int permits, long timeout,TimeUnit unit):如果在指定的时间内成功获取permits个许可,则返回true,否则返回false。
- availablePermits():查询可用的许可数量。
CountDownLatch、CyclicBarrier、Semaphore的区别如下:
- CountDownLatch和CyclicBarrier都用于实现多线程之间的相互等待,但二者的关注点不同。CountDownLatch主要用于主线程等待其他子线程任务均执行完毕后再执行接下来的业务逻辑单元,而CyclicBarrier主要用于一组线程互相等待大家都达到某个状态后,再同时执行接下来的业务逻辑单元。此外,CountDownLatch是不可以重用的,而CyclicBarrier是可以重用的。
- Semaphore和Java中的锁功能类似,主要用于控制资源的并发访问。
四、volatile 关键字的作用
详细内容的学习可参考:volatile 底层关键字的作用
Java除了使用了synchronized保证变量的同步,还使用了稍弱的同步机制,即volatile变量。volatile也用于确保将变量的更新操作通知到其他线程。
volatile变量具备两种特性: 一种是保证该变量对所有线程可见,在一个线程修改了变量的值后,新的值对于其他线程是可以立即获取的;
一种是volatile禁止指令重排,即volatile变量不会被缓存在寄存器中或者对其他处理器不可见的地方,因此在读取volatile类型的变量时总会返回最新写入的值。
通过反编译会发现有volatile修饰的共享变量进行写操作的时候会多出lock前缀的指令,这个操作相当于一个内存屏障,使得重排序时不能把后面的指令重排序到内存屏障之前的位置。
因为在访问volatile变量时不会执行加锁操作,也就不会执行线程阻塞,因此volatile变量是一种比synchronized关键字更轻量级的同步机制。volatile主要适用于一个变量被多个线程共享,多个线程均可针对这个变量执行赋值或者读取的操作
在有多个线程对普通变量进行读写时,每个线程都首先需要将数据从内存中复制变量到CPU缓存中,如果计算机有多个CPU,则线程可能都在不同的CPU中被处理,这意味着每个线程都需要将同一个数据复制到不同的CPU Cache中,这样在每个线程都针对同一个变量的数据做了不同的处理后就可能存在数据不一致的情况。
这里的内存可以理解为主内存,CPU缓存可以理解为工作内存 Java 内存模型规定了所有的变量都存储在主内存中,每条线程都有自己的工作内存,线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量 的所有操作都必须在工作内存中进行,在修改后的某一个时刻(线程退出之前),自动把线程变量副本的值写回到主内存中。
如果将变量声明为volatile, JVM就能保证每次读取变量时都直接从内存中读取,跳过CPU Cache这一步,有效解决了多线程数据同步的问题。 原理:有volatile修饰的共享变量进行写操作的时候会多出Lock前缀的指令,该指令在多核处理器下会引发两件事情。
将当前处理器缓存行数据刷写到系统主内存。 这个刷写回主内存的操作会使其他CPU缓存的该共享变量内存地址的数据无效。
需要说明的是,volatile关键字可以严格保障变量的单次读、写操作的原子性,但并不能保证像i++这种操作的原子性,因为i++在本质上是读、写两次操作。
比如i++的操作,一个线程当getstatic指令把 i 的值读取到操作栈顶时,volatile 保证了此时的关键字是正确的,但是在执行加操作时,其他的线程可能已经把 i 的值加大了,导致现在操作的这个值可能就过期了。
volatile在某些场景下可以代替synchronized,但是volatile不能完全取代synchronized的位置,只有在一些特殊场景下才适合使用volatile。比如,必须同时满足下面两个条件才能保证并发环境的线程安全。
- 对变量的写操作不依赖于当前值(比如i++),或者说是单纯的变量赋值(boolean flag = true)。
- 该变量没有被包含在具有其他变量的不变式中,也就是说在不同的volatile变量之间不能互相依赖,只有在状态真正独立于程序内的其他内容时才能使用volatile。
volatile关键字的使用方法比较简单,直接在定义变量时加上volatile关键字即可:
volatile boolean flag = false;
|