IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 鹅厂一面ReentrantLock差点一套带走我 -> 正文阅读

[Java知识库]鹅厂一面ReentrantLock差点一套带走我

序章

金九银十,又到了程序员迁徙的季节。抱着试一试的心态去鹅城闯闯,整理整理知识脉络,连接上远程面试的软件,接通语音,“吃着火锅”,“唱着歌”,就准备开始面试。

枪在手,跟鹅走!
在这里插入图片描述

面试官:请自我介绍一下。

我 :扒拉扒拉一堆废话。

面试官:介绍下做的项目吧。

我 :扒拉扒拉一堆废话。(越聊越对味)

面试官:那我们问一些Java基础知识

面试官:ReentrantLock用过吗?它的公平锁是真的公平吗?AQS知道吗?底层原理能描述下吗?

我 :???…

面试官:好的,再会。

在这里插入图片描述

总的来说面试的这个点我是回答的不好,所以面试完之后我就主动去看了下ReentantLock的源码,花了五天的时间研究编写了这篇文章,和大家一起探讨这个知识点,也希望能对不了解这个知识点的朋友有些帮助。

简介

其实在我们的日常开发中主动去使用ReentrantLock是比较少的,但是看过源码的朋友们会知道ReentrantLock在ConcurrentHashMap中是有用到的。当然如果你经常接触到多线程编程肯定不陌生。

在Java多线程中,可以使用synchronized关键字来实现线程之间的同步互斥,但在JDK1.5中新增了ReentrantLock类也能达到同样的效果,并且在扩展功能上也更加强大,比如具有嗅探锁定、多路分支通知等功能,而且在使用上也比synchronized更加灵活。

使用ReentrantLock类

我们在读源码看原理之前首先还是得了解这个类怎么用,大家都用ReentrantLock和synchronized比,既然ReentrantLock类在功能上比synchronized更多,那么就以一个初步的程序来介绍一下ReentrantLock类的使用。熟练使用的朋友可以跳过这个章节。

使用ReentrantLock实现同步

创建一个ReentrantLockTest类,如下所示,调用ReentrantLock对象的lock()方法获取锁,调用unlock()方法释放锁。

public class ReentrantLockTest {

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
       // 不建议使用Executors创建线程池,从平时做起。
        ExecutorService executorService = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10));
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ":线程开始做事");
                lock.lock();
                System.out.println(Thread.currentThread().getName() + ":线程正在做事");
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + ":线程做完事了");
            });
        }
    }
}

查看运行结果如下,从运行结果来看,当前线程开始做事,正在做事,做完事了都打印完毕了之后另外一个线程才可以继续打印。此处的unlock()方法释放了之后线程还能正常打印做完事了是因为进行了Thread.sleep(100)。

pool-1-thread-4:线程开始做事
pool-1-thread-4:线程正在做事
pool-1-thread-4:线程做完事了
pool-1-thread-1:线程开始做事
pool-1-thread-1:线程正在做事
pool-1-thread-1:线程做完事了
pool-1-thread-5:线程开始做事
pool-1-thread-5:线程正在做事
pool-1-thread-5:线程做完事了
pool-1-thread-2:线程开始做事
pool-1-thread-2:线程正在做事
pool-1-thread-2:线程做完事了
pool-1-thread-3:线程开始做事
pool-1-thread-3:线程正在做事
pool-1-thread-3:线程做完事了
使用Condition实现等待/通知

介绍Condition的时候先看下Object中的wait()方法,从注释中可以清楚得知关键字synchronized与wait()、notify()、notifyAll()等方法结合可以实现等待/通知模式,类ReentrantLock也可以实现相同的功能,但需要借助于Condition对象。

在这里插入图片描述

Condition类是在JDK1.5中出现的技术,在一个Lock对象里面可以创建多个Condition实例,线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。

在使用notify()/notifyAll()方法进行通知时,被通知的线程是由JVM随机选择的,线程进行通知的时候没有选择权。

接下来就用一个简单的例子演示一下Condition的等待/通知

