1.介绍
AQS 全名AbstractQueueSynchronizer 抽象同步队列,是java中锁实现的一个抽象基类,大部分需要同步控制的工具类都继承了AQS。AQS主要是为了实现进程控制,作用类似于Synchronized ,不过Synchronized 是 c++ 实现的,而AQS是用java 来实现的。
1.1 特点:
- 1.用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取
* getState —— 获取 state 状态 * setState —— 设置 state 状态 * compareAndSetState - cas 机制设置 state 状态 * 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 - 2.提供了一个局域FIFO的等待队列,类似于Monitor的EntryList
-
- 提供了条件变量等待和唤醒机制,支持多个条件变量,类似于Monitor 的 waitSet
1.2 实现的功能
- 阻塞版本的加锁acquire和非阻塞的加锁tryAcquire
- 超时获取锁
- 提供打断机制,来打断尝试加锁等待的线程
- 独占锁和共享锁
- 条件不满足时的等待机制,并且支持多条件
1.3 设置
-
state 设计
- state 使用volatile 配合cas 保证修改时的原子性
- state 使用的时32位的int类型
-
阻塞和恢复
- 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到解决方法是使用 park & unpark 来实现线程的暂停和恢复,先 unpark 再 park 也没问题
- park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
- park 线程还可以通过 interrupt 打断
-
队列设计
- 使用的是FIFO双向队列
- 在删除和添加节点时,使用的是 自旋的方式 + 无锁,性能比加锁更高
2. 原理
2.1 内部类
 在AQS中只有两个内部类Node 和 Condition, 这两个类的作用类似于synchronzied 中的 锁等待队列和条件等待队列
分析一些关键的属性
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
上面是每个线程的state 可取的值
* 0 : 表示未加锁状态
* 1 : 表示加锁状态
* 2 : 条件等待
* -1 : 锁等待队列中,需要唤醒下一个等待的队列
节点等待的状态
volatile int waitStatus;
锁等待队列的前一个节点
volatile Node prev;
指向锁等待队列的下个Node
volatile Node next;
Node 绑定的线程
volatile Thread thread;
下一个条件等待队列
Node nextWaiter;
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
- 锁等待队列使用的是一个双向链表
- 条件等待队列使用的是一个单链表 + 首位指针
- state 用于表示资源的状态,这里重点注意在不同的锁的实现类中的该state 变量的设置一般是不同的
- ConditionObject —— 条件等待链表
 ConditionObject 的是是实现比较简单,就定义了首尾指针来挂起条件等待的线程
