IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 开发测试 -> AbstractQueuedSynchronizer(AQS)超详细解析 -> 正文阅读

[开发测试]AbstractQueuedSynchronizer(AQS)超详细解析

“锁”是计算机领域的一个及其重要且广泛的概念,什么是“锁”?广义上来讲,锁就是某种计算机资源,具体的讲,他可以是一个文件,一个硬件设备,一块内存,一个数据表,或者仅仅只是一个变量。当计算机上的不同的进程(或线程)对同一个资源进行访问时,如果他们不希望其他的进程(或线程)对这个资源进行访问,直到自己访问结束,那么在这个进程(或线程)访问这个资源的期间,他就需要对这个资源进行上锁,这样,当其他进程(或线程)对这个资源进行访问之前,他们需要先查看该资源是否已经上锁,如果上锁,那么就需要一直等待,直到正在访问这个资源的进程(或线程)结束访问并释放锁。

我们先来看一个十分简单的例子

    public static void main(String[] args) {
        final Lock listLock = new ReentrantLock();
        List<String> list = new LinkedList<>();
        Thread a = new Thread(() -> {
            listLock.lock();
            try {
                list.add("hello world");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                listLock.unlock();
            }
        });

        Thread b = new Thread(() -> {
            listLock.lock();
            try {
                list.add("hello world");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                listLock.unlock();
            }
        });

        a.start();
        b.start();
    }

线程a和线程b同时想要对list进行操作,在调用list.add()方法之前,需要先调用listLock.lock()进行加锁,调用完list.add()方法之后,再调用listLock.unlock()进行解锁。那么在listLock.lock()和listLock.unlock()这两个方法内部,究竟是怎么实现的呢?进入到ReentrantLock内部,发现ReentrantLock中有一个sync变量,ReentrantLock的所有的操作都是通过sync完成的,而sync又继承自

java.util.concurrent.locks.AbstractQueuedSynchronizer

AbstractQueuedSynchronizer(AQS)是JUC包下的一个极其重要的类,JUC包下的很多实现类都继承自他,那么AQS到底是个什么东西。

当多个线程同时获取同一个锁的时候,没有获取到锁的线程需要排队等待,等锁被释放的时候,队列中的某个线程被唤醒,然后获取锁。AQS中就维护了这样一个同步队列(CLH队列)。

?AQS的原理是如此的简单,然而具体的实现却稍微比较复杂。先来看一下AQS中重要的几个属性:

private transient volatile Node head;   //队列的头节点的指针。
private transient volatile Node tail;    //队列的尾节点的指针。
private volatile int state;   //同步器的状态

head指向的是同步队列的头节点,头节点是持有锁资源的节点,tail指向了队列的尾节点。state是什么呢?可以将state理解为资源的状态。以ReentrantLock为例,state的初始值为0,表明此时没有线程在使用资源,此时如果某个线程 t 尝试获取锁,那么他可以成功获取,并将state修改为1,表明此时有1个线程在访问资源,那么其他线程尝试获取锁的时候,发现state状态为1,就会加入到同步队列中然后阻塞。当然线程 t?可以重复获取锁(不然ReentrantLock怎么叫可重入锁?),并且每次重复获取锁都会将state加1。而每次当线程?t?释放锁的时候,会将state减1,这样当线程 t?完全释放完锁之后,state又恢复为0。

接下来看一下AQS的内部类Node的属性:

volatile Node prev;   //该节点的前置节点
volatile Node next;   //该节点的后置节点
volatile Thread thread;   //该节点所代表的线程
volatile int waitStatus;  //节点的状态
Node nextWaiter;   

prev,next和thread都不难理解,这里主要解释下watiStatus和nextWaiter

waitStatus表示节点的状态,他只有以下几种取值:

  • SIGNAL(-1): 表示该节点的后继节点处于阻塞状态,当该节点释放锁(releases)或者取消获取锁(cancels)时,需要唤醒他的后继节点。正常情况下,同步队列中的大部分节点的状态都为SIGNAL
  • CONDITION(-2): 表明该节点是在条件队列(condition queue)上等待,而不是同步队列。由于AQS不仅可以用来当锁(替代synchronized关键字),还具有替代Object的wait/notify的功能,如果一个线程调用了Condition.await(),该线程会加入到条件队列中(而不是同步队列),并将状态设置为CONDITION。关于条件队列更详细的介绍会在后面给出。我们现在只需要知道线程如果只是调用锁的lock()和unlock()方法时,是不会进入CONDITION状态的。
  • PROPAGATE(-3): 共享模式下,由于可以有不只一个线程享用锁,前置节点不仅会唤醒他的后继节点,还可能会将唤醒操作进行“传播”,也就是会唤醒后继节点的后继节点。关于共享锁也会在后面进行详细介绍。
  • CANCELLED(1):很好理解,表明该节点被取消了,例如timeout会将节点状态置为CANCELLED。
  • 0:不属于上述任何一种状态。只有在同步队列中的节点在初始化的时候watiStatus会设置为0。

