什么是AbstractQueuedSynchronizer
AbstractQueuedSynchronizer是一个框架,一个基于FIFO队列用于实现阻塞锁和同步器的框架。
AbstractQueuedSynchronizer类设计为能支持大多数情况的同步器,其中通过一个原子性的value来表示锁的状态(state),对于实现者来说,只需要安全的去改变这个state(什么叫安全),然后定义state处于什么情况(什么值)下是获取(acquired),释放(released)。
基于此同步器需要在内部创建一个helper来继承于AbstractQueuedSynchronizer,然后在定义自己的一些同步方法,在同步方法中按照自己的逻辑去访问helper方法以达到同步目的。(范例)。
AbstractQueuedSynchronizer提供了两种模式,一种独占模式(exclusive),一种共享模式(shared),通过独占表示只有一个线程能占有,共享代表着允许多个线程同时持有。
实现同步器步骤
要实现AQS只需要按照自己的需要完成下面几个方法的实现就可以了:
- tryAcquir() 尝试获取独占锁
- tryRelease() 尝试释放独占锁
- tryAcquireShared() 尝试获取共享锁
- tryReleaseShared() 尝试释放共享锁
- isHeldExclusively() 是否持有独占锁
如果我们只需要其中一种方式,那么只需要实现对应的方法就行了,比如:
我们需要完成一个共享模式的同步器,那么只需要完成tryAcquireShared()和tryReleaseShared()方法的实现,当然我们也可以两种方式都需要比如说ReentrantReadWriteLock可重入的读写锁。
对于获取锁的场景会有两种:
- 直接返回是否能获取到锁,也就是我们实现的tryAcquir() 或者 tryAcquireShared()
- 当获取不到锁的时候希望能阻塞,一直到获取到锁为止,也就是AbstractQueuedSynchronizer提供的acquire() 或者 acquireShared()
第一种场景我们是自己实现的,而比较复杂的第二种AbstractQueuedSynchronizer已经帮我们完成了
安全的去改变state
AbstractQueuedSynchronizer中定义了getState(),setState(),compareAndSetState()三个方法来操作state,基于此在自己实现上述的五个方法,可以参考范例 中的tryAcquir(),tryRelease()等方法的实现。
范例
class Mutex implements Lock, java.io.Serializable {
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean isHeldExclusively() {
return getState() == 1;
}
public boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
AbstractQueuedSynchronizer设计
通过上面描述我们简单的感受了下AbstractQueuedSynchronizer是做什么的,以及我们该怎么做,那么接下来我们就将详细看看AbstractQueuedSynchronizer中是如何设计整个框架的。
第一步:给框架进行定位
AbstractQueuedSynchronizer的定位是一个同步器框架,是一个基于FIFO队列用于实现阻塞锁和同步器的框架。
基于此,我们就需要准备一些东西:
- 锁的资源or同步的资源
- 一个FIFO队列的实现
对于第一点,既然是锁既然是同步器,那么我们就必须要有实际的物体来锁定,而不能是虚无的,它可以是对象,可以是任意一个实实在在的东西。AbstractQueuedSynchronizer选择int类型来做为资源标识,也就是上述的state成员变量。
对于第二点,AbstractQueuedSynchronizer选择CLH队列的变种,一个节点持有前后对象的双向链表来作为阻塞队列。
根据上面的结构和资源,一个简单的流程就有了: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z7yzQitg-1649401519743)(https://note.youdao.com/yws/res/10256/WEBRESOURCEc35f366090e1f3d6e164e18aa8fb3ead)]
第二步:抽象需自定义的API
对于争夺来说,不同的场景会存在不一样的需求,对于ReentrantLock可重入锁同一个线程允许多次进入,但是在范例中实现的锁就只能进入一次。
因此,需要将争夺相关的操作抽象出来,让实现方去自定义,同理争夺进行了自定义,那么释放必然就存在则差别,同样需要进行自定义。
而锁存在则两种模式,一种独占一种共享,对于框架来说,是没办法直接明白本次争夺是独占还是共享,那么只能进行机械式的拆分为两个方法来表示这层含义。
也就诞生出了这五种需要实现方关注的方法:
- tryAcquir() 尝试获取独占锁
- tryRelease() 尝试释放独占锁
- tryAcquireShared() 尝试获取共享锁
- tryReleaseShared() 尝试释放共享锁
- isHeldExclusively() 是否持有独占锁
第三步:CLH变种队列的设计与管理
现在我们明确了资源,队列结构,争夺实现。接下来就需要完成队列的管理。
- 队列的结构?
- 队列成员属性有哪些?
- 节点成员有几种状态?
- 队列操作
队列结构
基于CLH变种队列,在AbstractQueuedSynchronizer中将持有队列的头对象与尾对象来帮助完成快速进入队列和出队列。
private transient volatile Node head;
private transient volatile Node tail;
对于头部head,只能通过setHead()方法进行修改,对于尾部tail只能通过enq()方法添加到队列中。
队列成员属性
通过队列结构我们知道队列中的成员类型都为Node类型。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
基于CLH变种的双向链表,必然存在前节点和后节点,对应的成员就是prev和next。 同时需要持有当前节点的线程来进行阻塞操作和唤醒操作,对应的成员就是thread。 通过nextWaiter成员来判断该节点的类型是共享还是独占
节点成员有几种状态
节点对应的操作通过waitStatus成员来进行表示
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
节点状态机
创建节点时
节点进入队列后,将前驱节点变更为SIGNAL
doReleaseShare进行释放
doReleaseShared进行释放
0
初始化
SIGNAL
PROPAGATE
共享模式队列操作
通过上面对队列结构的分析和节点node的描述,接下来就该看下队列的操作中是如何使用到队列结构与节点的。
共享模式-总体思路
原则:
- SIGNAL的节点需要对后续节点进行唤醒(PROPAGATE也会,单场景单一,只在替换哨兵节点的时候)
- CANCELLED的节点需要移除队列中
- 执行完毕的节点需要重置为0
- PROPAGATE的节点需要被转换成SIGNAL节点(涉及到共享锁的一个bug)
由第一个节点的waitStatus状态(SIGNAL)来判断是否进行后继节点的唤醒(出队),唤醒时机在释放(releaseShared)是进行。 对于CANCELLED节点,在获取(acquiredShared),释放(releaseShared)进行踢出队列。
第一种简单情形:
node3进入队列前:
node3进入队列后:
node1:SIGNAL
node2:SIGNAL
node3:0
第二种包含CANCELLED节点 node3进入队列前:
node1:SIGNAL
node2:CANCELLED
node3进入队列后:
共享模式-出入队
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 当队列初始化之后,头结点和尾节点不会为null,始终会保持存在头结点(哨兵节点),在共享模式下,当头结点获得之后,将让后继节点继续进行获取,并重置为head以达到共享的目的
addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared()
1.要么将后继节点唤醒,把自身状态重置为0 2.要么把自身状态修改为PROPAGATE,解决共享锁同时访问的并发问题
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
PROPAGATE状态产生的原因
起因是由于共享锁,同时进行释放导致有资源的情况下线程需要持续等待。 bug地址
在没有PROPAGATE的情况下 假设:我们有2个资源,同时呢有2个线程已经持有了,有2个线程等待获取。

当node1进行释放时,唤醒node3: 
node1释放,state+1。 node3获取,state-1。 这时候node3通过doAcquireShared获取资源为0(刚好够自己用,不需要通知后续节点),但是没有执行完将自己设置为头部。
当node3准备将自己设置为头部之前,node2进行释放:  尴尬的情况出现了,资源state为1,由于前面获取的资源为0,那么node3将自己设置为head之后不会对后续的节点进行唤醒(因为它认为资源是空,唤醒没有意义,但是在它变为head之前已经有其它持有资源的线程释放了资源)。
场景描述清楚了,那么解决方案就是新增一个PROPAGATE状态,将0变更为PROPAGATE状态表示在进行头部节点的替换过程中存在资源的释放,同时在setHeadAndPropagate()判断是否唤醒后续节点的条件加上,当head处于PROPAGATE状态时。
共享模式队列操作
跟共享差别不大,获取锁时不在有扩散行为(PROPAGATE),不再做分析。
ConditionObject条件队列
后续在聊
|