AbstractQueuedSynchronizer(AQS)是一个用于构建锁和同步器的框架,很多同步器都可以通过AQS很容易并且高效的构建出来,包括ReentrantLock、CountDownLatch、Semaphore、FutureTask等。
AQS主要由两部分组成:同步队列(一个FIFO双向队列)和一个int成员变量(state)。state表示同步状态,FIFO队列用来完成资源获取线程的排队工作,同步队列中的节点(Node)用来保存获取同步状态失败的线程的引用、等待状态、前驱和后继节点。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// 队列Node节点
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
?基本结构如下:
如何实现资源的获取和释放
在基于AQS构建的同步器类中, 最基本的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或者信号量时,获取操作的含义非常直观,即获取的是锁,并且调用者可能会一直等待知道同步器类处于可被获取的状态。在使用CountDownLatch时,"获取操作"意味着"等待并直到闭锁到达结束状态",而在使用FutureTask时,则意味着"等待并直到任务已经完成"。"释放"并不是一个可阻塞的操作,当执行"释放"操作时,所有在请求时被阻塞的线程都会开始执行。
如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理了一个整数状态(变量state)信息,可以通过getState,setState以及compareAndSetState等方法进行操作。这个整数可以用于表示任意状态。例如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,即可重入锁的实现方式,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。
AQS提供了很多方法,AQS是一个抽象类,就会有方法需要子类去重写。AQS中需要子类重写的方法如下:?
方法 | 作用 | protected boolean tryAcquire(int arg) | 尝试获取独占式锁 | protected boolean tryRelease(int arg) | 尝试释放独占式锁 | protected int tryAcquireShared(int arg) | 尝试获取共享式锁 | protected boolean tryReleaseShared(int arg) | 尝试释放共享式锁 | protected boolean isHeldExclusively() | 当前线程是否独占式占有锁 |
除了需要子类重写的方法外,获取和释放锁都会尝试修改同步状态state的值,在AQS中提供了三个和同步状态相关的方法,如下
方法 | 作用 | int getState() | 获取同步状态state的值 | void setState(int newState) | 修改同步状态 | boolean compareAndSetState(int expect, int update) | 通过CAS的方式取修改同步状态 |
AQS中获取操作和释放操作的标准形式如下 :
boolean acquire() throws InterruptedException {
while (当前状态不允许获取操作) {
if (需要阻塞获取请求) {
如果当前线程不在队列中,则将其插入队列
阻塞当前线程
} else {
返回失败
}
}
可能更新同步器的状态
如果线程位于队列中,则将其移出队列
返回成功
}
void release() {
更新同步器状态
if (新的状态允许某个被阻塞的线程获取成功) {
解除队列中一个或多个线程的阻塞状态
}
}
根据同步器的不同,获取操作可是独占式的(例如ReentrantLock),也可以是非独占式的(例如Semaphore和CountDownLatch)。一个获取操作包括两部分。首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将被阻塞或者失败。这种判断是由同步器的语义决定的,例如,对于锁来说,如果它没有被某个线程持有,那么就能被成功地获取,而对于闭锁来说,如果它处于结束状态,那么也能被成功地获取。
其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取到同步器造成影响。例如,当获取一个锁后,锁的状态将从"未被持有"变成"已被持有",而从Semaphore中获取一个许可后,将剩余的许可数量减1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它,因此获取闭锁的操作不会改变闭锁的状态。
获取示例图如下:?
?节点在加入同步队列后,在同步队列中所有的节点都处于自旋状态,但是只有前驱节点是头节点才能尝试获取同步状态。一是因为头节点是已经获取到同步状态的节点,当头节点的线程释放同步状态之后,将会唤醒后继节点,后继节点被换唤醒后需要检查自己的前驱节点是否是头节点。而是因为这样处理可以维护同步队列的FIFO原则。
如何使用AQS实现自己的锁
public class MyLock implements Lock {
private Helper helper = new Helper();
private class Helper extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 1. 如果第一个线程进来,可以拿到锁,我们直接返回true
// 2. 如果第二个线程进来,则拿不到锁,返回false。如果当前进来的线程和当前保存的线程是同一个线程,则可以拿到锁,但是有代价,要更新状态值,即可重入锁
// 3. 如何判断是第一个线程进来还是其他线程进来
int state = getState();
Thread t = Thread.currentThread();
if (state == 0) {
if (compareAndSetState(0, arg)) {
setExclusiveOwnerThread(t);
return true;
}
} else if (getExclusiveOwnerThread() == t) {
setState(state + 1);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
// 锁的获取和释放肯定是一一对应的,那么调用此方法的线程一定是当前线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new RuntimeException();
}
int state = getState() - arg;
boolean flag = false;
if (state == 0) {
setExclusiveOwnerThread(null);
flag = true;
}
setState(state);
return flag;
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
helper.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
helper.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return helper.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return helper.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
helper.release(1);
}
@Override
public Condition newCondition() {
return helper.newCondition();
}
}
|