nextWaiter有两种含义:1. 当该节点处于同步队列时(AQS自己本身的CLH队列),表明该节点是在竞争锁,如果是独占锁,那么nextWaiter=null,如果是共享锁,那么nextWaiter=new Node(),也就是说,nextWaiter此时仅仅相当于一个flag,用来表明该锁是独占锁还是共享锁。2. 当该节点处于条件队列时,nextWaiter指向他在队列中的后继节点。

了解了AQS的内部属性和简单的机制后,我们分别从几个具体的案例来剖析AQS。?

可重入锁(ReentrantLock

ReentrantLock内部又分为公平锁(FairSync)和非公平锁(NonfairSync),可在构造方法中指定参数来决定使用公平锁还是非公平锁,默认情况下采用非公平锁。

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

1. 公平锁

  • lock()
final void lock() {
    acquire(1);
}

调用acquire()方法为线程申请锁资源,首先会调用tryAcquire()方法“尝试性地获取一下锁”,如果获取成功,那么直接返回,如果失败,则调用addWaiter()方法将该线程加入到同步队列中,然后线程会在acquireQueued()方法内被阻塞,如果线程在该过程中被中断(interrupt)了,acquireQueued()方法会返回true,那么就需要调用selfInterrupt()方法将中断补上。接下来我们逐个分析这几个方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire (int acquires)?

tryAcquire()方法的目的是让线程“尝试性地去获取一下锁”,需要注意地是,acquire()方法是定义在AQS中的final方法,而tryAcquire()方法在AQS中的一个抽象方法,需要子类自己去实现(模板模式)。

//公平锁的tryAcquire()方法的实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {  //state为0,表面此刻没有线程占用锁,但这并不意味着当前线程就能马上获取锁
        if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { //ownerThread就是当前线程,可以重入
        int nextc = c + acquires; //将state加1
        if (nextc < 0)  //当前线程一直在重入,以至于重入次数超过了MAX_INT,直接变成负数
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在公平锁的tryAcquire()方法中,先判断state是否为0,如果为0,说明此时没有线程占用锁,锁资源是可用的,但是这并不意味着线程就能直接占用锁,还需要满足两个条件:

  1. hasQueuedPredecessors()方法返回false
  2. compareAndSetState(0, acquires)方法返回true

hasQueuedPredecessors()方法会判断同步队列中是否有线程等待的时间比当前线程更久,如果有,则返回true。(hasQueuedPredecessors方法的源码解析放在后面讲)

compareAndSetState(0, acquires)会用CAS操作将state从0设置为1,设置成功则返回true。

那么为什么需要满足这样两个条件呢?

第一,考虑这样一种情况:上一个持有该锁的线程刚刚释放完锁并将state从1置为0,他的后继节点(a线程)还没来得及获取锁,这时当前线程(b线程)调用了tryAcquire()方法尝试获取锁,这时b线程会发现state为0,但这并不代表b线程就有资格获取锁,因为a线程都已经排到队列的最前面了,此时有资格获取锁的线程应该是a线程,你b线程才刚刚进来,肯定是不能插队的,否则就不符合公平锁的原则了。

第二,即使hasQueuedPredecessors()方法返回false,当前线程还需要用CAS操作成功将state从0置为1,因为在多线程的情况下,有可能有其他的线程同样走到了这一步,那么谁的CAS操作成功了,谁才能获得锁。

让我们再次回到tryAcquire()方法中,如果当前线程经历了重重考验,那么最终他能获取锁资源,并调用setExclusiveOwnerThread(current)方法将锁的持有者设置为当前线程。但如果在判断state时发现不为0,说明此时某个线程正在占用锁,但是这也不意味着当前线程就一定不能获取锁,因为如果此时占用锁的线程(ownerThread)就是他自己的话,那么他就可以重入。(不然为什么叫重入锁呢?)有趣的是,这里还做了这样一个判断:

int nextc = c + acquires; //将state加1
if (nextc < 0)
    throw new Error("Maximum lock count exceeded");

一开始我完全没想明白,nextc怎么可能会小于0呢?state的初始值为0,如果没有重入,那么state为1,如果有重入,state>1,怎么也不可能小于0,这个判断难道不是多次一举?后来终于想明白,如果一个线程一直在重入且没有释放锁,直到将state设置为了MAX_INT,那么下次重入的时候state会被置为MAX_INT+1(这是一个负数),这肯定是不允许的,因此会抛出一个Error。

  • addWaiter (Node mode)

addWaiter()方法会将当前线程的节点加入到同步队列中。结合enq(node)方法一起来看,addWaiter()方法会用CAS操作来将node插入到队伍的结尾,如果CAS操作成功,则方法直接返回;如果CAS操作失败,则调用enq(node),而enq(node)方法其实也是和addWaiter()同样逻辑的CAS操作,只不过CAS操作是放在一个死循环中,这样就能保证最后node总是能插入到队尾(其实在AQS的大量方法中都能看到这种将CAS操作放在循环中的做法)。除此之外enq方法还能在队列第一次有节点进来时对队列进行初始化。

为什么addWaiter不直接调用enq呢?主要是为了考虑效率,在线程竞争不是很激烈的情况下,一次CAS操作是很可能成功的,那么就不需要再调用enq了,毕竟多调用一次方法要花费更多的时间。

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    /**
     * mode是用来标记该线程竞争的锁是共享锁还是独占锁。他只有两种取值:
     * 1. 独占锁:mode=Node.EXCLUSIVE。 2.共享锁:mode=Node.SHARED
     * 真正插入到队列中的是node,不是mode
     */
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • enq (Node node)
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • ?acquireQueued (Node node, int arg)

前面讲了那么多,小伙伴们还记得最初的acquire方法的逻辑吗?(不记得了可以回去看一下)。acquire方法中会调用到?acquireQueued方法,该方法是一个死循环,在循环中,首先判断当前线程节点的前置节点p是否是头节点。如果是,则当前线程有一次通过调用tryAcquire()方法来获取锁的机会。为什么呢?因为此时可能p刚好释放完锁,那么显然当前线程可以直接获取锁而不需要被阻塞,acquireQueued()方法会直接返回true。如果p不是头节点或者没能通过tryAcquire获取锁,那么会进入下一个if判断。

首先调用shouldParkAfterFailedAcquire方法判断线程在获取锁失败的情况下是否需要阻塞,如果需要,则线程会继续调用parkAndCheckInterrupt方法并在该方法中被阻塞。

如果在上述过程中任何一个地方抛出了异常,那么failed无法被设置为false,则会在finally代码块中调用cancelAcquire将线程的waitStatus设置为CANCELLED。

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • ?shouldParkAfterFailedAcquire (Node pred, Node node)

该方法会通过前置节点pred的状态 (waitStatus)来判断当前节点是否需要被阻塞。

1. 如果前置节点pred的ws为SIGNAL,说明需要等pred给信号,那没啥好说的,肯定得等着。

2. 如果ws>0,那么ws肯定为CANCELLED,那么当前节点node需要一直向前找到ws不为CANCELLED的节点,然后将其设置为node的前置节点。即将node前面的ws为CANCELLED的节点都踢出队列。

3. ws为0或者PROPAGATE,PROPAGATE只有在共享锁的情况下才会出现,我们放在后面讲共享锁的时候再讲。ws为0有两种情况:

① pred刚加入队列,ws初始状态为0。那么会通过CAS操作将pred的ws设置为SIGNAL,这样, shouldParkAfterFailedAcquire方法返回false,node不会被阻塞,会进入acquireQueued方法的下一次for循环,并再次进入shouldParkAfterFailedAcquire方法,这一次node会发现pred的ws为SIGNAL,那么方法返回true,node会被阻塞。

②pred节点刚释放完锁时,会将他的ws设置为0(后面讲锁释放的时候会细说)。意味着node的前置节点pred为head并且他刚释放完锁,那么node会在下一次acquireQueued方法的for循环中调用tryAcquire方法并成功获取锁。?

总结一下,一个线程只有在他的前置节点pred的ws为SIGNAL的情况下才会被阻塞,否则无论如何他都不会被阻塞。

/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops.  Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

讲到这里,公平锁的锁获取过程基本已经讲完了。我们通过一张流程图来总结一下。

(暂时先写到这里,后面会继续更新。。。)

  开发测试 最新文章
pytest系列——allure之生成测试报告(Wind
某大厂软件测试岗一面笔试题+二面问答题面试
iperf 学习笔记
关于Python中使用selenium八大定位方法
【软件测试】为什么提升不了?8年测试总结再
软件测试复习
PHP笔记-Smarty模板引擎的使用
C++Test使用入门
【Java】单元测试
Net core 3.x 获取客户端地址
上一篇文章      下一篇文章      查看所有文章
加:2021-12-07 12:20:35  更:2021-12-07 12:20:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/18 6:15:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码