1、简介
- AQS 的全称是 AbstractQueuedSynchronizer,它的定位是为 JAVA 种几乎所有的锁和同步器提供一个基础框架
- AQS 是基于 FIFO 队列实现的,并且内部维护了一个状态变量 state,通过原子更新这个状态变量 state 即可实现加锁解锁操作。
主要内部类 Node
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
双向链表结构,节点中保存着当前线程、前一个节点、后一个节点以及线程的状态等信息。
主要属性
private transient volatile Node head;
private transient volatile Node tail;
注意:这几个变量都要使用 volatile 关键字来修饰,因为是在多线程环境下操作的,要保证它们的值修改后其他线程立即可见。
这几个变量的修改都是直接使用 Unsafe 这个类来操作的:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
父类属性
AQS 还用到了其父类的 AbstractOwnableSynchronizer 的一些属性:
private transient Thread exclusiveOwnerThread;
2、AQS 成员方法解析
2.1、lock() 方法加锁解析
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
...
}
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
...
}
2.2、acquire(long arg) 令当前线程竞争资源的方法
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.3、tryAcquire(int acquires) 尝试获取锁的方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2.4、hasQueuedPredecessors() 判断当前队列中是否有等待者线程
private boolean hasQueuePredecessor(){
Node h = head;
Node t = tail;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
2.5、addWaiter(Node mode) 方法:将当前线程加入到阻塞队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2.6、enq(final Node node) 方法:当前线程对应的节点完整入队的方法(自旋入队)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.7、acquireQueued(final Node node, long arg) 方法:竞争资源失败后需要做什么?以及挂起被唤醒后需要做的事情(自旋竞争锁资源)
accquireQueued 需要做什么呢?
- 当前节点如果没有被 park 挂起,则 ===> 挂起当前线程
- 线程唤醒后 ===> 需要做一些线程唤醒之后的逻辑
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.8、shouldParkAfterFailedAcquire(Node pred, Node node) 方法:判断当前线程资源竞争失败后需不需要被挂起?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
总结:
- 如果当前节点的前置节点是 CANCLED 取消状态,则:
- 第一次自旋来到这个方法的时候,会越过取消状态的节点。
- 第二次返回 true,然后 park 当前线程
- 如果当前节点的前置节点是 0 默认状态,则:
- 当前线程节点会设置前置节点的状态为 -1(SIGNAL)
- 第二次自旋来到这个方法的时候,会返回 true,然后 park 挂起当前线程
2.9、parkAndCheckInterrupt() 挂起当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
|