序章
金九银十,又到了程序员迁徙的季节。抱着试一试的心态去鹅城闯闯,整理整理知识脉络,连接上远程面试的软件,接通语音,“吃着火锅”,“唱着歌”,就准备开始面试。
枪在手,跟鹅走!
面试官:请自我介绍一下。
我 :扒拉扒拉一堆废话。
面试官:介绍下做的项目吧。
我 :扒拉扒拉一堆废话。(越聊越对味)
面试官:那我们问一些Java基础知识
面试官:ReentrantLock用过吗?它的公平锁是真的公平吗?AQS知道吗?底层原理能描述下吗?
我 :???…
面试官:好的,再会。
总的来说面试的这个点我是回答的不好,所以面试完之后我就主动去看了下ReentantLock的源码,花了五天的时间研究编写了这篇文章,和大家一起探讨这个知识点,也希望能对不了解这个知识点的朋友有些帮助。
简介
其实在我们的日常开发中主动去使用ReentrantLock是比较少的,但是看过源码的朋友们会知道ReentrantLock在ConcurrentHashMap中是有用到的。当然如果你经常接触到多线程编程肯定不陌生。
在Java多线程中,可以使用synchronized关键字来实现线程之间的同步互斥,但在JDK1.5中新增了ReentrantLock类也能达到同样的效果,并且在扩展功能上也更加强大,比如具有嗅探锁定、多路分支通知等功能,而且在使用上也比synchronized更加灵活。
使用ReentrantLock类
我们在读源码看原理之前首先还是得了解这个类怎么用,大家都用ReentrantLock和synchronized比,既然ReentrantLock类在功能上比synchronized更多,那么就以一个初步的程序来介绍一下ReentrantLock类的使用。熟练使用的朋友可以跳过这个章节。
使用ReentrantLock实现同步
创建一个ReentrantLockTest类,如下所示,调用ReentrantLock对象的lock()方法获取锁,调用unlock()方法释放锁。
public class ReentrantLockTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
ExecutorService executorService = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10));
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":线程开始做事");
lock.lock();
System.out.println(Thread.currentThread().getName() + ":线程正在做事");
lock.unlock();
System.out.println(Thread.currentThread().getName() + ":线程做完事了");
});
}
}
}
查看运行结果如下,从运行结果来看,当前线程开始做事,正在做事,做完事了都打印完毕了之后另外一个线程才可以继续打印。此处的unlock()方法释放了之后线程还能正常打印做完事了是因为进行了Thread.sleep(100)。
pool-1-thread-4:线程开始做事
pool-1-thread-4:线程正在做事
pool-1-thread-4:线程做完事了
pool-1-thread-1:线程开始做事
pool-1-thread-1:线程正在做事
pool-1-thread-1:线程做完事了
pool-1-thread-5:线程开始做事
pool-1-thread-5:线程正在做事
pool-1-thread-5:线程做完事了
pool-1-thread-2:线程开始做事
pool-1-thread-2:线程正在做事
pool-1-thread-2:线程做完事了
pool-1-thread-3:线程开始做事
pool-1-thread-3:线程正在做事
pool-1-thread-3:线程做完事了
使用Condition实现等待/通知
介绍Condition的时候先看下Object中的wait()方法,从注释中可以清楚得知关键字synchronized与wait()、notify()、notifyAll()等方法结合可以实现等待/通知模式,类ReentrantLock也可以实现相同的功能,但需要借助于Condition对象。
Condition类是在JDK1.5中出现的技术,在一个Lock对象里面可以创建多个Condition实例,线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。
在使用notify()/notifyAll()方法进行通知时,被通知的线程是由JVM随机选择的,线程进行通知的时候没有选择权。
接下来就用一个简单的例子演示一下Condition的等待/通知
public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void await(){
lock.lock();
try {
System.out.println("等待开始时间"+System.currentTimeMillis());
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void signal(){
lock.lock();
System.out.println("等待结束时间"+System.currentTimeMillis());
condition.signal();
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
new Thread(()->{
reentrantLockTest.await();
System.out.println("逻辑执行时间"+System.currentTimeMillis());
},"A").start();
Thread.sleep(2000);
reentrantLockTest.signal();
}
}
ReentrantLock中还有很多其他的API没有一一举例了,本文主要讲的是加锁解锁的知识点,而这个知识点只是ReentrantLock类中的冰山一角。
实现生产者消费者模式
写一个用ReentrantLock实现的生产者和消费者的代码示例:
public class ConditionTest {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean hasValue = false;
public void set(){
try {
lock.lock();
while (hasValue==true){
condition.await();
}
System.out.println("打印***1***");
hasValue = true;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void get(){
try {
lock.lock();
while (hasValue==false){
condition.await();
}
System.out.println("打印***2***");
hasValue = false;
condition.signal();
}catch(InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ConditionTest conditionTest = new ConditionTest();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 20,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(()->{
conditionTest.set();
});
}
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(()->{
conditionTest.get();
});
}
}
}
公平锁和非公平锁
公平锁和非公平锁:锁lock分为“公平锁”和“非公平锁”,公平锁表示线程获取锁的顺序是按照入队的顺序来分配的,FIFO先进先出。而非公平锁就是一种获取锁的抢占机制,是随机获取锁的,和公平锁不一样的就是先来不一定先得到锁,这种方式可能造成某些线程一致拿不到锁,结果自然也就不公平了。
ReentrantLock默认是非公平的,带参的构造函数传入true则为公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
源码解析
ReentrantLock类的源码解析可以分为“公平的”和“非公平”的这两部分,首先可以查看这个类中的信息。类中有一个抽象的静态内部Sync类继承了AQS(AbstractQueuedSynchronizer简称AQS),NonfairSync和FairSync就是非公平和公平的具体实现。
FairSync
lock加锁
在使用ReentrantLock类进行公平同步的时候使用的加锁方法为lock(),源码如下,lock的核心就是acquire()方法。
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire获取锁
这个章节的所有方法都是基于公平版本FairSync类,首先会调用tryAcquire方法获取锁,如果获取不到才会执行addWaiter方法和acquireQueued的方法。FairSync类继承了AQS,而AQS中的int类型state属性默认值为0。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
当多个线程并发进入方法的时候,此时state为0,队列中也没有排队的前辈,进行CAS的时候只有一个线程可以成功,成功之后记录当前线程的拥有者,并返回true,剩下的线程会往else if的逻辑走,当然不满足条件,将会返回false。
如果获取到锁的线程,继续调用lock方法的时候,会再次调用次方法,当时当前的state就不为0了,如果占用锁的线程拥有者和当前线程是一致的,那么将state的值加1,更新state的最新值,返回true。也就是说state的值不仅仅是0和1。
hasQueuedPredecessors有没有排队的前辈
多个线程并发的获取锁,可能就有一些线程已经入队了,即使当前state为0,为了公平,就得先看我当前线程是不是和虚结点的后继结点中存放的线程为同一个,如果相同,那么就允许其获取锁,不同那这个线程就先获取不到锁。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
我们理解一下什么要判断头结点的下一个结点,第一个结点存储的数据是什么?
双向链表中,第一个结点我们称为虚结点,其实并不存储任何信息,知识占位,真正的第一个有数据的结点是从第二个结点开始的,当h!=t时:
a.如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了tail指向head,下方的代码块是在enq方法中,也就是队列初始化的地方,一开始假定队列为空,那么线程并发的时候会创建head结点,也就是虚结点,此时 tail为null,此时队列中有元素,需要返回true。
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
b.如果(s = h.next) != null,说明此时等待队列中至少有一个有效结点,如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效结点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果s.thread != Thread.currentThread()说明等待队列的第一个有效结点线程与当前线程不同,当前线程必须加入等待队列。
Node队列数据结构
我们一直说入队这个概念,得先了解它的数据结构到底长什么样,了解清楚之后才更方便理解。
先查看下Node类的源码,但是被其注释给吸引了,中间有个单向链表一样的图非常生动,这个注释属实整挺好,所以我google了一下拜读了这段注释。
注释一开始给我们介绍了CLH等待队列,这个队列是由(Craig、Landin 和 Hagersten)这三个人命名的,CLH锁通常用于自旋锁。使用CLH锁的时候相当于要保证入队的原子性,如果要出列,只需要设置head字段。
但是我们AQS中的队列并非长这样,作者沿用的是CLH这种思想,将其用于阻塞同步器,即在其结点的前驱中保存有关线程的一些控制信息,每个结点中的“state”字段跟踪线程是否应该阻塞。结点在其前驱结点唤醒时收到信号。队列中的每个结点都充当了一个特定的通知式监视器,并持有一个等待的线程。我看大牛们称这种队列为CLH变体队列,如下图所示:
addWaiter入队、排队
接下来就开始分析入队的源码,同时验证一下数据结构是否和上图CLH变体队列一致。
我们假定有三个线程,当第一个线程thread1抢到锁之后一直持有锁,thread2和thread3并发去执行入队操作。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
先假定thread2和thread3没有竞争,是按照顺序来进行入队。
thread2生成node结点,thread=thread2,nextWaiter=null,waitStatus=0。
此时AQS中的属性tail和head都是为null,所以pred为null,直接进入enq(node)方法。
因为tail为null,所以Node t指向null,进入分支if(t==null)中进行CAS,比较并交换。
compareAndSetHead(new Node()),当head为null的时候将head更新为new Node()假定名称为X。
最后tail=head;tail和head指向同一个对象X的地址。
但是thread2线程没有因此结束哈,这个for循环的终止条件return t并未触发,所以继续第二次循环。
此时的tail并非为null,而是指向X,X的thread=null,nextWaiter=null,waitStatus=0。
进入else中的逻辑,node.prev指向t,然后CAS当tail的值等于t时,tail更新指向到node结点。
执行t.next = nede,形成双向链表,整个过程如下图所示,到此thread2的入队执行完成。
thread3进入addWaiter方法或者是同样进入enq(node)方法都是一样的逻辑,因为此时tail不为null。
thread3生成node结点,thread=thread3,nextWaiter=null,waitStatus=0。
此时的pred指向tail,thread=thread2,nextWaiter=null。
进入if(pred !=null)的逻辑,将node(thread=thread3)的prev指向pred也就是当前的tail内存地址。
CAS更新tail的指向为当前thread=thread3的node内存地址。
将pred指向的node结点的内存地址的next指向更新后的tail内存地址。
正常的入队已经理清楚了,那看下并发情况这套代码执行的逻辑。
1、并发情况下会不会有并发安全问题,如果没有那道格利是怎么做到的?
我们发现不管是enq(node)方法还是addWaiter(Node mode)方法都是没有加锁的,那么并发的情况是它是安全的吗?首先我们看下enq方法是否存在线程安全问题。
假设当前tail为空,这个时候两个线程同时到达586行,587行只有一个线程可以返回true,也就是只有一个线程可以初始化队列,然而另外一个线程在compareAndSetHead的时候此时的期望值已不是null,返回的是false。那这个线程咋办呢,我们可以看到584行有个for(; ; ),无条件的循环,下一次进入循环的时候tail就不是null也,就会执行590行后的逻辑,道格利只使用了CAS就做到了此处的线程安全。
假设当前tail不为空的时候,两个线程同时到达590,生成的node1和node2的prev都指向t,但是也只有一个能够执行CAS返回true,另外一个线程无法更新tail的值,只有返回false,进入下一次循环。这么看来enq(node)方法仅仅通过两个CAS方法就实现了线程安全。
addWaiter方法就不累赘的讲解了,特殊的是当CAS返回false的时候addWaiter方法中的逻辑会再次进入enq(node)方法。
2、排队真的是公平的吗?
排队的核心逻辑就是上图enq(node)方法的591行-593行代码。假设100个线程停到了591行进行CAS,有一个成功了,另外99个没抢到进入下一次循环,代码中并不是直接按照时间来排队进入,也是去抢,所以也不见得是完全公平。这世界上或许有完全的公平吧~
acquireQueued入队之后做了什么
再次回来acquire(int arg)方法,这个方法中还有一个acquireQueued(final Node node, int arg)方法,线程加入了同步队列之后做了什么就在这个方法中体现。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
①final Node p = node.predecessor();①
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (③shouldParkAfterFailedAcquire(p, node)③ &&
②parkAndCheckInterrupt())②
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上面的代码用①、②、③标注了特殊的地方,需要特别关注。
还是用三个线程,第一个线程抢到锁占有后长期持有,第二个线程和第三个线程入队之后进入次方法。
①获取当前线程所在结点的前驱结点。
第三个线程进入的时候,发现它的前驱结点和head并不相等,因为它前面还有线程二放入的结点。
第三个线程进入shouldParkAfterFailedAcquire方法,将前驱结点的默认的waitStatus用CAS改为-1,如果改成功返回true。如果不成功则进入下一次循环。
当CAS修改成功之后会进入②标记的代码行,执行LockSupport.park(this),代码阻塞在这等待唤醒再继续运行后续逻辑。
第二个线程进入的时候发现第一个结点与head结点相等,就进入获取锁的方法,如果不能用CAS将state=0改为1的话会进入③,③中也会修改head结点的waitStatus为-1,如果第二个线程第一次就将state修改成功的话,此时head的waitStatus还是等于0。
拿到锁之后会将当前结点设置为head,把之前的head与整个链表解绑。
finally中的方法,只有抛出异常和return的时候会执行。
②阻塞当前线程
public static void main(String[] args) {
List<Thread> threads = new ArrayList<>();
new Thread(() -> {
System.out.println("开始锁定");
threads.add(Thread.currentThread());
LockSupport.park();
System.out.println("逻辑开始");
}).start();
new Thread(() -> {
LockSupport.unpark(threads.get(0));
threads.remove(threads.get(0));
System.out.println("解锁成功");
}).start();
}
上面用了一个简单的示例来看LockSupport.park()的效果,执行park的时候相当于开车停在红灯前,unpark相当于绿灯放行。当不执行unpark的时候线程会阻塞不继续执行,直到unpark方法将这个线程放行为止。
③shouldParkAfterFailedAcquire获取失败后就应该停住
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
以上源码分析完之后的疑惑
- 我们分析完了整个加锁的过程,其中修改waitStatus的值是做什么的我们并不清楚。
- 何时执行unpark我们也不知道。
我们带着这两个问题进入到解锁的源码分析,看是否能找到答案。
unlock解锁
有加锁就必然有解锁,不然一个线程一直持有锁,第二个线程就一直在尝试加锁,后面的线程都在阻塞,所以猜测unpark肯定是在解锁的过程中执行的。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease尝试释放锁
获取当前state变量的值进行减1操作,如果当前线程和持有锁的线程不相同则抛出异常。
如果c等于0,那么就没有重入,将释放锁,将占用锁的线程变量exclusiveOwnerThread设置为null并返回true。
如果c不等于0,减1之后的值设置为state变量并返回true。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor唤醒接班人(下一个可以需要唤醒的结点)
如果当前head结点不为null,并且waitStatus不为0时就开始唤醒接班人了。head结点为空的话确实不需要进行唤醒node结点了,也知道了waitStatus变量的作用就是一个标记,用来标识是否能唤醒接班人,waitStatus在每获取到锁入队之后被修改为-1,如果异常了将被设置为1。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
首先获取waitStatus的值,如果是-1就进入ws<0的逻辑,CAS将waitStatus变量由-1更新为0。
变量s为head的next结点,如果s不为空,那么s结点获取锁的线程将被唤醒,从哪里阻塞就是哪里开始运行。
如果s结点的next结点为空,或者waitStatus变量的值大于0。(什么时候waitStatus大于0呢?CANCELLED变量值)
for循环中就是跳过了取消的线程,从尾部开始往前遍历,找到离head最近的一个结点的waitStatus值小于或等于0的结点进行替换。如果最近的这个结点不为空就唤醒其获取锁的线程,开始自旋获取锁。
cancelAcquire取消正在进行的尝试获取
我们从Node类中的属性中可以了解到waitStatus的值就几个,如下图所示,可以看到CANCELLED的注释就是线程已取消的waitStatus值,只有这个是大于1的。
Node节点的状态有如下五种:
状态 | 解释 | 值 |
---|
CANCELLED | 当前节点已经取消等待(超时或中断),该状态下的节点不会进入其他状态,也不会再阻塞。 | 1 | SIGNAL | 后继结点已经或即将阻塞,当前节点需要唤醒后继结点。后继结点获取锁的时候,必须先收到SIGNAL,才能调用tryAcquire(如果再次失败就会重新阻塞) | -1 | CONDITION | 该节点正处于Condition队列中,因为等待condition.signalAll()而阻塞 | -2 | PROPAGATE | 仅用于共享模式下、释放锁时的队列头节点,用于向整个队列传播锁释放信号 | -3 | 0 | 一般是新建的节点 | 0 |
注意取消获取锁的时机除了抛出异常之外,还有一个地方就是tryLock超时的时候。
下面是cancelAcquire的源码
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node;
}
}
根据代码分析,我们选择一种情况进行分析,如下图所示,B结点因为异常导致waitStatus被设置为1,当前结点为node结点,需要cancelAcquire(node)。
1、将node.thread设置为null
2、将node的前驱结点赋值给pred
3、此时pred指向B,B的waitStatus=1是大于0的
4、将pred指向pred.prev结点A,并且将node.prev指向pred,实际上就是node.prev指向A
5、pred.next是B,所以predNext=B
6、将node的waitStatus设置为1
7、当前node==tail为true,所以CAS更新tail的值,当前结点等于node时将tail的值指向pred的值,指向A
8、CAS当pred的next属性的值为B时,将其替换为null,也就是A的next指向null
上面的流程图还没有走完全流程,因为当node结点不是tail的时候分支还没有走,所以我们再次用图来画一下执行流程。
1、node结点不为空,将thread设置为null的意思是设置该节点不关联任何线程,就是所谓的虚结点
2、node.prev结点为A,生成pred结点并指向A
3、A的waitStatus不大于0跳过while循环
4、生成predNext结点指向pred.next,也就是node结点
5、将node结点的waitStatus赋值为1
6、node!=tail,进入else逻辑
7、定义int变量ws,用来存放pred结点的waitStatus的值
8、当前pred不等于head结点,ws的值是-1,当然如果A的值是0或者1进入后面的判断
9,ws是不是小于等于0,前面while(pred.waitStatus>0)判断注定了这里不会大于0,那么ws<=0是必然通过的
10、然后CAS不管是小于0还是等于0,将pred的waitStatus设置为-1,并且pred.thread!=null,进入if中的逻辑
11、生成next变量指向node.next也就是B
12、B不为空,并且B的waitStatus为0,那么pred的next变量等于predNext就将pred.next指向next结点B
13、if条件中任意条件不满足,则直接unparkSuccessor方法
公平锁小结
我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?
(1)执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。
(2)shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
公平锁流程图
我们探究了公平锁的部分源码,但是知识点和结构还比较散乱,所以拉通一个整体流程来看下理解的内容。红色的方块是AQS中的代码,灰色的方块为FairSync中的代码。
下面是lock的流程图
下面是unlock的流程图
NonFairSync
lock加锁
我们已经详细的解析了加锁过程中的基本流程,接下来再对解锁的基本流程进行分析。由于ReentrantLock在解锁的时候,并不区分公平和非公平锁,所以我们重点关注加锁的流程。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
非公平锁加锁的时候,多线程并发的情况下直接CAS进行共享变量state的修改,只有一个线程可以成功,并且记录占用线程。
acquire获取锁
那么其他的线程就会进入到acquire方法,你会发现这个不是和公平锁的获取方法一样一样的嘛。不要慌,毕竟FairSync和NonFairSync是两个不同的实现,所以我们还是继续往代码里面查看。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire尝试获取锁/nonfairTryAcquire
接下来准备查看非公平锁的获取锁代码tryAcquire,发现它的实现是在类Sync中的nonfairTryAcquire方法,源码如下。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
此方法其实和公平锁中的tryAcquire相似度极高,眼尖的朋友们会发现if (compareAndSetState(0, acquires))这里有点不同,公平锁会先看有没有排队的前辈,而非公平锁去掉了这个校验。
方法获取到当前的线程,如果当前的state等于0,那么就CAS,将state的值改为1,如果线程修改成功,那么设置占用线程,返回true;如果没有改成功的线程返回false。
如果当C大于0的时候,会进入else-if逻辑,只有重入的时候才会进入此逻辑内。判断当前线程和占用锁的线程是否一致,如果一致那就是重入锁,更新state的值返回true。
非公平锁小结
后续的代码其实和FairSync的一致,我们看到公平锁和非公平锁的区别只是在于有没有排队的前辈这个逻辑,非公平锁在加锁的时候会有两个的直接抢占修改state的时机,虽然也有入队和出队的操作,队列中的排队线程还是有先后顺序,但是新加入的线程并不管队列中的线程排多久了,新线程执行lock方法的时候是直接参与竞争,这种方式就会导致某些线程可能一直获取不到锁,自然就是不公平的。
总结
我们日常开发中使用并发的场景是比较多的,但是对并发内部的基本框架原理了解的朋友却不多。本文仅仅介绍了可重入锁ReentrantLock加解锁的部分原理和AQS原理,希望能够成为大家了解AQS和ReentrantLock等同步器的“敲门砖”。
|