一、概述
1、作用?
允许一组线程互相等待,直到到达某个公共屏障(barrier)点。
- 因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
2、使用场景?
用于多线程计算数据,最后合并计算结果的场景。
3、常用类方法?
- await():告诉CyclicBarrier,线程已经到达了屏障,计数减一;然后阻塞线程,直到count为0。
- reset():重置CyclicBarrier的未加入到party的数量count和当前代Generation。
4、案例
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
new Thread(() -> {
System.out.println("work thread start + 1");
try {
TimeUnit.SECONDS.sleep(1);
cyclicBarrier.await();
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("main thread OK, work thread go on!");
} catch (Exception e) {
e.printStackTrace();
}
}, "work-thread").start();
cyclicBarrier.await();
System.out.println("main end");
}
}
二、原理
1)CyclicBarrier中一个generation代表了每一代,通过这个实现CyclicBarrier的复用。
- parties变量用来表示参与party的线程数;
- count变量代表了还没到party的线程数;
- 外加个ReentrantLock锁和一个Condition条件变量实现线程的并发和阻塞。
2)在CyclicBarrier类的内部有一个计数器count,每个线程在到达屏障点的时候都会调用await() 方法将自己阻塞排队,并将计数器count减1; 3)当计数器count减为0的时候,所有因调用await() 方法而被阻塞的线程 将被唤醒 。 4)线程的排队 进入party通过ReentrantLock实现 ;进入party后睡眠 等待所有参会者通过锁的条件等待Condition实现 。
三、源码解析
1、成员变量和构造器
- CyclicBarrier内部是通过条件队列
trip 对线程进行阻塞; - 两个int型的变量
parties 和count :
parties 表示每次拦截的线程数,即party的所有参会者 ;count 表示还未拦截的线程数,即还有多少参会者没到party ;它的初始值和parties相同,调用await() 方法减1,减为0时将所有阻塞在条件变量上线程唤醒。
- 静态内部类
Generation ,代表栅栏的当前代 ,就像玩游戏时代表的本局游戏,利用它可以实现循环等待 。 barrierCommand ,表示换代前执行的任务。当前代结束会执行该任务,然后自动开启下一代 。
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
}
2、await()方法
CyclicBarrier的核心方法是await() ,该方法是线程相互等待的关键,它有两种实现:一种是带等待超时的,一种是不带等待超时,本质上都是调用了同一个方法dowait() ,只是带等待超时的多传了一个时间。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
CyclicBarrier#dowait()方法:
- 首先因为await()是同步的,需要先加互斥锁ReentrantLock;
- 每次进来都将count减1,减完立马进行判断看看count是否等于0:
- 如果等于0,则执行
换代前要执行的任务barrierCommand ,然后唤醒所有阻塞等待的线程 ,接着自动进入 CyclicBarrier的下一代 ;将计数器count 的值重新设为parties。如果barrierCommand运行异常 ,则打破栅栏的当前代 ,唤醒所有阻塞等待的线程。 - count不等于0,这进入for循环:
不是超时等待 ,直接调用Condition.await()阻塞 当前线程。是超时等待 ,就在nanos时间内循环竞争锁;
- 如果当前线程在
await() 获取锁过程中被中断了:
- 在当前代还没结束之前打破栅栏,即游戏在中途被打断,则设置generation的broken状态为true并唤醒所有线程。
- 当前代已经结束,则直接中断当前线程。
线程被唤醒后 进行下面三个判断:
- 如果线程因为
broken generation 操作(即调用breakBarrier()方法)而被唤醒则抛出异常 ; 2.如果线程因为CyclicBarrier正常换代被唤醒 ,则返回计数器count的值 ; - 如果线程因为
超时而被唤醒打破栅栏并抛出TimeOut异常 。 注意 :如果其中有一个线程因为等待超时而退出 ,那么整盘游戏也会结束,其他线程都会被唤醒 。
- 最后解锁。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
3、breakBarrier()方法:
意味着有人搞破坏,游戏中途结束,将所有的等待线程全部唤醒。
- await()方法通过抛出BrokenBarrierException 异常返回;
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
4、nextGeneration()方法:
开启栅栏新的一代。
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
5、reset()方法:
重置一个栅栏 :
- 打破栅栏 中断当前代,await()方法通过抛出BrokenBarrierException 异常返回;
- 开始新的下一代,重置count和generation。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier();
nextGeneration();
} finally {
lock.unlock();
}
}
若barrierCommand正常完成 ,则不需要手动调用reset() 就可自动进入新的一代 ,因为运行barrierCommand之后调用了nextGeneration()。
四、总结
简单说就是一个ReentrantLock 加上一个Condition条件变量 实现并发控制和多个线程的阻塞等待。 并且采用多线程协作 机制,在多个线程协作过程中,只要有一个线程被中断或者发生异常 ,则整个协作过程取消 。
CyclicBarrier和CountDownLatch相同的是,它们都能让多个线程协调在某一个节点上等待;下面我们看一下他们的区别?
2、CyclicBarrier和CountDownLatch的区别?
CountDownLatch 的计数器只能使用一次;CyclicBarrier 的计数器可以使用reset()方法重置进而循环使用 。CyclicBarrier 是多线程协作 ,当线程达到等待数量或者一个线程出现异常或被中断时自动放行 ;而CountDownLatch 是多线程阻塞后,需要等待外界条件达到某种状态才会被统一唤醒 。所以CyclicBarrier中只需要await() ,CountDownLatch 还需要额外的countDown()唤醒 操作。- CountDownLatch的性能大于CyclicBarrier,因为
CountDownLatch 是自己采用CAS ,利用共享锁 的原理实现;CyclicBarrier 是采用ReentrantLock独占锁 +Condition条件变量 实现; - 应用场景不同,
CyclicBarrie r能处理更为复杂的业务场景 。
|