下面让我们步入正轨,谈谈如何获取锁以及释放锁。
- 独占式(排他式)获取同步状态:acquire(), addWaiter(), enq(), acquireQueued();
- 独占式释放同步状态:release();
- 共享式获取同步状态:acquireShared(), doAcquireShared(), setHeadAndPropagate();
- 共享式释放同步状态:releaseShared(), doReleaseShared();
这个图总结了 AQS 整体架构的组成,和部分场景的动态流向,图中两个点说明一下,方便大家观看。
- AQS 中队列只有两个:同步队列 + 条件队列,底层数据结构两者都是链表(在上一篇我们已经介绍过了);
- 图中有四种颜色的线代表四种不同的场景,1、2、3 序号代表看的顺序。
AQS 本身就是一套锁的框架,它定义了获得锁和释放锁的代码结构,所以如果要新建锁,只要继承 AQS,并实现相应方法即可。
一、获取锁
1.1 acquire排他锁
获取锁最直观的感受就是使用 Lock.lock () 方法来获得锁,最终目的是想让线程获得对资源的访问权。
Lock 一般是 AQS 的子类,lock 方法根据情况一般会选择调用 AQS 的 acquire 或 tryAcquire 方法。
acquire 方法 AQS 已经实现了,tryAcquire 方法是等待子类去实现。 acquire 方法制定了获取锁的框架,先尝试使用 tryAcquire 方法获取锁,获取不到时,再入同步队列中等待锁。tryAcquire 方法 AQS 中直接抛出一个异常,表明需要子类去实现,子类可以根据同步器的 state 状态来决定是否能够获得锁,接下来我们详细看下 acquire 的源码解析。
acquire 也分两种,一种是排它锁,一种是共享锁,我们一一来看下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
以上代码的主要步骤是(流程见整体架构图中红色场景):
- 尝试执行一次 tryAcquire,如果成功直接返回,失败走 2;
- 线程尝试进入同步队列,首先调用 addWaiter 方法,把当前线程放到同步队列的队尾;
- 接着调用 acquireQueued 方法,两个作用,第一个作用是阻塞当前节点,第二个作用是节点被唤醒时,使其能够获得锁;
- 如果 2、3 失败了,打断线程。
1.1.2 addWaiter
接下来我们先来看下 addWaiter 的源码实现:
方法的主要目的:node追加到同步队列的队尾;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
总结:就是把新的节点追加到同步队列的队尾。
1.1.3 acquireQueued
下一步就是要阻塞当前线程了,是 acquireQueued 方法来实现的,即队列中的节点什么时候阻塞,什么时候唤醒由 acquireQueued 去决定,我们来看下源码实现:
这个方法主要做了两件事:
- 通过不断的自旋尝试使自己的前一个节点的状态变成signal状态,然后阻塞自己;
- 获得锁的线程执行完毕之后,释放锁时,会把阻塞的 node 唤醒,node 唤醒之后再次自旋,尝试获得锁;
- 返回 false 表示获得锁成功,返回 true 则表示失败
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.1.4 shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire,这个方法的主要目的就是把前一个节点的状态置为 SIGNAL,只要前一个节点的状态是 SIGNAL,当前节点就可以阻塞了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
acquire 整个过程非常长,代码也非常多,但注释很清楚,可以一行一行仔细看看代码。
1.1.5 总结
acquire 方法大致分为三步:
- 使用 tryAcquire 方法尝试获得锁,获得锁直接返回,获取不到锁的走 2;
- 把当前线程组装成节点(Node),追加到同步队列的尾部(addWaiter);
- 自旋,使同步队列中当前节点的前置节点状态为 signal 后,然后阻塞自己。
1.2 acquireShared 获取共享锁
acquireShared 整体流程和 acquire 相同,代码也很相似,重复的源码就不贴了,我们就贴出来不一样的代码来,也方便大家进行比较:
-
第一处尝试获得锁的地方,有所不同,排它锁使用的是 tryAcquire 方法,共享锁使用的是 tryAcquireShared 方法,如下图: -
第二处不同,在于节点获得排它锁时,仅仅把自己设置为同步队列的头节点即可(setHead 方法),但如果是共享锁的话,还会去唤醒自己的后续节点,一起来获得该锁(setHeadAndPropagate 方法),不同之处如下(左边排它锁,右边共享锁):
1.2.1 setHeadAndPropagate
这个方法主要做了两件事:
- 把当前节点设置成头节点
- 看看后续节点有无正在等待,并且也是共享模式的,有的话唤醒这些节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
这个就是共享锁独特的地方,当一个线程获得锁后,它就会去唤醒排在它后面的其它节点,让其它节点也能够获得锁。
二、释放锁
释放锁的触发时机就是我们常用的 Lock.unLock () 方法,目的就是让线程释放对资源的访问权(流程见整体架构图蓝色路线)。 释放锁也是分为两类,一类是排它锁的释放,一类是共享锁的释放,我们分别来看下。
2.1 释放排它锁
排它锁的释放就比较简单了,从队头开始,找它的下一个节点,如果下一个节点是空的,就会从尾开始,一直找到状态不是取消的节点,然后释放该节点,源码如下:
2.1.1 release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2.1.2 unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2.2 释放共享锁
2.2.1 releaseShared
释放共享锁的方法是 releaseShared,主要分成两步:
- tryReleaseShared 尝试释放当前共享锁,失败返回 false,成功走 2;
- 唤醒当前节点的后续阻塞节点,这个方法我们之前看过了,线程在获得共享锁的时候,就会去唤醒其后面的节点,方法名称为:doReleaseShared。
我们一起来看下 releaseShared 的源码:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
三. 条件队列的一些重要方法
3.1 为什么需要条件队列
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对 象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。 当调用Condition的await()方法后,当前线程会释放锁并在此等待。而其他线程调用Condition对象的 signal()方法通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。 因为并不是所有场景一个同步队列就可以搞定的。
- 在遇到锁 + 队列结合的场景时,就需要 Lock + Condition 配合才行,先使用 Lock 来决定哪些线程可以获得锁,哪些线程需要到同步队列里面排队阻塞;
- 获得锁的多个线程在碰到队列满或者空的时候,可以使用 Condition 来管理这些线程,让这些线程阻塞等待,然后在合适的时机后,被正常唤醒。
同步队列 + 条件队列联手使用的场景,最多被使用到锁 + 队列的场景中。所以说条件队列也是不可或缺的一环。
接下来我们来看一下条件队列一些比较重要的方法,以下方法都在 ConditionObject 内部类中。
3.2 入队列等待 await
获得锁的线程,如果在碰到队列满或空的时候,就会阻塞住,这个阻塞就是用条件队列实现的,这个动作我们叫做入条件队列,方法名称为 await,流程见整体架构图中绿色箭头流向,我们一起来看下 await 的源码:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await 方法有几点需要特别注意:
- fullyRelease(node);,节点在准备进入条件队列之前,一定会先释放当前持有的锁,不然自己进去条件队列了,其余的线程都无法获得锁了;
- (acquireQueued(node, savedState) && interruptMode != THROW_IE),此时节点是被 Condition.signal 或者 signalAll 方法唤醒的,此时节点已经成功的被转移到同步队列中去了(整体架构图中蓝色流程),所以可以直接执行 acquireQueued 方法;
- Node 在条件队列中的命名,源码喜欢用 Waiter 来命名,所以我们在条件队列中看到 Waiter,其实就是 Node。
await 方法中有两个重要方法:addConditionWaiter 和 unlinkCancelledWaiters,我们一一看下。
3.3 addConditionWaiter
addConditionWaiter 方法主要是把节点放到条件队列中,方法源码如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
整体过程比较简单,就是追加到队列的尾部
3.4 unlinkCancelledWaiters
其中还有个重要方法叫做 unlinkCancelledWaiters,这个方法会删除掉条件队列中状态不是 CONDITION 的所有节点,我们来看下 unlinkCancelledWaiters 方法的源码,如下:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
3.5 单个唤醒 signal
signal 方法是唤醒的意思,比如之前队列满了,有了一些线程因为 take 操作而被阻塞进条件队列中,突然队列中的元素被线程 A 消费了,线程 A 就会调用 signal 方法,唤醒之前阻塞的线程,会从条件队列的头节点开始唤醒(流程见整体架构图中黄色部分),源码如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
我们来看下最关键的方法:transferForSignal。
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
整个源码下来,我们可以看到,唤醒条件队列中的节点,实际上就是把条件队列中的节点转移到同步队列中,并把其前置节点状态置为 SIGNAL。
3.6 全部唤醒 signalAll
signalAll 的作用是唤醒条件队列中的全部节点,源码如下:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
从源码中可以看出,其本质就是 for 循环调用 transferForSignal 方法,将条件队列中的节点循环转移到同步队列中去。
四、总结
这个图应该看懂了吧,伙计们。2022新年快乐。
|