AQS是AbstractQueuedSynchronizer的简称,通过一个原子变量+双向队列来实现多线程安全。
核心思想: 若资源state空闲则当前线程设置为拥有线程,并将资源设置为占用。 若资源state被占用,则请求线程进入一个FIFO同步队列等待唤醒机制保证资源分配。 参考资料 https://www.cnblogs.com/truestoriesavici01/p/13213978.html https://blog.csdn.net/sifanchao/article/details/84343848
原子变量state
state的定义:private volatile int state; 使用Unsafe+CAS更新state.
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
同步双向队列
当资源state被占用时,其他竞争线程会进入同步队列。等待资源被释放后唤醒如下图。
AQS结点结构
AQS同步队列是一个FIFO的双向有序队列,头结点占用资源。AQS有个静态内部类Node,Node属性如下:
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
结点状态
AQS每个Node都有不用状态。在AQS获取锁和释放锁都用到这个状态。
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
static final int INIT = 0;
独占锁获取
ReentrantLock 对象创建,有带参数和不带参数的实现。默认不带参是非公平锁。具体看ReentrantLock源码
@Test
public void testAqs() {
ReentrantLock lock = new ReentrantLock();
lock.lock();
System.out.println("");
lock.unlock();
}
public void lock() {
sync.lock();
}
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final void lock() {
acquire(1);
}
公平锁:直接调用acquire获取同步锁。 非公平锁: 先调用CAS尝试同步状态,如果成功就把当前线程设置为独占线程。失败在调用acquire获取同步锁 区别:非公平锁在调用acquire之前,会尝试先获取一次锁。acquire方法公平锁和非公平锁共用代码在AQS实现。
acquire方法
公平锁和非公平锁都会调用到acquire方法尝试获取同步状态。详细见AbstractQueuedSynchronizer#acquire。实现逻辑:首先尝试获取同步资源,获取失败加入队列等待唤醒。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁和非公平锁实现区别:公平锁当同步状态空闲的时会判断同步队列是否有数据,没有才尝试抢占资源。非公平锁则忽略同步队列的存在直接抢占资源。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
添加完结点到同步队列时,结点排队获取同步资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireQueued流程: shouldParkAfterFailedAcquire方法逻辑:
总结
AQS用同步资源state和双向同步队列实现。首先尝试获取锁,获取不到则添加结点到队尾。 添加到队尾之后,尝试出队。获取失败就unpark等待被前一个节点唤醒详见unlock源码。被前一个节点唤醒之后又重新尝试获取同步资源,可能失败被重新unpark等待后重新被唤醒。如此自旋直到获取锁。但自始至终只有一个线程在自旋获取锁,其他节点都在堵塞
释放锁unlock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
|