IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 深入分析AQS实现原理 -> 正文阅读

[Java知识库]深入分析AQS实现原理

一、AQS概述

AbstractQueuedSynchronizer简称AQS,它是java.util.concurrent包中,它提供了一套完整的同步编程框架。我们常用的ReentrantLock、CountDownLatch都是基于AQS实现的。
AQS的实现分为两种形式,一种是独占锁,另一种则是共享锁。

独占锁:每次只能有一个线程持有锁。我们比较熟悉的ReentrantLock就是通过独占锁实现互斥性的。
共享锁:允许多个线程同时获取锁,并发地访问资源。例如ReentrantReadWriteLock。
在这篇文章中,我们分析一下AQS的独占锁机制。
在这里插入图片描述

二、AQS内部实现

AQS的实现是底层底层维护了一个先进先出(FIFO)的双向队列,这个队列是基于链表实现的。如果线程竞争锁失败,那么就会进入到这个同步队列中进行等待。当获得锁的线程释放锁之后,会从队列中唤醒一个线程。
双向队列是基于Node节点实现的,当线程需要入队列的时候,会将线程的信息封装成一个Node对象,进行入队操作。
属性head就标记头节点,属性tail标记尾节点。
在这里插入图片描述
Node类的组成如下:

 static final class Node {
        //方式一:标记为共享锁(mode)
        static final Node SHARED = new Node();
        //方式二:标记为独占锁(mode)
        static final Node EXCLUSIVE = null;

       //节点从同步队列中取消
        static final int CANCELLED =  1;
        //后继节点的线程处于等待状态,如果当前节点释放锁,会通知后继节点
        static final int SIGNAL    = -1;
        //当前节点处于等待队列中
        static final int CONDITION = -2;
		//这个用于共享锁,表示共享式同步状态的传递
        static final int PROPAGATE = -3;

		//在同步队列中的等待状态
        volatile int waitStatus;

		//前驱节点
        volatile Node prev;

		//后继节点
        volatile Node next;

		//加入同步队列的线程引用
        volatile Thread thread;

		//等待队列中的下一个节点
        Node nextWaiter;

		//队列中的下一个对象是否为共享式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

		//获得当前节点的前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    
        }
		//添加等待者
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
	//头节点
    private transient volatile Node head;

	//尾节点
    private transient volatile Node tail;

	//标记状态
    private volatile int state;

三、AQS的源码分析

清楚了AQS的基本架构以后,我们来分析一下AQS的源码,仍然以ReentrantLock为模型。
入口:

public void lock() {
    sync.lock();
}
// NonfairSync.lock, 以下以非公平锁作为主要分析逻辑
final void lock() {
    if (compareAndSetState(0, 1)) //通过cas操作来修改state状态,表示争抢锁的操作
      setExclusiveOwnerThread(Thread.currentThread());//设置当前获得锁状态的线程
    else
      acquire(1); //尝试去获取锁
}
  • 当state=0时,表示无锁状态
  • 当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁

获得独占锁的源头就是从acquire()方法开始的。这个方法中分别调用了三个方法:tryAcquire()、addWaiter()、acquireQueued()。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这个方法的主要逻辑是:

  • 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
  • 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
  • acquireQueued,将Node作为参数,通过自旋去尝试获取锁。

首先调用的是tryAcquire(arg),这个方法只抛出了一个异常,因为它的具体实现是交给子类去完成的。这个方法的主要功能是:尝试获得锁,进入临界区。

	protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

tryAcquire在NonfairSync中的实现代码如下:

ffinal boolean nonfairTryAcquire(int acquires) {
    //获得当前执行的线程
    final Thread current = Thread.currentThread();
    int c = getState(); //获得state的值
    if (c == 0) { //state=0说明当前是无锁状态
        //通过cas操作来替换state的值改为1,大家想想为什么要用cas呢?
        //理由是,在多线程环境中,直接修改state=1会存在线程安全问题,你猜到了吗?
        if (compareAndSetState(0, acquires)) {
             //保存当前获得锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //这段逻辑就很简单了。如果是同一个线程来获得锁,则直接增加重入次数
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires; //增加重入次数
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

当tryAcquire方法获取锁失败以后,则会先调用addWaiter将当前线程封装成Node,然后添加到AQS队列

private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE
        //将当前线程封装成Node,并且mode为独占锁
        Node node = new Node(Thread.currentThread(), mode); 
        // Try the fast path of enq; backup to full enq on failure
        // tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
        Node pred = tail;
        if (pred != null) { //tail不为空的情况,说明队列中存在节点数据
            node.prev = pred;  //讲当前线程的Node的prev节点指向tail
            if (compareAndSetTail(pred, node)) {//通过cas讲node添加到AQS队列
                pred.next = node;//cas成功,把旧的tail的next指针指向新的tail
                return node;
            }
        }
        enq(node); //tail=null,将node添加到同步队列中
        return node;
    }

通过上面的代码我们知道,如果这个同步队列是空队列或者CAS失败,那就调用enq()方法。

private Node enq(final Node node) {
        //自旋
        for (;;) {
            Node t = tail; //如果是第一次添加到队列,那么tail=null
            if (t == null) { // Must initialize
                //CAS的方式创建一个空的Node作为头结点
                if (compareAndSetHead(new Node()))
                   //此时队列中只一个头结点,所以tail也指向它
                    tail = head;
            } else {
//进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
//t此时指向tail,所以可以CAS成功,将tail重新指向Node。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
                    t.next = node;
                    return t;
                }
            }
        }
    }

将添加到队列中的Node作为参数传入acquireQueued方法,这里面会做抢占锁的操作

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException
            if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺
                setHead(node); // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点
//凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null
                p.next = null; // help GC
                failed = false; //获取锁成功
                return interrupted;
            }
//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志
                interrupted = true;
        }
    } finally {
        if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作
            cancelAcquire(node);
    }
}

  • 获取当前节点的prev节点
  • 如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占锁
  • 抢占锁成功以后,把获得锁的节点设置为head,并且移除原来的初始化head节点
  • 如果获得锁失败,则根据waitStatus决定是否需要挂起线程 最后,通过cancelAcquire取消获得锁的操作

前面的逻辑都很好理解,主要看一下shouldParkAfterFailedAcquire这个方法和parkAndCheckInterrupt的作用

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //前继节点的状态
    if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被unpark唤醒
               return true;
如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。
    if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。
       
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}

LockSupport类是Java6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

public native void unpark(Thread jthread);  
public native void park(boolean isAbsolute, long time);  

unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。
permit相当于0/1的开关,默认是0,调用一次unpark就加1变成了1.调用一次park会消费permit,又会变成0。 如果再调用一次park会阻塞,因为permit已经是0了。直到permit变成1.这时调用unpark会把permit设置为1.每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark不会累积

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-08 22:14:58  更:2022-03-08 22:19:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 11:31:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码