一、概述
在日常开发中,经常会遇到需要在主线程中开启多个线程去并发执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。在 CountDownLatch 出现之前都是使用线程的 join() 方法来实现这一点,但是 join() 方法不够灵活,不能满足不同场景的需要,所以出现了 CountDownLatch 这个类。
二、使用示例
public class AQS_CountDownLatch_Demo {
private static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " -- running");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});
executorService.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " -- running");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});
System.out.println("等待所有子线程执行完毕");
countDownLatch.await();
executorService.shutdown();
}
}
等待所有子线程执行完毕
pool-1-thread-2 -- running
pool-1-thread-1 -- running
三、实现原理探究
1. 查看类图:
由上图可知,CountDownLatch 是使用 AQS 实现的,内部有个计数器 count ,实际上把 count 的值赋给了 AQS 的 state ,查看构造可知:
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
2. 方法剖析
1)public void countDown()
线程调用该方法后,计数器的值将递减,如果计数器值为 0, 则唤醒所有因调用 await 方法而被阻塞的线程,否则什么都不做。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2)public void await() throws InterruptedException
当调用了 CountDownLatch 对象的 await 方法后,当前线程会被阻塞,直到触发下列情况之一: 1)当所有线程都调用了 CountDownLatch 的 countDown() ,也就是计数器的值为0; 2)其它线程调用了当前线程的 interrupt() 方法中断了当前线程,当前线程就会抛出 InterruptedException 异常,然后返回。 由下面代码可知,线程获取资源时是可以被中断的,并且获取的资源是共享资源。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
3) public boolean await(long timeout, TimeUnit unit)
当线程调用了 CountDownLatch 对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回: 1)当所有线程都调用了 CountDownLatch 对象的 countDown 方法,计数器的值为0时,这时候会返回 true; 2)设置的 timeout 时间到了,因超时而返回 false; 3)其它线程调用了当前线程的 interrupt() 方法中断了当前线程,当前线程就会抛出 InterruptedException 异常,然后返回。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
四、总结
CountDownLatch 相比使用 join() 来实现线程间同步,前者更有灵活性和方便性。CountDownLatch 是使用 AQS 实现的,使用AQS 的状态变量来存放计数器的值。首先在初始化 CountDownLatch 时设置状态值(计数器值),当多个线程调动 countDown() 时实际是原子性递减 AQS 的状态值。当调用 await() 后当前线程会被放入 AQS 的阻塞队列等待计数器为0 再返回。其它线程调用 countDown() 让计数器递减1,当计数器值变为0时,当前线程还要调用 AQS 的 doReleaseShared 来激活由于调用 await() 而被阻塞的线程。
|