一、AQS原理
1、AQS是什么?
它是用来构建锁或者其他同步组件的基础框架。
从它的名拆开来看:
-
Abstract:因为它不知道怎么上锁,所以采用模板方法 暴露出上锁的逻辑。
-
Queue:阻塞队列
- 通过内置的FIFO队列来完成线程的排队工作;即抢不到锁的线程来这排队。
-
Synchronizer:同步
- 使用一个int成员变量state表示同步状态;采用CAS完成多线程抢锁的逻辑。
子类只需要实现自己的获取锁逻辑和释放锁逻辑即可,至于排队阻塞等待、唤醒机制均由AQS完成。
和真正的锁不同的是:AQS同步器是面向锁的实现者,而锁则是面向使用者。
2、AQS的核心:CLH队列
AQS里面的CLH队列是CLH同步锁的一种变形。AQS依赖内部的这个同步队列(FIFO)来完成同步状态的管理。
AQS的核心就是用CAS去操作这个同步队列的head和tail,也就是用CAS操作代替了锁整条双向链表的操作,所以它效率高。
-
1> 当线程获取同步状态失败时,AQS会将< 当前线程、等待状态等信息>构建成一个Node 节点通过addWaiter() 方法加入到阻塞队列的尾部,同时会阻塞当前线程。
- Node节点的内容:获取同步状态失败的线程引用、等待状态、前驱和后继节点等。
-
AQS的addWaiter() 方法是基于CAS的设置尾节点,其中:
-
compareAndSetTail(Node expext, Node update),保证节点入队的线程安全性。 -
最后会在enq(fianl Node node)方法中,通过“死循环”来保证节点的正确添加。 -
2> 当同步状态释放时,会把首节点的第一个活跃的后继节点线程唤醒,使其再次尝试获取同步状态。
- 唤醒操作只需要将首节点设置成为原首节点的后续节点,断开原节点的next引用即可。
3、以独占锁同步状态的获取和释放为例:
- 在使用
tryAcquire() 方法获取同步状态时,AQS维护了一个同步阻塞队列,获取同步状态失败的线程都会通过CAS被加入到队列中并在队列中进行自旋。 - 线程移出队列(停止自旋)的前提是前驱节点是头节点且成功获取了同步状态。
- 在释放同步状态时,AQS调用
tryRelease(int arg) 方法释放同步状态,然后唤醒头节点的后继节点。 - 注意:维护同步队列的FIFO原则,只有前驱节点是头结点的节点才能获取同步状态。
4、从具体的代码实现来看
此处做一个总述,具体细节见后面的源码分析
1)互斥锁
<1> acquire() 获取互斥锁
<2> release() 释放互斥锁
- release()方法中有一个模板方法
tryRelease() ,用于给子类实现调用,判断是否释放锁成功,如果失败直接返回FALSE,否者:
- 调用
unparkSuccessor() 方法唤醒head节点的有效后继节点,即:从head节点开始往后找到第一个有效后继节点,唤醒。
- 不过,找后继节点时,如果head的第一个后继节点是个无效节点,需要从
tail 节点开始往前遍历找到第一有效的后继节点。 - 因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。
2)共享锁
<3> acquireShared() 获取共享锁
<4> releaseShared() 释放共享锁
- releaseShared()方法中有一个模板方法
tryReleaseShared() ,用于给子类实现调用,判断是否释放锁成功,如果失败直接返回FALSE,否者:
- 调用
doReleaseShared() 方法唤醒head节点的有效后继节点,即:从head节点开始往后找到第一个有效后继节点,唤醒。
- 不过,找后继节点时,如果head的第一个后继节点是个无效节点,需要从
tail 节点开始往前遍历找到第一有效的后继节点。 - 因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。
5、面试题2:AQS中线程Node节点的PROPAGATE状态是什么意思?
在共享锁中,线程调用doReleaseShared() 方法执行唤醒时,方法的代码段不是互斥的,所以为了保证线程的正确唤醒,也就有了这么一个中间状态。
以一个场景为例:Semaphore有两个许可,A线程和B线程都获取到了许可,C线程和D线程在等着。
-
假如A和B同时执行、释放锁,A在doReleaseShared() 中CAS成功,将head节点B状态设置为0,并通过unparkSuccessor() 方法去唤醒C。 -
C被唤醒之后,它去抢锁,并且抢到锁了,在doAcquireShared() 中有可能执行到了setHeadAndPropagate() 方法去设置头结点为自己 ,也有可能没执行到 ,即可能将head节点从B更新到了C,可能还是B。这时我们再去看线程B释放锁 的情况。 -
此时线程B他获取到的head节点就有两种情况 : -
-
head还是B
- 表明线程A释放了一个共享锁唤醒了线程C,且线程C还没有将head节点更新到C上,即此时的head节点还是B,并且head节点的线程状态是0;
- B线程通过
doReleaseShared() 又释放了一个共享锁,释放锁的过程中发现head节点B的状态是0,这时就会将head节点B的状态设置为PROPAGATE状态。
如果没有设置为PROPAGATE状态这一步,在线程B调用doReleaseShared() 方法释放锁的过程,直接就h == head break退出循环了。即D不会由B唤醒,
- 此时给人的感觉就是:原本应该有两个信号量,这时只有一个了。
-
head是C就正常走了。
二、AQS源码分析
1、acquire()获取互斥锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1> addWaiter()方法
获取互斥锁失败后,首先调用addWaiter() 方法,通过CAS把当前线程节点封装成Node 阻塞节点,放到AQS的阻塞队列中;
addWaiter()方法中基于CAS的设置尾节点,将阻塞线程节点放入阻塞队列;
保证节点入队的线程安全性;
并采用全路径 + 优化前置 (把enq()方法中for循环里的一些判断前置到addWaiter()方法)的技巧,实现快速入队。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1)面试题3:为什么CAS可以保证线程安全?
-
正常将一个节点添加到队列尾部需要三个操作:
- 将tail节点的next引用指向newNode节点;
- 将newNode节点的prev引用指向tail节点;
- 将tail节点指针指向newNode节点。
-
那么CAS如何保证三个操作的原子性? -
addWaiter()中的逻辑为:
-
先复制引用,在线程栈中引用tail节点; -
将newNode节点的prev指向tail节点; -
CAS将tail指向newNode节点; -
修改旧tail节点的next引用到newNode上。
-
这时,如果另外一个线程在第三步CAS失败,会for循环继续重试。直到CAS成功才会返回。
2)面试题4:是否存在通过head引用遍历不到最后一个节点的情况?
存在,因为:
在addWaiter()方法将newNode节点设置为新tail节点之后,才会将旧tail节点的next引用指向newNode。
- 如果在设置旧tail.next = newNode之前,我们通过head引用遍历,这一瞬间是遍历不到最后一个节点的。
如何获取到最新的节点?
2> acquireQueued()方法
获取互斥锁失败后,其次调用acquireQueued() 方法尝试再获取一次锁,进而决定是否将当前线程进行阻塞。
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1)面试题5:为什么添加到阻塞队列后,要先尝试获取锁?
因为如果添加到阻塞队列后,当前锁状态是无锁的话,就不会有活跃线程获取到锁。
1>> shouldParkAfterFailedAcquire()方法
如果前驱节点不是头结点 或 抢锁失败,需要先调用shouldParkAfterFailedAcquire() 方法判断线程是否应该被阻塞?
- 也就是找到一个可靠的(活着有效的)节点,然后将当前线程节点作为其后继节点即可。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2>> parkAndCheckInterrupt()方法
如果前驱节点不是头结点 或 抢锁失败,并且可以阻塞当前线程,则调用parkAndCheckInterrupt() 方法以响应式中断的方法阻塞当前线程。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2、release()释放互斥锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null
&& h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
1> unparkSuccessor()方法
该方法用于释放node节点后的有效后继节点,即:从node节点开始往后找到有效后继节点,唤醒。
找后继节点时,如果后继节点是个无效节点,需要从tail开始往前遍历找到第一有效的后继节点。
因为因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null
|| s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3、acquireShared()获取共享锁
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
1> doAcquireShared()方法
获取共享锁失败后,首先调用doAcquireShared() 方法,阻塞当前线程节点;其中包括:
- 和
tryAcquire() 方法一样的调用addWaiter() 方法,通过CAS把当前线程节点封装成Node 阻塞节点,放到AQS的阻塞队列中;只是这里Node节点的类型是SHARED。 - 和
tryAcquire() 方法类似,尝试获取共享锁; - 继续往下走,如果线程获取锁失败,则采用响应中断的方式使用
LockSupport.park(this) 方法阻塞当前线程。 - 最后,如果在尝试获取锁的过程中失败,则调用
cancelAcquire() 方法取消当前线程尝试获取共享锁的操作。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1>> setHeadAndPropagate()方法
获取共享锁成功后,会调用setHeadAndPropagate() 方法将当前节点更新为head节点,并唤醒后继节点;
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0
|| h == null
|| h.waitStatus < 0
|| (h = head) == null
|| h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
在调用setHeadAndPropagate() 方法将当前节点更新为head节点,并唤醒后继节点时,如果当前节点没有后继节点了,则调用doReleaseShared() 方法释放共享锁;
doReleaseShared() 方法在下面的releaseShared() 方法中解析
4、 releaseShared()释放共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
1> doReleaseShared()方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
5、 acquireInterruptibly()以响应中断的方式获取互斥锁
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
总结
子类只需要实现自己的获取锁逻辑和释放锁逻辑即可,至于排队阻塞等待、唤醒机制均由AQS完成。
|