初步认识AbstractQueuedSynchonizer
源码采用JDK8
什么是AQS
AQS是用来构建锁或者其他同步组件的基础框架,它使用了一个int的变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
它的主要使用方式是继承,子类通过继承AQS并实现他的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,AQS为此提供了三个方法(getState() 、setState(int newState) 、compareAndSetState(int expect, int update) )来操进行作操作,它们可以保证修改是安全的。使用自定义同步组件的静态内部类来实现子类,同步器自身没有实现任何同步接口,它仅仅定义了一些关于同步状态获取和释放的方法来使用。AQS既支持抢占式的获取同步状态,也支持共享式的获取同步状态。
AQS的接口和示例
AQS的设计基于模板方法模式的,这就需要我们继承AQS并重写指定的方法,随后AQS组合在自定义的组件实现中,并调用同步器提供的模板方法,而这些模板方法会调用我们重写的方法。子类需要重写的三个方法:
- getState() :获取当前同步状态
- setState(int newState):设置当前同步状态
- compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
AQS可以被重写的方法
方法名称 | 描述 |
---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现的时候需要查询当前状态并判断是否符合预期,再使用CAS设置同步状态 | protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 | protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,否知失败 | protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 | protected boolean isHeldExclusively() | 当前AQS是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
AQS提供的模板方法基本分为3类
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程情况
方法名称 | 描述 |
---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功则返回,否则进入同步队列等待,该发放会去调用重写的tryAcquire(int arg) | void acquireInterruptibly(int arg) throws InterruptedException | 与acquire(int arg)的差异是 这个方法响应中断,当前线程在等待队列中,如果被中断,那么该方法会抛出异常并返回 | boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException | 在acquireInterruptibly(int arg)基础上增加了超时机制,在规定时间内取不到同步状态,也会以false返回。 | boolean release(int arg) | 独占式释放同步状态,在释放同步状态后,会将等待队列的第一个节点对应的线程唤醒 | void acquireShared(int arg) | 共享式获取同步状态,如果当前线程获取同步状态成功则返回,否则进入同步队列等待,该发放会去调用重写的tryAcquire(int arg),与独占式不同,可以有多个线程获取到同步状态。 | void acquireSharedInterruptibly(int arg) throws InterruptedException | 与acquireShared(int arg)的差异是 这个方法响应中断,当前线程在等待队列中,如果被中断,那么该方法会抛出异常并返回 | boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException | 在acquireSharedInterruptibly(int arg)基础上增加了超时机制,在规定时间内取不到同步状态,也会以false返回。 | boolean releaseShared(int arg) | 共享式释放同步状态, | Collection getQueuedThreads() | 获取等待队列中的线程集合 |
官网例子
在JDK1.8源码里面有提供了一个独占锁的实现例子Mutex自定义同步组件,它在同一时刻只能有一个线程能获取到锁。在Mutex中定义一个静态内部类Sync。
Sync继承了AQS,并使用getState() 、setState(int newState) 、compareAndSetState(int expect, int update) 重写了isHeldExclusively() 、tryAcquire(int acquires) 、tryRelease(int releases) 方法。
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock, java.io.Serializable {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
@Override
public boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
编写测试MutexTest类,会使用AQS的模板方法进行操作。通过lock() 进行获取锁(内部调用了Sync的tryAcquire),unlock() (Sync的tryRelease)解锁。
public class MutexTest {
public static void main(String[] args) {
Mutex lock = new Mutex();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread() + " get lock");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread() + " release lock");
lock.unlock();
}
});
}
executorService.shutdown();
}
}
通过结果,可以看出一次只有一个线程获取到锁。
Thread[pool-1-thread-1,5,main] get lock. 17:21:23
Thread[pool-1-thread-1,5,main] release lock. 17:21:24
Thread[pool-1-thread-2,5,main] get lock. 17:21:24
Thread[pool-1-thread-2,5,main] release lock. 17:21:25
Thread[pool-1-thread-3,5,main] get lock. 17:21:25
Thread[pool-1-thread-3,5,main] release lock. 17:21:26
Thread[pool-1-thread-4,5,main] get lock. 17:21:26
Thread[pool-1-thread-4,5,main] release lock. 17:21:27
Thread[pool-1-thread-5,5,main] get lock. 17:21:27
Thread[pool-1-thread-5,5,main] release lock. 17:21:28
AQS实现分析
核心思想
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制在AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。队列有同步队列(sync queue)和条件队列(condition queue)。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列,即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
我的理解就是没有用类似QUEUE那样的队列的实例,而是通过NODE中存放前后结点PreNode和NextNode形成一种双向链表似的关系
sync queue 同步队列
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS操作对该同步状态进行原子操作实现对其值的修改。CAS操作主要借助sun.misc.Unsafed类来实现。
private volatile int state;
同步队列中的结点用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继结点,结点的属性类型与名称, 源码如下
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
同步队列的数据结构
在AQS中有两个结点类型的引用,head是指向头结点(状态值不会是1),tail是指向尾结点。
private transient volatile Node head;
private transient volatile Node tail;
独占式同步状态获取与释放
以上述官网独占式获取同步状态为例,在使用sync.acquire(1); 获取同步状态失败的时候,会执行addWaiter(Node.EXCLUSIVE), arg) ,先通过compareAndSetTail(pred, node) 尝试快速填充队尾,如果填充失败或者当没有尾结点时,去调用enq(final Node node) 进行队列初始化,通过compareAndSetHead(Node update) 和compareAndSetTail(Node expect, Node update) 来设置AQS的head结点和tail结点。在完成addWaiter之后,继续执行acquireQueued(final Node node, int arg) ,如果当前结点的前驱结点是首结点,再次尝试获同步状态,若成功,将当前结点更新head结点,若失败,进行线程park,等待前驱结点释放锁唤醒当前线程,若期间当前线程中断,也会被唤醒。若发生异常情况, 会通过cancelAcquire(Node node) 取消继续获取(资源)。
锁获取的主要流程图如下:
独占锁的释放主要通过调用release(int arg) 来释放锁。
源码部分
addWaiter(Node mode)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq(final Node node)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire(Node node)
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node;
}
}
unparkSuccessor(Node node)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
独占锁的释放主要通过调用release(int arg) 来释放锁。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
共享式同步状态获取与释放
在阅读这块内容时,可以先读Semaphore和CountDownLatch
共享式的锁是从acquireShared(int arg) 方法开始尝试获取。对于获取同步状态失败的线程会进入到doAcquireShared(int arg) 中继续执行,同样使用addWaiter(Node.SHARED) 填充队列,不过结点是共享模式的。如果当前结点的前驱结点是首结点,尝试获取锁,若成功获取,则执行setHeadAndPropagate(node, r) 设置首结点和可能唤醒后继结点;若获取失败,会判断是否park线程和中断标记,等待前驱结点释放锁唤醒当前线程,若期间当前线程中断,也会被唤醒。若发生异常情况, 会通过cancelAcquire(Node node) 取消继续获取(资源)。
acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared(int arg)
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();、
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate(Node node, int propagate)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
setHead(Node node)
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
共享锁的释放可以通过调用releaseShared(int arg) 完成。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
独占式与共享式的主要区别
共享式获取允许同一时刻多个线程获取到同步状态。
共享式可以允许其他共享式的访问,独占式不允许其他任何访问。
condition queue 等待队列
ConditionObject实现了Condition接口,可以与Lock配合实现等待/通知模式。它与Object的监视器方法有些区别。
对比项 | Object monitor methods | condition |
---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁 调用Lock.newCondition()获取Condition对象 | 调用方式 | 直接调用 如object.wait() | 直接调用 如condition.await() | 等待队列个数 | 一个 | 多个 | 当前线程释放锁并进入等待状态 | 支持 | 支持 | 当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 | 当前线程释放锁并进入超时等待状态 | 支持 | 支持 | 当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 | 唤醒等待队列的一个线程 | 支持 | 支持 | 唤醒等待队列的全部线程 | 支持 | 支持 |
当前线程在调用这些方法的时候需要提前获取Condition对象关联的锁,Condition对象是由Lock对象创建出来的,由于Condition依赖Lock对象,所以采用了内部类的实现方式,将ConditionObject 作为AQS的内部类。
ConditionObject的主要方法
方法名称 | 描述 |
---|
void await() throws InterruptedException | 当前线程进入等待状态(释放锁)直至被通知(signal)或者中断,当前线程将进入运行状态且从await()方法返回的情况,包括 其让线程调用该condition的signal()或者signalAll()方法,而当前线程被选中唤醒 1.其他线程(调用interrupt()方法)中断当前线程 2.如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象对应的锁 | long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态(释放锁)直至被通知(signal)、中断或者超时。返回值表示还有多少时间超时,如果值为负数或0,认定为已经超时。 | boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态(释放锁)直至被通知(signal)、中断或者到某个时间。如果到了指定时间还没有被通知唤醒,方法返回false,还没到指定时间被唤醒,方法返回true。 | boolean await(long time, TimeUnit unit) throws InterruptedException | 当前线程进入等待状态(释放锁)直至被通知(signal)、中断或者超时。返回true代表超时,返回false代表未超时。 | void awaitUninterruptibly() | 当前线程进入等待状态(释放锁)直至被通知(signal),对中断不敏感 | void signalAll() | 唤醒所有等待在condition上的线程,能够从等待方法返回的线程必须获得与condition相关联的锁 | void signal() | 唤醒一个等待在condition上的线程,该线程能够从等待方法返回,必须获得与condition相关联的锁 |
条件队列的数据结构
每一个Condition对象都包含一个队列,它是一个单向队列,主要有两个结点对象,firstWaiter指首个结点,lastWaiter指最后一个结点。
等待与通知模式示例
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedQueue<T> {
private Object[] items;
private int addindex, removeindex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
System.out.println(Thread.currentThread() + " is full. " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
notFull.await();
}
items[addindex] = t;
if (++addindex == items.length) {
addindex = 0;
}
++count;
System.out.println(Thread.currentThread() + " is not empty. " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
System.out.println(Thread.currentThread() + " is empty. " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
notEmpty.await();
}
Object x = items[removeindex];
if (++removeindex == items.length) {
removeindex = 0;
}
--count;
notFull.signal();
System.out.println(Thread.currentThread() + " is not full. " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
return (T) x;
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
BoundedQueue m = new BoundedQueue(2);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
int finalI = i;
executorService.execute(() -> {
try {
m.add(finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
等待
调用condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程变为等待状态,当从await方法返回时,当前线程一定获取了condition的相关锁。
以await()为例。
await() throws InterruptedException
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter()
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters()
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
fullyRelease(Node node)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
boolean isOnSyncQueue(Node node)
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
return findNodeFromTail(node);
}
唤醒
signalAll唤醒条件队列全部结点
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
doSignalAll(Node first)
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
transferForSignal(Node node)
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal唤醒单个结点
signal()
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal(Node first)
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
同步队列与条件队列的关系
上图简单体现了节点从从condition queue 转移到sync queue 上去的过程。即使是调用signalAll 时,节点也是一个一个转移过去的,因为每个节点都需要重新建立sync queue 的链接。
|