CyclicBarrier详解
CyclicBarrier
字面意思回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用 CyclicBarrier的使用 重要方法
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, Bro kenBarrierException, TimeoutException
public void reset()
案例
@Slf4j
public class CyclicBarrierLOL {
private static final CyclicBarrier cbr = new CyclicBarrier(6);
private static int aa ;
private void ChooseHero(String route) {
new Thread(()->{
log.info(route + "选择好了英雄,等待进入游戏");
try {
cbr.await();
} catch (InterruptedException e) {
e.printStackTrace();
log.info(route+"已被中断,无法进行游戏");
} catch (BrokenBarrierException e) {
e.printStackTrace();
log.info(route+"等待超时,无法进行游戏");
}
}).start();
}
public static void main(String[] args) throws Exception {
CyclicBarrierLOL lol = new CyclicBarrierLOL();
String[] rotes = {"上单","打野","中单","ADC","辅助"};
while (true){
log.info("进入英雄选择界面,等待英雄选择");
for (int i = 0; i < 5; i++) {
String rote = rotes[i];
lol.ChooseHero(rote);
}
cbr.await();
log.info("英雄选择完成,开始进入游戏");
aa++;
if (aa == 2) return ;
}
}
}
源码分析
CyclicBarrier的源码分析需要有ReentrantLock源码的基础。 CyclicBarrier的构造方法,第一个参数:资源数 第二个参数:执行任务 当资源数为0的时候调用这个任务 这个资源数可以理解为Semaphore和CountDownLatch的资源数,只不过Semaphore中持有资源数的线程才能通过,CountDownLatch中资源数不为0的时候线程阻塞。 在CyclicBarrier中定义的parties和count都等于资源数。其中this.parties用于线程重置,this.count 用于计数器控制线程是否阻塞 阻塞部分 当调用await()方法的时候进入阻塞路程
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
阻塞流程一个重点就是阻塞进入队列对应着就是trip.await(); 在看这部分源码之前先来看下AQS对于条件阻塞队列的实现 条件阻塞队列是一个单向链表,如何创造这个链表就在addConditionWaiter() 方法当中
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
总结就是addConditionWaiter(); 方法如果没有链表创建链表完成入队,如果已经创建了链表就直接入队 在回过头来看trip.await();
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
来看下解锁方法fullyRelease(node)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
总结下来就是 trip.await(); 作用就是创建链表入队,释放锁 阻塞入队线程。
dowait(boolean timed, long nanos) 中当CyclicBarrier的资源数不为0的时候,进行上述操作阻塞当前线程然后进入条件等待队列,然后释放锁,进行下一个线程的操作。 当资源数等于0的时候就执行了这部分代码,重点是nextGeneration(); 这个是比较精髓的点
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
doSignalAll 方法就是单向链表出队,进入同步等待队列 再来看进入同步队列的方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
当nextGeneration() 执行完成后,return 0 最后还会执行dowait(boolean timed, long nanos) finally中的lock.unlock();方法。这个方法就是唤醒同步等待队列的方法。这部分逻辑就是ReentrantLock的解锁逻辑。
CyclicBarrier源码流程总结
CyclicBarrier cbr = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("begin run");
}
});
一上面构造出的CyclicBarrier为例子,此时的资源数是3 当调用cbr.await(); 的时候底层调用dowait(false, 0L) 方法。 dowait(false, 0L) 的执行流程: 一开始就加了ReentrantLock的锁,资源数(一开始是3)减1,判断资源数是不是等于0,如果不等于0通过自旋调用 trip.await(); 其中trip是ReentrantLock的条件锁, trip.await(); 的作用:
- 底层调用
addConditionWaiter() 完成创建单向链表并且进入该队列 - 调用
fullyRelease(node) 完成释放锁 - 调用
LockSupport.park(this); 阻塞线程 此时线程就是阻塞在这个地方 - 当线程被唤醒的时候调用
acquireQueued(node, savedState) 通过CAS获取ReentrantLock的锁
当资源数判断为0时,执行构造方法中的Runnable任务。然后条用nextGeneration(); nextGeneration(); 底层调用trip.signalAll(); 内部调用了doSignalAll 。doSignalAll 作用是条件等待队列的线程出队并且进入同步等待队列中。 nextGeneration(); 执行完成后 执行 dowait(false, 0L) 的finally里面的方法 lock.unlock(); 此时释放ReentrantLock的锁。 释放锁自然会唤醒ReentrantLock同步等待队列中阻塞的线程。 到此CyclicBarrier完成一轮的阻塞-唤醒流程
|