public class ReentrantLockTest {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void await(){
        lock.lock();
        try {
            System.out.println("等待开始时间"+System.currentTimeMillis());
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void signal(){
        lock.lock();
        System.out.println("等待结束时间"+System.currentTimeMillis());
        condition.signal();
        lock.unlock();
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        new Thread(()->{
            reentrantLockTest.await();
            System.out.println("逻辑执行时间"+System.currentTimeMillis());
        },"A").start();
        Thread.sleep(2000);
        reentrantLockTest.signal();
    }
}

ReentrantLock中还有很多其他的API没有一一举例了,本文主要讲的是加锁解锁的知识点,而这个知识点只是ReentrantLock类中的冰山一角。

实现生产者消费者模式

写一个用ReentrantLock实现的生产者和消费者的代码示例:

public class ConditionTest {
    private ReentrantLock lock =  new ReentrantLock();
    private Condition condition = lock.newCondition();

    private boolean hasValue = false;

    public void set(){
        try {
            lock.lock();
            while (hasValue==true){
                condition.await();
            }
            System.out.println("打印***1***");
            hasValue = true;
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void get(){
        try {
            lock.lock();
            while (hasValue==false){
                condition.await();
            }
            System.out.println("打印***2***");
            hasValue = false;
            condition.signal();
        }catch(InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionTest conditionTest = new ConditionTest();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(()->{
                conditionTest.set();
            });
        }
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(()->{
                conditionTest.get();
            });
        }
    }
}

在这里插入图片描述

公平锁和非公平锁

公平锁和非公平锁:锁lock分为“公平锁”和“非公平锁”,公平锁表示线程获取锁的顺序是按照入队的顺序来分配的,FIFO先进先出。而非公平锁就是一种获取锁的抢占机制,是随机获取锁的,和公平锁不一样的就是先来不一定先得到锁,这种方式可能造成某些线程一致拿不到锁,结果自然也就不公平了。

ReentrantLock默认是非公平的,带参的构造函数传入true则为公平锁。

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

源码解析

ReentrantLock类的源码解析可以分为“公平的”和“非公平”的这两部分,首先可以查看这个类中的信息。类中有一个抽象的静态内部Sync类继承了AQS(AbstractQueuedSynchronizer简称AQS),NonfairSync和FairSync就是非公平和公平的具体实现。

在这里插入图片描述

FairSync
lock加锁

在使用ReentrantLock类进行公平同步的时候使用的加锁方法为lock(),源码如下,lock的核心就是acquire()方法。

final void lock() {
    // 获取锁的方法,传入了一个int类型值为1
    acquire(1);
}

public final void acquire(int arg) {
    //假定A&&B,A为false B不会执行。
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire获取锁

这个章节的所有方法都是基于公平版本FairSync类,首先会调用tryAcquire方法获取锁,如果获取不到才会执行addWaiter方法和acquireQueued的方法。FairSync类继承了AQS,而AQS中的int类型state属性默认值为0。

protected final boolean tryAcquire(int acquires) {
    //获取当前的线程
    final Thread current = Thread.currentThread();
    //获取state的值
  	int c = getState();
    //c的值如果为0.说明该线程之前是没有获取到锁的
    if (c == 0) {
        // 先判断队列里面有没有排队的前辈,如果没有那么就进行CAS,如果state等于0的时候将state更新为1
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
          	// 记录当前线程的拥有者
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果c的值不为0,那么就要看是否是冲入锁
    // 判断是不是重入的条件就是占用锁的线程拥有者和当前线程是否一致
    else if (current == getExclusiveOwnerThread()) {
        // 一致,那么c的值+1
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        //更新state的值为nextc
        setState(nextc);
        return true;
    }
    return false;
}

当多个线程并发进入方法的时候,此时state为0,队列中也没有排队的前辈,进行CAS的时候只有一个线程可以成功,成功之后记录当前线程的拥有者,并返回true,剩下的线程会往else if的逻辑走,当然不满足条件,将会返回false。

如果获取到锁的线程,继续调用lock方法的时候,会再次调用次方法,当时当前的state就不为0了,如果占用锁的线程拥有者和当前线程是一致的,那么将state的值加1,更新state的最新值,返回true。也就是说state的值不仅仅是0和1。

hasQueuedPredecessors有没有排队的前辈

多个线程并发的获取锁,可能就有一些线程已经入队了,即使当前state为0,为了公平,就得先看我当前线程是不是和虚结点的后继结点中存放的线程为同一个,如果相同,那么就允许其获取锁,不同那这个线程就先获取不到锁。

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    // 生成t指向tail结点
    Node t = tail; // Read fields in reverse initialization order
    // 生成h指向head结点
    Node h = head;
    // 生成s当h!=t的时候指向h的后继结点
    // h!=t说明什么?说明队列有值。那么意思就是h==t的时候,队列没有值,直接返回false
    // 队列有排队的前辈们就看head的后继结点是不是为null,如果为null的话直接返回true,如果不为null
    // 就获取head的后继结点中的thread变量拿到此结点占用的线程信息与当前线程比较,不相同返回true,相同返回false
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

我们理解一下什么要判断头结点的下一个结点,第一个结点存储的数据是什么?

双向链表中,第一个结点我们称为虚结点,其实并不存储任何信息,知识占位,真正的第一个有数据的结点是从第二个结点开始的,当h!=t时:

a.如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了tail指向head,下方的代码块是在enq方法中,也就是队列初始化的地方,一开始假定队列为空,那么线程并发的时候会创建head结点,也就是虚结点,此时 tail为null,此时队列中有元素,需要返回true。

if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
        tail = head;
} else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
    }
}

b.如果(s = h.next) != null,说明此时等待队列中至少有一个有效结点,如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效结点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果s.thread != Thread.currentThread()说明等待队列的第一个有效结点线程与当前线程不同,当前线程必须加入等待队列。

Node队列数据结构

我们一直说入队这个概念,得先了解它的数据结构到底长什么样,了解清楚之后才更方便理解。

先查看下Node类的源码,但是被其注释给吸引了,中间有个单向链表一样的图非常生动,这个注释属实整挺好,所以我google了一下拜读了这段注释。
在这里插入图片描述

注释一开始给我们介绍了CLH等待队列,这个队列是由(Craig、Landin 和 Hagersten)这三个人命名的,CLH锁通常用于自旋锁。使用CLH锁的时候相当于要保证入队的原子性,如果要出列,只需要设置head字段。

但是我们AQS中的队列并非长这样,作者沿用的是CLH这种思想,将其用于阻塞同步器,即在其结点的前驱中保存有关线程的一些控制信息,每个结点中的“state”字段跟踪线程是否应该阻塞。结点在其前驱结点唤醒时收到信号。队列中的每个结点都充当了一个特定的通知式监视器,并持有一个等待的线程。我看大牛们称这种队列为CLH变体队列,如下图所示:
在这里插入图片描述

addWaiter入队、排队

接下来就开始分析入队的源码,同时验证一下数据结构是否和上图CLH变体队列一致。

我们假定有三个线程,当第一个线程thread1抢到锁之后一直持有锁,thread2和thread3并发去执行入队操作。

private Node addWaiter(Node mode) {
    // 生成一个node结点,任何一个线程到达都会产生一个新的结点
  	// node.thread = Thread.currentThread()
    // 传入的参数Node.EXCLUSIVE=null,node.nextWaiter=null。
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
  	// 将pred指向tail
    Node pred = tail;
    // 如果pred不为空,说明队列里面有结点在排队
    if (pred != null) {
      // 把生成的node结点的prev指向tail,也就是说要入队。
      // 这里可能存在多个node结点的prev都指向当前tail,并发情况下
        node.prev = pred;
        // 执行CAS,原子操作入队。刚刚在CLH变体队列的描述中有讲过这个。
        // 如果当前tail的值等于pred,那么将tail重新指向生成的node结点。
      	// 并发情况下只有一个可以成功,剩下的指向之前tail的结点怎么处理呢?
        if (compareAndSetTail(pred, node)) {
            // 完成指向
            pred.next = node;
            return node;
        }
    }
    // 如果tail为空会直接到这个方法
    // 并发情况下,除了CAS设置tail成功的线程结点,其他的都走enq方法
    enq(node);
    return node;
}
private Node enq(final Node node) {
        // 开始自旋,结束条件在else中的return t。
        for (;;) {
            // tail为空的时候,并发情况下会用CAS初始化队列
            // tail不为空的时候,会进入else方法
            Node t = tail;
            if (t == null) { // Must initialize
                // 生成head结点,tail也指向head。此结点被称为虚结点。thread=null,waitStatus=0
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 并发的时候有多个node结点的前驱指向t(tail)
                node.prev = t;
                // CAS又一个能设置成tail成功,其他的自旋入队尾。
                if (compareAndSetTail(t, node)) {
                    // 建立指针指向。
                    t.next = node;
                    return t;
                }
            }
        }
    }

先假定thread2和thread3没有竞争,是按照顺序来进行入队。

thread2生成node结点,thread=thread2,nextWaiter=null,waitStatus=0。

此时AQS中的属性tail和head都是为null,所以pred为null,直接进入enq(node)方法。

因为tail为null,所以Node t指向null,进入分支if(t==null)中进行CAS,比较并交换。

compareAndSetHead(new Node()),当head为null的时候将head更新为new Node()假定名称为X。

最后tail=head;tail和head指向同一个对象X的地址。

但是thread2线程没有因此结束哈,这个for循环的终止条件return t并未触发,所以继续第二次循环。

此时的tail并非为null,而是指向X,X的thread=null,nextWaiter=null,waitStatus=0。

进入else中的逻辑,node.prev指向t,然后CAS当tail的值等于t时,tail更新指向到node结点。

执行t.next = nede,形成双向链表,整个过程如下图所示,到此thread2的入队执行完成。

在这里插入图片描述

thread3进入addWaiter方法或者是同样进入enq(node)方法都是一样的逻辑,因为此时tail不为null。

thread3生成node结点,thread=thread3,nextWaiter=null,waitStatus=0。

此时的pred指向tail,thread=thread2,nextWaiter=null。

进入if(pred !=null)的逻辑,将node(thread=thread3)的prev指向pred也就是当前的tail内存地址。

CAS更新tail的指向为当前thread=thread3的node内存地址。

将pred指向的node结点的内存地址的next指向更新后的tail内存地址。

在这里插入图片描述

正常的入队已经理清楚了,那看下并发情况这套代码执行的逻辑。

1、并发情况下会不会有并发安全问题,如果没有那道格利是怎么做到的?

我们发现不管是enq(node)方法还是addWaiter(Node mode)方法都是没有加锁的,那么并发的情况是它是安全的吗?首先我们看下enq方法是否存在线程安全问题。
在这里插入图片描述

假设当前tail为空,这个时候两个线程同时到达586行,587行只有一个线程可以返回true,也就是只有一个线程可以初始化队列,然而另外一个线程在compareAndSetHead的时候此时的期望值已不是null,返回的是false。那这个线程咋办呢,我们可以看到584行有个for(; ; ),无条件的循环,下一次进入循环的时候tail就不是null也,就会执行590行后的逻辑,道格利只使用了CAS就做到了此处的线程安全。

假设当前tail不为空的时候,两个线程同时到达590,生成的node1和node2的prev都指向t,但是也只有一个能够执行CAS返回true,另外一个线程无法更新tail的值,只有返回false,进入下一次循环。这么看来enq(node)方法仅仅通过两个CAS方法就实现了线程安全。

addWaiter方法就不累赘的讲解了,特殊的是当CAS返回false的时候addWaiter方法中的逻辑会再次进入enq(node)方法。

2、排队真的是公平的吗?

排队的核心逻辑就是上图enq(node)方法的591行-593行代码。假设100个线程停到了591行进行CAS,有一个成功了,另外99个没抢到进入下一次循环,代码中并不是直接按照时间来排队进入,也是去抢,所以也不见得是完全公平。这世界上或许有完全的公平吧~

acquireQueued入队之后做了什么

再次回来acquire(int arg)方法,这个方法中还有一个acquireQueued(final Node node, int arg)方法,线程加入了同步队列之后做了什么就在这个方法中体现。

final boolean acquireQueued(final Node node, int arg) {
    // 生成一个标记,用来取消获取锁的线程,当for循环中异常的时候。
    boolean failed = true;
    try {
        // 中断标识
        boolean interrupted = false;
        for (;;) {
             // 找到当前结点的前驱结点final Node p = node.predecessor();// 如果p等于head结点,相当于排队排在最前边(第二),第一个结点为虚结点。
            // 然后回去执行获取锁的方法,获取成功进入if逻辑获取失败进入下方if条件判断
            // p不等于head直接进入下方if条件判断
            if (p == head && tryAcquire(arg)) {
                // head指向node结点,让node的thread=null,prev=null
              	// 把node结点变成虚结点了,这个时候waitStatus还是为0
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // ③方法名的直译是获取失败后就应该停住,但是内部做的是整理队列和修改前驱结点的waitStatus=-1。
            // ②park住,等待唤醒
            if (shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt())②
                interrupted = true;
        }
    } finally {
        // 异常进入,return后也进入
        if (failed)
            // 取消获取
            cancelAcquire(node);
    }
}

上面的代码用①、②、③标注了特殊的地方,需要特别关注。

还是用三个线程,第一个线程抢到锁占有后长期持有,第二个线程和第三个线程入队之后进入次方法。

①获取当前线程所在结点的前驱结点。

第三个线程进入的时候,发现它的前驱结点和head并不相等,因为它前面还有线程二放入的结点。

第三个线程进入shouldParkAfterFailedAcquire方法,将前驱结点的默认的waitStatus用CAS改为-1,如果改成功返回true。如果不成功则进入下一次循环。

当CAS修改成功之后会进入②标记的代码行,执行LockSupport.park(this),代码阻塞在这等待唤醒再继续运行后续逻辑。

第二个线程进入的时候发现第一个结点与head结点相等,就进入获取锁的方法,如果不能用CAS将state=0改为1的话会进入③,③中也会修改head结点的waitStatus为-1,如果第二个线程第一次就将state修改成功的话,此时head的waitStatus还是等于0。

拿到锁之后会将当前结点设置为head,把之前的head与整个链表解绑。

finally中的方法,只有抛出异常和return的时候会执行。

②阻塞当前线程

public static void main(String[] args) {
        List<Thread> threads = new ArrayList<>();
        new Thread(() -> {
            System.out.println("开始锁定");
            threads.add(Thread.currentThread());
            LockSupport.park();
            System.out.println("逻辑开始");
        }).start();

        new Thread(() -> {
            LockSupport.unpark(threads.get(0));
            threads.remove(threads.get(0));
            System.out.println("解锁成功");
        }).start();

    }

在这里插入图片描述

上面用了一个简单的示例来看LockSupport.park()的效果,执行park的时候相当于开车停在红灯前,unpark相当于绿灯放行。当不执行unpark的时候线程会阻塞不继续执行,直到unpark方法将这个线程放行为止。

③shouldParkAfterFailedAcquire获取失败后就应该停住

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱结点的waitStatus
    int ws = pred.waitStatus;
    // ws == -1直接返回true
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // ws大于0的时候就是cancelled,这个时候需要对队列进行处理
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
       //当前置结点被cancelled的时候,需要将node结点的前驱结点的指向进行更新
       //直到前驱结点的waitStatus大于0。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 将node结点的前驱结点的值修改为-1.
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
以上源码分析完之后的疑惑
  • 我们分析完了整个加锁的过程,其中修改waitStatus的值是做什么的我们并不清楚。
  • 何时执行unpark我们也不知道。

我们带着这两个问题进入到解锁的源码分析,看是否能找到答案。

unlock解锁

有加锁就必然有解锁,不然一个线程一直持有锁,第二个线程就一直在尝试加锁,后面的线程都在阻塞,所以猜测unpark肯定是在解锁的过程中执行的。

public void unlock() {
  	//相当于执行state-1
    sync.release(1);
}

public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease尝试释放锁

获取当前state变量的值进行减1操作,如果当前线程和持有锁的线程不相同则抛出异常。

如果c等于0,那么就没有重入,将释放锁,将占用锁的线程变量exclusiveOwnerThread设置为null并返回true。

如果c不等于0,减1之后的值设置为state变量并返回true。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
unparkSuccessor唤醒接班人(下一个可以需要唤醒的结点)

如果当前head结点不为null,并且waitStatus不为0时就开始唤醒接班人了。head结点为空的话确实不需要进行唤醒node结点了,也知道了waitStatus变量的作用就是一个标记,用来标识是否能唤醒接班人,waitStatus在每获取到锁入队之后被修改为-1,如果异常了将被设置为1。

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // 当前node结点的waitStatus
    int ws = node.waitStatus;
    // 如果小于0,就更新waitStatus的值为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 获取当前结点的后继结点s
    Node s = node.next;
    // 后继结点为null,进入if逻辑;或者后继结点被中断也进入逻辑
    if (s == null || s.waitStatus > 0) {
        // 将后继结点指向null
        s = null;
        // 然后遍历tail结点,拿到一个离head最近的waitStatus<=0的结点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果s不为空,就唤醒下一个结点的占用线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

首先获取waitStatus的值,如果是-1就进入ws<0的逻辑,CAS将waitStatus变量由-1更新为0。

变量s为head的next结点,如果s不为空,那么s结点获取锁的线程将被唤醒,从哪里阻塞就是哪里开始运行。

如果s结点的next结点为空,或者waitStatus变量的值大于0。(什么时候waitStatus大于0呢?CANCELLED变量值)

for循环中就是跳过了取消的线程,从尾部开始往前遍历,找到离head最近的一个结点的waitStatus值小于或等于0的结点进行替换。如果最近的这个结点不为空就唤醒其获取锁的线程,开始自旋获取锁。

cancelAcquire取消正在进行的尝试获取

我们从Node类中的属性中可以了解到waitStatus的值就几个,如下图所示,可以看到CANCELLED的注释就是线程已取消的waitStatus值,只有这个是大于1的。

在这里插入图片描述

Node节点的状态有如下五种:

状态解释
CANCELLED当前节点已经取消等待(超时或中断),该状态下的节点不会进入其他状态,也不会再阻塞。1
SIGNAL后继结点已经或即将阻塞,当前节点需要唤醒后继结点。后继结点获取锁的时候,必须先收到SIGNAL,才能调用tryAcquire(如果再次失败就会重新阻塞)-1
CONDITION该节点正处于Condition队列中,因为等待condition.signalAll()而阻塞-2
PROPAGATE仅用于共享模式下、释放锁时的队列头节点,用于向整个队列传播锁释放信号-3
0一般是新建的节点0

注意取消获取锁的时机除了抛出异常之外,还有一个地方就是tryLock超时的时候。

下面是cancelAcquire的源码

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    // 无效结点过滤掉
    if (node == null)
        return;
		// 需要取消的结点thread设置为null
    node.thread = null;

    // Skip cancelled predecessors
    // 生成pred指向node的前驱结点
    Node pred = node.prev;
    // 通过前驱结点,跳过取消状态的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 通过前驱结点过滤后的前驱结点的后继结点
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 把当前nod的状态设置为CANCELLED
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    //如果当前结点是尾结点,将从后往前的第一个非取消状态的结点设置为尾结点
    // 如果更新成功,将tail的后继结点设置为null,不成功直接进入else逻辑
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // 如果当前结点不是head的后继结点,
        // 1:判断当前结点的前驱结点是否为-1
        // 2:如果不是,则把前驱机结点设置为-1看是否可以成功
        // 如果1和2中有一个为true,再判断当前结点的线程是不是为不null
        // 如果上述条件都满足,把当前结点的前驱结点的后继指针指向当前结点的后继结点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 如果当前结点是head的后继结点,或者上述条件都不满足的时候,直接唤醒后继结点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

根据代码分析,我们选择一种情况进行分析,如下图所示,B结点因为异常导致waitStatus被设置为1,当前结点为node结点,需要cancelAcquire(node)。

在这里插入图片描述

1、将node.thread设置为null

2、将node的前驱结点赋值给pred

3、此时pred指向B,B的waitStatus=1是大于0的

4、将pred指向pred.prev结点A,并且将node.prev指向pred,实际上就是node.prev指向A

5、pred.next是B,所以predNext=B

6、将node的waitStatus设置为1

7、当前node==tail为true,所以CAS更新tail的值,当前结点等于node时将tail的值指向pred的值,指向A

8、CAS当pred的next属性的值为B时,将其替换为null,也就是A的next指向null

上面的流程图还没有走完全流程,因为当node结点不是tail的时候分支还没有走,所以我们再次用图来画一下执行流程。

在这里插入图片描述

1、node结点不为空,将thread设置为null的意思是设置该节点不关联任何线程,就是所谓的虚结点

2、node.prev结点为A,生成pred结点并指向A

3、A的waitStatus不大于0跳过while循环

4、生成predNext结点指向pred.next,也就是node结点

5、将node结点的waitStatus赋值为1

6、node!=tail,进入else逻辑

7、定义int变量ws,用来存放pred结点的waitStatus的值

8、当前pred不等于head结点,ws的值是-1,当然如果A的值是0或者1进入后面的判断

9,ws是不是小于等于0,前面while(pred.waitStatus>0)判断注定了这里不会大于0,那么ws<=0是必然通过的

10、然后CAS不管是小于0还是等于0,将pred的waitStatus设置为-1,并且pred.thread!=null,进入if中的逻辑

11、生成next变量指向node.next也就是B

12、B不为空,并且B的waitStatus为0,那么pred的next变量等于predNext就将pred.next指向next结点B

13、if条件中任意条件不满足,则直接unparkSuccessor方法

公平锁小结

我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?

(1)执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。

(2)shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。

do {
  node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
公平锁流程图

我们探究了公平锁的部分源码,但是知识点和结构还比较散乱,所以拉通一个整体流程来看下理解的内容。红色的方块是AQS中的代码,灰色的方块为FairSync中的代码。

下面是lock的流程图

在这里插入图片描述

下面是unlock的流程图

在这里插入图片描述

NonFairSync
lock加锁

我们已经详细的解析了加锁过程中的基本流程,接下来再对解锁的基本流程进行分析。由于ReentrantLock在解锁的时候,并不区分公平和非公平锁,所以我们重点关注加锁的流程。

final void lock() {
  // 如果state的值为0,就更新为1
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

非公平锁加锁的时候,多线程并发的情况下直接CAS进行共享变量state的修改,只有一个线程可以成功,并且记录占用线程。

acquire获取锁

那么其他的线程就会进入到acquire方法,你会发现这个不是和公平锁的获取方法一样一样的嘛。不要慌,毕竟FairSync和NonFairSync是两个不同的实现,所以我们还是继续往代码里面查看。

public final void acquire(int arg) {
  	// 尝试获取锁,如果tryAcquire返回false,则执行&&后面的方法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire尝试获取锁/nonfairTryAcquire

接下来准备查看非公平锁的获取锁代码tryAcquire,发现它的实现是在类Sync中的nonfairTryAcquire方法,源码如下。

final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    // 获取贡献变量state的值
    int c = getState();
    // c只有可能大于等于0,当c大于0,说明是要么是重入,要么是其他线程占有锁。
    if (c == 0) {
        // 更新state的值为-1
        if (compareAndSetState(0, acquires)) {
            // 设置占有线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 当c>0的时候进入,判断当前线程是否和占有锁的线程一致
    else if (current == getExclusiveOwnerThread()) {
        // 重入加1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新state的值
        setState(nextc);
        return true;
    }
    return false;
}

此方法其实和公平锁中的tryAcquire相似度极高,眼尖的朋友们会发现if (compareAndSetState(0, acquires))这里有点不同,公平锁会先看有没有排队的前辈,而非公平锁去掉了这个校验。

方法获取到当前的线程,如果当前的state等于0,那么就CAS,将state的值改为1,如果线程修改成功,那么设置占用线程,返回true;如果没有改成功的线程返回false。

如果当C大于0的时候,会进入else-if逻辑,只有重入的时候才会进入此逻辑内。判断当前线程和占用锁的线程是否一致,如果一致那就是重入锁,更新state的值返回true。

非公平锁小结

后续的代码其实和FairSync的一致,我们看到公平锁和非公平锁的区别只是在于有没有排队的前辈这个逻辑,非公平锁在加锁的时候会有两个的直接抢占修改state的时机,虽然也有入队和出队的操作,队列中的排队线程还是有先后顺序,但是新加入的线程并不管队列中的线程排多久了,新线程执行lock方法的时候是直接参与竞争,这种方式就会导致某些线程可能一直获取不到锁,自然就是不公平的。

总结

我们日常开发中使用并发的场景是比较多的,但是对并发内部的基本框架原理了解的朋友却不多。本文仅仅介绍了可重入锁ReentrantLock加解锁的部分原理和AQS原理,希望能够成为大家了解AQS和ReentrantLock等同步器的“敲门砖”。
Java核心编程

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-09-14 13:11:30  更:2021-09-14 13:12:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:57:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码