一、是什么
抽象的队列同步器,是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型的变量来表示持有锁的状态。
官方说法:
二、与AQS相关联的知识
1、位置
2、ReentrantLock
3、CountDownLatch
4、ReentrantReadWriteLock
5、Semaphore
?
?6、锁与同步器的关系
1、锁:面向锁的使用者,定义了程序员和锁交互的使用API,隐藏了实现细节,调用即可。
2、同步器:面向锁的实现者。
三、能干什么
1、AQS主要处理的事情
?2、处理原理
AQS使用一个volatile的int类型的成员变量来表示同步状态。通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值得修改。? ?
? ??
3、内部体系架构
?1.AQS的int变量
? ? ? ? AQS的同步状态State成员变量。默认为0,表示没有线程占用,大于0表示有线程占用
2.AQS的CLH队列?
?3.Node节点
- Node的int变量:Node的等待状态waitState成员变量,队列中等候线程的状态,每个线程就是一个Node节点
- 内部结构?
4. 总体结构?
?四、源码解读
一、从ReentrantLock开始解读AQS
一、Lock接口的实现类,,基本都是通过聚合一个队列同步器多的子类完成线程访问控制的。
二、ReentrantLock的原理
?三、公平锁和非公平锁源码之间的差异
?四、ReentrantLock源码加锁方法
?1.非公平锁源码
2.acquire()方法:公平锁和非公平锁都有这个方法
?3.tryAcquire()方法:公平锁和非公平锁都有,会被acquire方法调用
?
?4.之后执行addWaiter(Node.EXCLUSIVE), arg)这个方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//当前线程
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//第一次入队
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//创建一个新空节点,哨兵节点
tail = head;
} else {//第二次才走下面逻辑
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
5.最后执行acquireQueued()方法
?shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的状态
int ws = pred.waitStatus;
//如果是下面的状态,即等待被占用资源释放,直接返回true,准备继续调用
//parkAndCheckInterrupt()
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//说明是cancelled状态,即取消状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
循环判断前驱节点的前驱节点是否也为取消状态,忽略该状态的节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
将当前节点的前置节点设置为SIGNAL状态,用于后续唤醒操作
程序第一次执行到这返回false,还会进行第二次循环
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
?parkAndCheckInterrupt方法
//如果是上一个方法的前驱节点等于-1,就会返回true,然后就会执行这个方法,使线程进入挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
?五、ReentrantLock源码解锁方法
1.unlock()方法
public void unlock() {
sync.release(1);
}
2.tryRelease(arg)方法
public final boolean release(int arg) {
//传入参数为1
if (tryRelease(arg)) {
Node h = head;
//这个是上一个节点,一般是哨兵节点,所以waitStatus 是-1,除了最后一个是0,前面的都是-1
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//state状态位1,主要是有线程占据了位置,releases也为1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//到了这一步,
boolean free = false;
if (c == 0) {
free = true;
//将占用线程置为null,也就是没有线程
setExclusiveOwnerThread(null);
}
//将状态位置为0
setState(c);
return free;
}
3.unparkSuccessor(Node node)方法
private void unparkSuccessor(Node node) {
//入参是哨兵节点,waitStatus为-1
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//这个是下一个节点
Node s = node.next;
//B节点不为null
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//进入这里,解除park状态
if (s != null)
LockSupport.unpark(s.thread);
}
这个时候后面的线程还在这里转着
?
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();//返回false
}
之后还进行acquireQueued(final Node node, int arg)方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//node节点是B,获取上一个节点,也就是哨兵节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//设置B节点为哨兵节点,里面讲B节点指向哨兵节点的指针置为null
setHead(node);
//哨兵节点的next指针置为null,垃圾回收器回收掉之前的哨兵节点
p.next = null; // help GC
failed = false;//没有走finall哪里的逻辑
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//这个方法执行完继续往下执行,继续循环
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//这里的入参是1,在前面图片可以看到
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//state这个时候已经是0了
int c = getState();
if (c == 0) {
// B线程进去占用,也就是将state置为1
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
|