IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 聊聊AbstractQueuedSynchronizer -> 正文阅读

[Java知识库]聊聊AbstractQueuedSynchronizer

什么是AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是一个框架,一个基于FIFO队列用于实现阻塞锁和同步器的框架。

AbstractQueuedSynchronizer类设计为能支持大多数情况的同步器,其中通过一个原子性的value来表示锁的状态(state),对于实现者来说,只需要安全的去改变这个state(什么叫安全),然后定义state处于什么情况(什么值)下是获取(acquired),释放(released)。

基于此同步器需要在内部创建一个helper来继承于AbstractQueuedSynchronizer,然后在定义自己的一些同步方法,在同步方法中按照自己的逻辑去访问helper方法以达到同步目的。(范例)。

AbstractQueuedSynchronizer提供了两种模式,一种独占模式(exclusive),一种共享模式(shared),通过独占表示只有一个线程能占有,共享代表着允许多个线程同时持有。

实现同步器步骤

要实现AQS只需要按照自己的需要完成下面几个方法的实现就可以了

  1. tryAcquir() 尝试获取独占锁
  2. tryRelease() 尝试释放独占锁
  3. tryAcquireShared() 尝试获取共享锁
  4. tryReleaseShared() 尝试释放共享锁
  5. isHeldExclusively() 是否持有独占锁

如果我们只需要其中一种方式,那么只需要实现对应的方法就行了,比如:

我们需要完成一个共享模式的同步器,那么只需要完成tryAcquireShared()和tryReleaseShared()方法的实现,当然我们也可以两种方式都需要比如说ReentrantReadWriteLock可重入的读写锁。

对于获取锁的场景会有两种:

  1. 直接返回是否能获取到锁,也就是我们实现的tryAcquir() 或者 tryAcquireShared()
  2. 当获取不到锁的时候希望能阻塞,一直到获取到锁为止,也就是AbstractQueuedSynchronizer提供的acquire() 或者 acquireShared()

第一种场景我们是自己实现的,而比较复杂的第二种AbstractQueuedSynchronizer已经帮我们完成了

安全的去改变state

AbstractQueuedSynchronizer中定义了getState(),setState(),compareAndSetState()三个方法来操作state,基于此在自己实现上述的五个方法,可以参考范例 中的tryAcquir(),tryRelease()等方法的实现。

范例

class Mutex implements Lock, java.io.Serializable {
 
    // 继承AbstractQueuedSynchronizer实现sync同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // Reports whether in locked state
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
 
         // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
 
        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);
            return true;
        }
    }
    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();
    /**
    * 
    * 定义自己的锁对象包含的功能,然后通过sync实现其功能
    *
    **/
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
  }

AbstractQueuedSynchronizer设计

通过上面描述我们简单的感受了下AbstractQueuedSynchronizer是做什么的,以及我们该怎么做,那么接下来我们就将详细看看AbstractQueuedSynchronizer中是如何设计整个框架的。

第一步:给框架进行定位

AbstractQueuedSynchronizer的定位是一个同步器框架,是一个基于FIFO队列用于实现阻塞锁和同步器的框架。

基于此,我们就需要准备一些东西:

  1. 锁的资源or同步的资源
  2. 一个FIFO队列的实现

对于第一点,既然是锁既然是同步器,那么我们就必须要有实际的物体来锁定,而不能是虚无的,它可以是对象,可以是任意一个实实在在的东西。AbstractQueuedSynchronizer选择int类型来做为资源标识,也就是上述的state成员变量。

对于第二点,AbstractQueuedSynchronizer选择CLH队列的变种,一个节点持有前后对象的双向链表来作为阻塞队列。

