手把手讲解AQS源码
一、概述
? 本文将会通过ReentrantLock 为例,带大家看一下AQS的源码,其实并不难,下面是一个公平锁的小案例,大家可以自己跑一下感受一下。下面将会带大家一点一点阅读源码,认真看下来你就会发现其实并不难。
public class TestAQS {
static ReentrantLock reentrantLock = new ReentrantLock(true);
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new Target(i)).start();
}
}
static class Target implements Runnable {
private int num;
public Target(int num) {
this.num = num;
}
@Override
public void run() {
for (int i = 0; i < 2; i++) {
reentrantLock.lock();
try {
System.out.println("任务" + this.num + "执行了");
} finally {
reentrantLock.unlock();
}
}
}
}
}
二、源码部分
ReentrantLock
按住Ctrl+鼠标左键 ,点击lock() 方法,就会进入到方法内部
public void lock() {
sync.lock();
}
到这里,你会发现用的是sync的lock()方法,按住Ctrl点击sync就会发现它其实就是一个抽象静态内部类
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
}
}
通过英文注释(看不懂的可以翻译看一下),可以看出,ReentrantLock是基于Sync类来实现公平/非公平锁,使用AQS 中的state 属性去表示加锁的次数,其实我们常说的AQS其实就是AbstractQueuedSynchronizer ,继续点lock() 方法往下走,选择FairSync ,可以看到下面的源码
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
进入到acquire(1) 中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里解释一下if中的三个方法:
-
tryAcquire() 顾名思义,就是会尝试获取一次锁。公平/不公平锁的逻辑会有一点点的不同 protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
总结一下上面的方法其实很简单,在【没人使用并且当前线程成功独占】或者【正在独占的线程是自己】的时候才会返回true,说得通俗一点就是试一下看能不能独占 -
addWaiter() 翻译过来就是添加等待者,其实说白了,就是上面尝试获取锁失败后会加入到等待队列 中 private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
总结一下,说得通俗一点就是排队 -
acquireQueued() 在这里面做CAS自旋,会不断获取锁,失败后会阻塞当前线程 final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
?
AbstractQueuedSynchronizer
组成部分
? 1、等待队列(CLH队列) ,其实本质上是双向链表
? 2、state 变量,保存同步状态
? 3、head 和 tail 指针,用于保存等待队列的头尾
? 4、内部类Node ,通过两个指针,之前前驱节点和后继节点
? 5、操作队列的方法和一系列的CAS native方法
三、总结
? 个人理解,AQS框架抽取了很多同步策略的特性,比如信号量、互斥量、各种锁中都可能出现的有的线程需要等待的情况,为此,抽取出一个阻塞队列(CLH)用于保存阻塞的线程并用于之后唤醒。不同的同步策略所允许的同时运行的线程的数量不一样,为此,抽取出一个state变量。之后就是一系列的CAS方法来操作阻塞队列,并且底层都是C语言实现的原子操作。
|