一、AQS概述
AbstractQueuedSynchronizer简称AQS,它是java.util.concurrent包中,它提供了一套完整的同步编程框架。我们常用的ReentrantLock、CountDownLatch都是基于AQS实现的。 AQS的实现分为两种形式,一种是独占锁,另一种则是共享锁。
独占锁:每次只能有一个线程持有锁。我们比较熟悉的ReentrantLock就是通过独占锁实现互斥性的。 共享锁:允许多个线程同时获取锁,并发地访问资源。例如ReentrantReadWriteLock。 在这篇文章中,我们分析一下AQS的独占锁机制。
二、AQS内部实现
AQS的实现是底层底层维护了一个先进先出(FIFO)的双向队列,这个队列是基于链表实现的。如果线程竞争锁失败,那么就会进入到这个同步队列中进行等待。当获得锁的线程释放锁之后,会从队列中唤醒一个线程。 双向队列是基于Node节点实现的,当线程需要入队列的时候,会将线程的信息封装成一个Node对象,进行入队操作。 属性head就标记头节点,属性tail标记尾节点。 Node类的组成如下:
static final class Node {
//方式一:标记为共享锁(mode)
static final Node SHARED = new Node();
//方式二:标记为独占锁(mode)
static final Node EXCLUSIVE = null;
//节点从同步队列中取消
static final int CANCELLED = 1;
//后继节点的线程处于等待状态,如果当前节点释放锁,会通知后继节点
static final int SIGNAL = -1;
//当前节点处于等待队列中
static final int CONDITION = -2;
//这个用于共享锁,表示共享式同步状态的传递
static final int PROPAGATE = -3;
//在同步队列中的等待状态
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//加入同步队列的线程引用
volatile Thread thread;
//等待队列中的下一个节点
Node nextWaiter;
//队列中的下一个对象是否为共享式
final boolean isShared() {
return nextWaiter == SHARED;
}
//获得当前节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
//添加等待者
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
//头节点
private transient volatile Node head;
//尾节点
private transient volatile Node tail;
//标记状态
private volatile int state;
三、AQS的源码分析
清楚了AQS的基本架构以后,我们来分析一下AQS的源码,仍然以ReentrantLock为模型。 入口:
public void lock() {
sync.lock();
}
// NonfairSync.lock, 以下以非公平锁作为主要分析逻辑
final void lock() {
if (compareAndSetState(0, 1)) //通过cas操作来修改state状态,表示争抢锁的操作
setExclusiveOwnerThread(Thread.currentThread());//设置当前获得锁状态的线程
else
acquire(1); //尝试去获取锁
}
- 当state=0时,表示无锁状态
- 当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁
获得独占锁的源头就是从acquire()方法开始的。这个方法中分别调用了三个方法:tryAcquire()、addWaiter()、acquireQueued()。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法的主要逻辑是:
- 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
- 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
- acquireQueued,将Node作为参数,通过自旋去尝试获取锁。
首先调用的是tryAcquire(arg),这个方法只抛出了一个异常,因为它的具体实现是交给子类去完成的。这个方法的主要功能是:尝试获得锁,进入临界区。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire在NonfairSync中的实现代码如下:
ffinal boolean nonfairTryAcquire(int acquires) {
//获得当前执行的线程
final Thread current = Thread.currentThread();
int c = getState(); //获得state的值
if (c == 0) { //state=0说明当前是无锁状态
//通过cas操作来替换state的值改为1,大家想想为什么要用cas呢?
//理由是,在多线程环境中,直接修改state=1会存在线程安全问题,你猜到了吗?
if (compareAndSetState(0, acquires)) {
//保存当前获得锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
//这段逻辑就很简单了。如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; //增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
当tryAcquire方法获取锁失败以后,则会先调用addWaiter将当前线程封装成Node,然后添加到AQS队列
private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE
//将当前线程封装成Node,并且mode为独占锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
Node pred = tail;
if (pred != null) { //tail不为空的情况,说明队列中存在节点数据
node.prev = pred; //讲当前线程的Node的prev节点指向tail
if (compareAndSetTail(pred, node)) {//通过cas讲node添加到AQS队列
pred.next = node;//cas成功,把旧的tail的next指针指向新的tail
return node;
}
}
enq(node); //tail=null,将node添加到同步队列中
return node;
}
通过上面的代码我们知道,如果这个同步队列是空队列或者CAS失败,那就调用enq()方法。
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail; //如果是第一次添加到队列,那么tail=null
if (t == null) { // Must initialize
//CAS的方式创建一个空的Node作为头结点
if (compareAndSetHead(new Node()))
//此时队列中只一个头结点,所以tail也指向它
tail = head;
} else {
//进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
node.prev = t;
if (compareAndSetTail(t, node)) {
//t此时指向tail,所以可以CAS成功,将tail重新指向Node。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
t.next = node;
return t;
}
}
}
}
将添加到队列中的Node作为参数传入acquireQueued方法,这里面会做抢占锁的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException
if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺
setHead(node); // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点
//凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null
p.next = null; // help GC
failed = false; //获取锁成功
return interrupted;
}
//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志
interrupted = true;
}
} finally {
if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作
cancelAcquire(node);
}
}
- 获取当前节点的prev节点
- 如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占锁
- 抢占锁成功以后,把获得锁的节点设置为head,并且移除原来的初始化head节点
- 如果获得锁失败,则根据waitStatus决定是否需要挂起线程 最后,通过cancelAcquire取消获得锁的操作
前面的逻辑都很好理解,主要看一下shouldParkAfterFailedAcquire这个方法和parkAndCheckInterrupt的作用
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前继节点的状态
if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被unpark唤醒
return true;
如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。
if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport类是Java6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:
public native void unpark(Thread jthread);
public native void park(boolean isAbsolute, long time);
unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。 permit相当于0/1的开关,默认是0,调用一次unpark就加1变成了1.调用一次park会消费permit,又会变成0。 如果再调用一次park会阻塞,因为permit已经是0了。直到permit变成1.这时调用unpark会把permit设置为1.每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark不会累积
|