根据上面的结构和资源,一个简单的流程就有了:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z7yzQitg-1649401519743)(https://note.youdao.com/yws/res/10256/WEBRESOURCEc35f366090e1f3d6e164e18aa8fb3ead)]

第二步:抽象需自定义的API

对于争夺来说,不同的场景会存在不一样的需求,对于ReentrantLock可重入锁同一个线程允许多次进入,但是在范例中实现的锁就只能进入一次。

因此,需要将争夺相关的操作抽象出来,让实现方去自定义,同理争夺进行了自定义,那么释放必然就存在则差别,同样需要进行自定义。

而锁存在则两种模式,一种独占一种共享,对于框架来说,是没办法直接明白本次争夺是独占还是共享,那么只能进行机械式的拆分为两个方法来表示这层含义。

也就诞生出了这五种需要实现方关注的方法:

  1. tryAcquir() 尝试获取独占锁
  2. tryRelease() 尝试释放独占锁
  3. tryAcquireShared() 尝试获取共享锁
  4. tryReleaseShared() 尝试释放共享锁
  5. isHeldExclusively() 是否持有独占锁

第三步:CLH变种队列的设计与管理

现在我们明确了资源,队列结构,争夺实现。接下来就需要完成队列的管理。

  • 队列的结构?
  • 队列成员属性有哪些?
  • 节点成员有几种状态?
  • 队列操作

队列结构

基于CLH变种队列,在AbstractQueuedSynchronizer中将持有队列的头对象与尾对象来帮助完成快速进入队列和出队列。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

对于头部head,只能通过setHead()方法进行修改,对于尾部tail只能通过enq()方法添加到队列中。

队列成员属性

通过队列结构我们知道队列中的成员类型都为Node类型。

static final class Node {
        //共享节点的标识
        static final Node SHARED = new Node(); 
        //独占节点的标识
        static final Node EXCLUSIVE = null;
        
        //前节点
        volatile Node prev;
        //后节点
        volatile Node next;
        //当前节点的线程
        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    }

基于CLH变种的双向链表,必然存在前节点和后节点,对应的成员就是prev和next。
同时需要持有当前节点的线程来进行阻塞操作和唤醒操作,对应的成员就是thread。
通过nextWaiter成员来判断该节点的类型是共享还是独占

节点成员有几种状态

节点对应的操作通过waitStatus成员来进行表示

static final class Node {
        //共享节点的标识
        static final Node SHARED = new Node(); 
        //独占节点的标识
        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;
        static final int SIGNAL    = -1; 
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        
        /**
        * 表示节点当前的状态,状态对应着后续该节点能进行的操作
        * SIGNAL:表示当前节点的后续节点已经或者将要阻塞,那么当前节点进行
        *         release或则cancel的时候需要将后续节点进行唤醒
        * CANCELLED:表示当前节点已经被取消,并且该状态不可逆,只能等待被
        *            移出队列
        * CONDITION:条件队列的节点使用,在等待队列中不存在该类型节点
        * PROPAGATE:为了修复共享锁多个线程同访问的bug而引入的中间状态
        * 0:初始化默认值
        **/
        volatile int waitStatus;
        
        //前节点
        volatile Node prev;
        //后节点
        volatile Node next;
        //当前节点的线程
        volatile Thread thread;
        
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    }

节点状态机

创建节点时
节点进入队列后,将前驱节点变更为SIGNAL
doReleaseShare进行释放
doReleaseShared进行释放
0
初始化
SIGNAL
PROPAGATE

共享模式队列操作

通过上面对队列结构的分析和节点node的描述,接下来就该看下队列的操作中是如何使用到队列结构与节点的。

共享模式-总体思路

原则:

  1. SIGNAL的节点需要对后续节点进行唤醒(PROPAGATE也会,单场景单一,只在替换哨兵节点的时候)
  2. CANCELLED的节点需要移除队列中
  3. 执行完毕的节点需要重置为0
  4. PROPAGATE的节点需要被转换成SIGNAL节点(涉及到共享锁的一个bug)

由第一个节点的waitStatus状态(SIGNAL)来判断是否进行后继节点的唤醒(出队),唤醒时机在释放(releaseShared)是进行。
对于CANCELLED节点,在获取(acquiredShared),释放(releaseShared)进行踢出队列。

第一种简单情形:

node3进入队列前:

node1:SIGNAL
node2:0

node3进入队列后:

node1:SIGNAL
node2:SIGNAL
node3:0

第二种包含CANCELLED节点
node3进入队列前:

node1:SIGNAL
node2:CANCELLED

node3进入队列后:

node1:SIGNAL
node3:0

共享模式-出入队

    public final void acquireShared(int arg) {
        //访问自定义的争夺方法,但没获取到
        if (tryAcquireShared(arg) < 0)
            //进行入队操作
            doAcquireShared(arg);
    }
    private void doAcquireShared(int arg) {
        //1.进行入队操作
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        //2.告知前驱节点
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                
                if (p == head) {
                    //2.1 前驱节点是头节点,则直接轮到当前节点,尝试获取资源
                    int r = tryAcquireShared(arg);
                    
                    if (r >= 0) {
                        //获取成功,则将自己设置为head节点,然后进行传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //2.2 前驱节点不是头部节点,那么告诉前驱节点记得唤醒当前节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果出现异常情况,则取消当前节点。
            if (failed)
                cancelAcquire(node);
        }
    }

  1. 当队列初始化之后,头结点和尾节点不会为null,始终会保持存在头结点(哨兵节点),在共享模式下,当头结点获得之后,将让后继节点继续进行获取,并重置为head以达到共享的目的
addWaiter()
    private Node addWaiter(Node mode) {
        //1.创建入队的节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //2.尝试快速加入队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //3.失败后则一直尝试入队(初始化链表or冲突过高才会需要进行自旋入队)
        enq(node);
        return node;
    }
    
    //不停尝试入队
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }    
setHeadAndPropagate()
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /**
        * 对于是否唤醒后续节点的条件
        * 1.如果通过tryAcquireShare()获取的值大于0,代表有多余资源空出来,进行唤醒后继节点
        * 2.如果节点为SIGNAL,本就代表需要唤醒后续节点
        * 3.如果节点为PROPAGATE,代表虽然我propagate不满足,可是在判断逻辑之后,有其它线程释放了资源,需要唤醒查看。
        * 2,3 合并成waitStatus<0
        * 引入PROPAGATE,可查看后面详细解释。
        **/
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                //对共享节点进行释放
                doReleaseShared();
        }
    }
