前言
本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见100个问题搞定Java并发
正文
AQS 的设计初衷
Doug Lea 曾经介绍过 AQS 的设计初衷。
Doug Lea,这个不用多说,大家可以搜一搜 JDK 的源码,JDK5 的并发包就是他开发的,真 Java 大神。
从原理上,一种同步结构往往是可以利用其他的结构实现的,但是,对某种同步结构的倾向,会导致复杂、晦涩的实现逻辑,所以,他选择了将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 为我们构建同步结构提供了范本。
AQS 源码(JDK8)
4 大核心
state
一个 volatile 的整数成员表征状态,同时提供了 setState 和 getState 方法
private volatile int state;
队列
一个先入先出(FIFO)的等待线程队列,以实现多线程间竞争和等待,这是 AQS 机制的核心之一。
private transient volatile Node head;
private transient volatile Node tail;
从源码可以看出来,AQS 通过双链表来实现 FIFO 队列
CAS
各种基于 CAS 的基础操作方法
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
从源码中可以看出来,CAS 操作的对象是 state 或 双链表Node节点。
acquire/release
各种期望具体同步结构去实现的 acquire/release 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
利用 AQS 实现一个同步结构,至少要实现两个基本类型的方法,分别是 acquire 操作,获取资源的独占权;还有就是 release 操作,释放对某个资源的独占。
ReentrantLock
以 ReentrantLock 为例,它内部通过扩展 AQS 实现了 Sync 类型,以 AQS 的 state 来反映锁的持有情况。
abstract static class Sync extends AbstractQueuedSynchronizer {...}
下面是 ReentrantLock 对应 acquire 和 release 操作。
lock
public void lock() {
sync.lock();
}
ReentrantLock 的 lock 内部调用 Sync 的 lock方法
abstract void lock();
Sync 是个抽象类,lock 分别由它的两个子类 公平锁:FaireSync 和 非公平锁:NonFairSync 实现
FaireSync.lock
final void lock() {
acquire(1);
}
NonFairSync.lock
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
在 ReentrantLock 中,tryAcquire 逻辑实现在 NonfairSync 和 FairSync 中,分别提供了进一步的非公平或公平性方法,而 AQS 内部 tryAcquire 仅仅是个接近未实现的方法(直接抛异常),这是留给实现者自己定义的
FaireSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
NonFaireSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平的 tryAcquire 内部实现了如何配合状态与 CAS 获取锁,注意,对比公平版本的 tryAcquire,它在锁无人占有时,并不检查是否有其他等待者,这里体现了非公平的语义。
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果前面的 tryAcquire 失败,代表着锁争抢失败,进入排队竞争阶段。 这里就是我们所说的,利用 FIFO 队列,实现线程间对锁的竞争的部分,算是是 AQS 的核心逻辑。 当前线程会被包装成为一个排他模式的节点(EXCLUSIVE),通过 addWaiter 方法 添加到队列中。 acquireQueued 的逻辑,简要来说,就是如果当前节点的前面是头节点,则试图获 取锁,一切顺利则成为新的头节点; 否则,有必要则等待。
到这里线程试图获取锁的过程基本展现出来了,tryAcquire 是按照特定场景需要开发者去实现的部分,而线程间竞争则是 AQS 通过 Waiter 队列与 acquireQueued 提供的,在 release 方法中,同样会对队列进行对应操作。
unlock
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
|