AQS的设计还是比较简单的,下面自己基于AQS实现一个简单的锁:
3. 自定义锁实现
public class TestAqs {
public static void main(String[] args) {
测试
MyLock myLock = new MyLock();
new Thread(()->{
try {
myLock.lock();
System.out.println("111");
Thread.sleep(2000);
myLock.lock();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("1111释放");
myLock.unlock();
}
}).start();
new Thread(()->{
try {
myLock.lock();
System.out.println("222");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("2222释放");
myLock.unlock();
}
}).start();
}
自定义锁
static class MyLock implements Lock{
class MySync extends AbstractQueuedSynchronizer{
尝试加锁,无论加锁成功失败都会返回
@Override
protected boolean tryAcquire(int arg) {
AQS 内部使用的时cas 的方式来修改state 的值
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
尝试释放锁
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private MySync mySync = new MySync();
@Override
public void lock() {
mySync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
mySync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return mySync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return mySync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
mySync.release(0);
}
@Override
public Condition newCondition() {
return null;
}
}
}
可以看到在基于AQS和llock的基础上实现一个简单的锁是非常容易的,就是在AQS的基础上自己提供一些操作加锁和解锁的逻辑就可以了。其次注意我这里实现的是独占锁。使用过ReentrantLock 的朋友应该知道,该锁是一个可重入的锁,那么它又是怎么实现的呢??下面看一下ReentantLock 实现原理
4.ReentrantLock
4.1 内部类
 包含三个内部类,sync 就是AQS锁,另外两个是锁的公平和非公平实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
直接搬源码,这里提供了基本的加锁和解锁方法模板,但是具体的实现交给了不同的子类
- NonfairSync 非公平锁
- FairSync 公平锁
4. 2非公平锁和公平锁的两处不同:
-
非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。 -
非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。 公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
4.3 非公平锁加锁流程
 调用非公平同步队列的lock() 加锁  首先使用cas 尝试加锁,就是将state 的值从0 设置到 1
-
加锁成功
- 设置锁对象的owner 位当前线程
 -
加锁失败 —— 表示加锁产生竞争
进入acquire() 
- 再次尝试加锁(第二次)
 1. 首先判断当前锁对象的state,如果为0,就尝试加锁,加锁失败就直接返回失败 2. 判断当前线程是不是锁对象的持有者,如果是,那么设置state 的值为 state + 1(这里就是ReentrantLock 可重入锁的原理)
如果加锁失败,方法继续执行  addWaiter() 就是创建一个新的节点和一个哨兵节点,并将新节点设置到锁等待链表的表尾 
acquireQueued () 将新创建的节点加入到锁等待队列中  过程: 1. 拿到当前节点的前驱节点,如果前驱节点是head,就表示当前节点是第一个等待的线程节点,那么就再尝试加锁(第三次) 2. 执行shouldParkAfterFailedAcquire(p, node),设置前驱节点的state 为 -1,(这里默认新建节点的state值为0) 主要是根据前去节点的这个状态来唤醒自己,就是是否需要park 是由前驱节点的waitStatus == Node.SIGNAL 来决定的 3. 执行打断当前线程
当出现多个线程竞争后执行图如下: 
小结: 加锁过程概述
- 前后一共尝试了三次 cas 加锁,前两次失败后就会将创建一个新的Node 绑定当前线程,并设置到链表末尾,接着如果当前这个新建的是链表中处理哨兵节点的第一个节点,那么还会再一次的尝试加锁。就进入打断代码;
- 设置当前新建节点的前一个节点的status 值为 SINGAL, 这个值是用于唤醒下一个节点的,最后调用Unsafe.park() 。
4.4 解锁流程
 
-
尝试解锁,失败直接放回,成功,则unPark链表中的其他节点 
- 拿到当前的锁的资源状态state的值- release. c == 0 表示当前锁的重入状态解除完了,所以直接释放锁,设置当前锁的持有者为null。
- c 大于0 则表示 当前线程对该锁重入不为0,那么设置新的state值,然后返回解锁失败
唤醒链接中后继节点 
private void unparkSuccessor(Node node) {
参数为head节点
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
拿到链表节点中从前向后的第一个 不为 空或者cancelled 节点
在锁等待队列中 状态值小于0 的只有 - 1 ,这里是从后向前遍历找到第第一个 状态值未-1的节点来唤醒
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 设置当前线程节点的status 值0, 表示未加锁状态,
- 找到链表中第一个状态值为 SINGAL 来唤醒 —— 线程唤醒的依据是根据链表前一个节点的status 的值来判断的
小结: 解锁流程:
- 尝试解锁,失败则返回,这里设计重入原理后面介绍。
- 成功,则准备唤醒链表中的下一个节点,这里唤醒的是处理哨兵节点的下一个不为null,或者status 值为 cancelled 的节点
4.5 公平锁
于非公平锁的实现的关键不同点是tryAcquire()方法
这是非公平锁 
这是公平锁  主要的区别就是这个方法,这在当前线程尝试加锁之前会判断锁等待链表中是否有其他线程在等待,如果没有才会尝试加锁。
小结:
- 非公平锁,当有新的线程来时,会直接尝试加锁
- 公平锁,当有新线程需要加锁是,会判断等待链表中是否有等待的线程,如果没有才会尝试加锁。
4.6 可重入原理
 加锁时,如果依然时当前线程对同一个锁进行加锁操作,那么这里就是在原来的status 的值上加1, 然后设置status 值即可。在synchronized 中 可重入的实现是在栈帧中创建一个新的lock Record 对象,设置owner 为null表示可重入。
 释放锁时,如果减1后status 的值依然不为0 那么就表示当前线程锁重入了,那么释放锁就是将原来的status 的值设置为status - 1。
4.7 可打断原理
首先看不可打断的加锁
 
parkAndCheckInterrupt() 方法内部就一个park的过程,如果在park的过程中,park标记设置为true 了,那么这个park 就会失效,然后Thread.Interrupted 会清除打断标记,然后返回,这是打断标记为true,接着自旋由被阻塞。并且使用 变量 interrupted 来表示在加锁过程是否被打断过。
可打断加锁  可以看到,如果在加锁的过程打断标记被设置为了true ,那么当方法返回时直接抛出异常表示加锁被打断。
小结:
- 不可打断加锁方式,本质上还是会被打断,只不过被打断后通过一个变量interrupted 来表示打断状态,并且只有当被唤醒加锁成功,返回后才可以知道线程加速是时被打断过
- 可打断加锁,在加锁的park的时,如果被打断了,那么会直接抛出异常。
4.8 条件变量实现原理
 每一个条件变量就是一个等待队列,不同条件变量有不同的等待队列,在synchronized 中只有一个条件等待队列
等待流程:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
创建一个新的节点,并将其加入到等待队列的尾部,这是status = condition
Node node = addConditionWaiter();
清除当前线程占用的所有锁资源,包括可重入锁,唤醒下一个等待的节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
 thread0 拿到锁,然后嗲用await()进入等待队列,接着唤醒thread1  
唤醒流程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
这里唤醒的是第一个等待队列中的节点
Node first = firstWaiter;
if (first != null)
进入唤醒锁
doSignal(first);
}
private void doSignal(Node first) {
do {
将firstWaiter 指向 first 的下一个节点,这里就会将这个待唤醒的几点从等待队列中断开
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
这里就循环尝试唤醒节点,转移到锁等待队列中,直到找到一个可用的节点来转移
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
尝试将当前concelled 的节点的状态设置0,
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
将当前节点加入到锁等链表的末尾,并返回末尾的前一个节点
Node p = enq(node);
设置前一个节点的status 为 SINGAL
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
小结:
- 条件等待,就是创建一个新的Node 节点并绑定当前线程,然后加入到条件等待队列的末尾,然后唤醒锁等待队列第一个status 不为cancelled 的结点。
- 唤醒,依次遍历条件等待队列中的结点,直到找到一个可以被转移到锁等大队列末尾的结点来唤醒。
|