引言
CyclicBarrier 就像就组长召唤大家开会一样,首先是预约一个时间,小组成员提前做好准备,待开会组长发言完,小组成员再做出响应;
测试示例
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println(Thread.currentThread().getName() + " 组长: 明天组团去旅游");
});
System.out.println("组长: 大家一会开个小会");
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 准备");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " OK");
} catch (Exception e) {
e.printStackTrace();
}
}, "t" + i).start();
}
}
}
输出结果: CyclicBarrier 可以在构造的时候就预设一个命令,待子线程做完前置处理后执行这个预设命令,最后再执行子线程的后置处理;
CyclicBarrier 源码结构
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final Runnable barrierCommand;
private Generation generation = new Generation();
}
CyclicBarrier 是直接使用 ReentrantLock 和 Condition 来实现,不像CountDownLatch 和 Semaphore 直接搞一个实现了 AbstractQueuedSynchronizer 同步器的 Sync 内部类;
await 等待构造器的预设命令执行
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
遍历链表唤醒每个子线程 ↓
总结
CyclicBarrier 会对非最后一个子线程的其他子线程维护成一个链表的形式,如果是最后一个子线程,则用该线程先去执行构造时预设的命令,再唤醒其他睡眠的线程,最后一个线程不会加入到链表; CyclicBarrier 和 CountDownLatch、Semaphore的不同点是 CyclicBarrier 使用到了Lock锁,CountDownLatch、Semaphore 实现是全程无锁的 相同点是都使用到了 AbstractQueuedSynchronizer 和 LockSupport来阻塞唤醒线程
|