IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Java AQS源码分析 -> 正文阅读

[Java知识库]Java AQS源码分析

AQS是AbstractQueuedSynchronizer的简称,通过一个原子变量+双向队列来实现多线程安全。

核心思想:
若资源state空闲则当前线程设置为拥有线程,并将资源设置为占用。
若资源state被占用,则请求线程进入一个FIFO同步队列等待唤醒机制保证资源分配。
参考资料
https://www.cnblogs.com/truestoriesavici01/p/13213978.html
https://blog.csdn.net/sifanchao/article/details/84343848

原子变量state

state的定义:private volatile int state;
使用Unsafe+CAS更新state.

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
    //CAS修改原子变量state
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

同步双向队列

当资源state被占用时,其他竞争线程会进入同步队列。等待资源被释放后唤醒如下图。
在这里插入图片描述

AQS结点结构

AQS同步队列是一个FIFO的双向有序队列,头结点占用资源。AQS有个静态内部类Node,Node属性如下:

        volatile int waitStatus;//当前结点的状态
        volatile Node prev; //当前结点的前置结点指针
        volatile Node next; //当前结点的后结点指针
        volatile Thread thread; //当前结点对应的线程
        Node nextWaiter; //指向下一个CONDITION状态节点

结点状态

AQS每个Node都有不用状态。在AQS获取锁和释放锁都用到这个状态。

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1; //当前线程获取锁的请求取消
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1; //后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
        static final int CONDITION = -2; //表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
        static final int PROPAGATE = -3; //共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
        static final int INIT = 0; //新结点入队时的默认状态。

独占锁获取

ReentrantLock 对象创建,有带参数和不带参数的实现。默认不带参是非公平锁。具体看ReentrantLock源码

   @Test
    public  void testAqs() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        System.out.println("");
        lock.unlock();
    }
    //Sync是抽象方法,lock方法由子类实现。有公平锁(FairSync)、非公平锁(NonfairSync)
    public void lock() {
        sync.lock();
    }
    //非公平锁实现
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    //公平锁
    final void lock() {
        acquire(1);
    }

公平锁:直接调用acquire获取同步锁。
非公平锁: 先调用CAS尝试同步状态,如果成功就把当前线程设置为独占线程。失败在调用acquire获取同步锁
区别:非公平锁在调用acquire之前,会尝试先获取一次锁。acquire方法公平锁和非公平锁共用代码在AQS实现。

acquire方法

公平锁和非公平锁都会调用到acquire方法尝试获取同步状态。详细见AbstractQueuedSynchronizer#acquire。实现逻辑:首先尝试获取同步资源,获取失败加入队列等待唤醒。

    public final void acquire(int arg) {
         //尝试获取同步状态。成功则直接返回执
         //失败则先添加结点后把新结点添加到同步队列中。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    //公平锁tryAcquire具体实现
    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //当前同步资源空闲,判断同步队列是否有数据。如果没有就尝试占用资源并设置线程;其他则返回获取同步状态失败、
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果同步资源占用且是线程同入,则资源+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
     }
        
    //非公平锁tryAcquire实现
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //同步资源state空闲,尝试占用同步资源。成功则设置资源占用线程为当前线程;否则返回获取线程失败
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            
            //如果同步资源占用且是线程同入,则资源+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
      }

公平锁和非公平锁实现区别:公平锁当同步状态空闲的时会判断同步队列是否有数据,没有才尝试抢占资源。非公平锁则忽略同步队列的存在直接抢占资源。

    //tryAcquire获取同步锁失败之后,创建新结点添加到同步队列。
    private Node addWaiter(Node mode) {
        //将线程以指定模式封装为Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //如果同步队列存在数据,则调整新结点和前结点的前后结点指针并把新建结点CAS设置为新的尾结点
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //尾节点为空 || CAS尾插失败 里面是个自旋,不断尝试设置当前结点到同步队列
        enq(node);
        return node;
    }
   
    private Node enq(final Node node) {
        //自旋直到有返回。
        for (;;) {
            Node t = tail;
            //当前尾结点为空即同步队列为空
            if (t == null) { 
                //初始化同步队列
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // 不断CAS将当前节点尾插入同步队列中
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued方法

添加完结点到同步队列时,结点排队获取同步资源

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋
            for (;;) {
                //获取当前节点的前置结点
                final Node p = node.predecessor();
                //若前置节点为头结点,就尝试获取锁。头部结点占用着锁。
                if (p == head && tryAcquire(arg)) {
                    //把当前节点设置为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的节点状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 前线程的前一个线程处于唤醒状态,当前线程阻塞
             */
            return true;
        if (ws > 0) {// 前驱节点已被取消
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            // 不断重试直到找到一个前驱节点状态不为取消状态
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 前驱节点状态不是取消状态时,将前驱节点状态置为-1,
            // 表示后继节点应该处于等待状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    //阻塞当前线程,返回当前线程的中断状态(清除线程字段标志)
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

acquireQueued流程:
在这里插入图片描述
shouldParkAfterFailedAcquire方法逻辑:
在这里插入图片描述

总结

AQS用同步资源state和双向同步队列实现。首先尝试获取锁,获取不到则添加结点到队尾。
添加到队尾之后,尝试出队。获取失败就unpark等待被前一个节点唤醒详见unlock源码。被前一个节点唤醒之后又重新尝试获取同步资源,可能失败被重新unpark等待后重新被唤醒。如此自旋直到获取锁。但自始至终只有一个线程在自旋获取锁,其他节点都在堵塞

释放锁unlock

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 若当前线程非占有资源的线程则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread()) 
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若持有的线程全部释放,则占有资源的线程设为null,更新状态state
    if (c == 0) { 
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

// 当前线程释放锁
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 若队列非空或者当前线程不处于唤醒状态,唤醒当head节点后的的一个节点的线程
        if (h != null && h.waitStatus != 0) 
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) 
        compareAndSetWaitStatus(node, ws, 0);
    // node为当前线程的后一个等待线程    
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // 若当前线程下一个线程为null或者被取消
        s = null;
        // 从后向前查找第一个没有被取消的等待线程,将其唤醒
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-07-22 14:00:25  更:2021-07-22 14:02:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/22 7:59:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码