doReleaseShared()

1.要么将后继节点唤醒,把自身状态重置为0
2.要么把自身状态修改为PROPAGATE,解决共享锁同时访问的并发问题


    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //1.队列中存在多个节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //对后续节点进行唤醒
                if (ws == Node.SIGNAL) {
                    //将标识位重置为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //对后继节点进行唤醒
                    unparkSuccessor(h);
                }
                //头部节点为0则将状态设置为PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //2.如果只有head节点,代表队列空了
            if (h == head)                   // loop if head changed
                break;
        }
    }

PROPAGATE状态产生的原因

起因是由于共享锁,同时进行释放导致有资源的情况下线程需要持续等待。
bug地址

在没有PROPAGATE的情况下
假设:我们有2个资源,同时呢有2个线程已经持有了,有2个线程等待获取。

当node1进行释放时,唤醒node3:
在这里插入图片描述

node1释放,state+1。
node3获取,state-1。
这时候node3通过doAcquireShared获取资源为0(刚好够自己用,不需要通知后续节点),但是没有执行完将自己设置为头部。


当node3准备将自己设置为头部之前,node2进行释放:
在这里插入图片描述
尴尬的情况出现了,资源state为1,由于前面获取的资源为0,那么node3将自己设置为head之后不会对后续的节点进行唤醒(因为它认为资源是空,唤醒没有意义,但是在它变为head之前已经有其它持有资源的线程释放了资源)。


场景描述清楚了,那么解决方案就是新增一个PROPAGATE状态,将0变更为PROPAGATE状态表示在进行头部节点的替换过程中存在资源的释放,同时在setHeadAndPropagate()判断是否唤醒后续节点的条件加上,当head处于PROPAGATE状态时。

共享模式队列操作

跟共享差别不大,获取锁时不在有扩散行为(PROPAGATE),不再做分析。

ConditionObject条件队列

后续在聊

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-09 18:09:53  更:2022-04-09 18:12:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 6:01:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码