| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> Java知识库 -> ReentrantLock源码解析及ReentrantReadWriteLock源码解析 -> 正文阅读 |
|
[Java知识库]ReentrantLock源码解析及ReentrantReadWriteLock源码解析 |
??在我们工作中经常用ReentrantLock,感觉像使用redis加锁和解锁一样,但是其底层是如何实现的呢? 鉴于我对Doug Lea这个人的崇拜,因此大牛的东西,还是要去研究的,有人说,为人不识Doug Lea,学懂并发也惘然,那我们还是先来认识一下这个人吧。 ReentrantLock??ReentrantLock是一种基于AQS框架的应用实现,是JDK 中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全,而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁 ,支持加锁的公平性。 ??话不多说,我们先来看一个例子,下面是ReentrantLock最简单的使用方式 。 public class ReentrantLockTest { public final static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) { try { lock.lock(); // 加锁 System.out.println("xxxxxxxxxxxxxxx"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();// 解锁 } } } ??ReentrantLock是如何实现synchronized不具备的公平与非公平性呢?
除了Lock外,java.concurrent.util当中的同步器的实现如Latch,Barrier,BlockingQueue 等,都是基于AQS框架的实现
AQS 内部维护属性volatile int state(32位)
State三种访问方式 AQS 定义了两种资源共享的方式
??不同的自定义同步器共享资源的方式也不同,自定义同步器的实现时需要实现共享资源state的获取与释放方式即可,至于具体的线程等待队列的维护(如获取资源失败入队/ 唤醒出队等),AQS已经在顶层实现好了,自定义同步器实现时主要实现了以下几种方式 :
同步等待队列 条件等待队列?? Condition是一个多线程协调通信的工具类,使得某个线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会唤醒,从而重新争夺锁。 public ReentrantLock() { sync = new NonfairSync(); } ??我们知道ReentrantLock有公平锁和非公平锁,而ReentrantLock默认是实现了非公平锁。那么NonfairSync的结构是怎样的呢?请看下图。 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; protected AbstractQueuedSynchronizer() { } // 不管是条件队列,还是CLH等待队列 ,都是基于Node类的 // AQS 当中的同步等待队列也称为CLH队列,CLH队列是Craig,Landin,Hagersten 三人 // 发明的一种基于双向链表的数据结构队列,是FIFO先入先出线程等待队列,java中的CLH 队列是原CLH队列的一个变种, // 线程由原自旋机制改为阻塞队列机制 static final class Node { // 标记节点未共享模式 static final Node SHARED = new Node(); // 标记节点为独占模式 static final Node EXCLUSIVE = null; // 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待 static final int CANCELLED = 1; // 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知手继节点 // 使后继节点,使后继节点的线程得以运行 static final int SIGNAL = -1; // 节点在等待队列中,节点的线程等待在Condition状态上,其他线程对Condition调用了signal()方法后, // 该节点会从等待队列中转移到同步队列 中,加入到同步状态的获取中 static final int CONDITION = -2; // 表示下一次共享同步状态获取将会被无条件地传播下去 static final int PROPAGATE = -3; // 标记当前节点的信号量状态(1,0,-1,2,-3)5 种状态,使用CAS更新状态,volatile保证线程的可见性,在高并发的场景下, // 即被一个线程修改后,状态会立马让其他线程可见 volatile int waitStatus; // 前驱节点,当前节点加入到同步队列中被设置 volatile Node prev; // 后继节点 volatile Node next; // 节点同步状态的线程 volatile Thread thread; // 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED 常量 // 也就是说点类型(独占和共享)和等待队列中的后继节点共用同一个字段 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } // 返回当前节点的前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 空节点,用于标记共享模式 Node() { // Used to establish initial head or SHARED marker } // 用于同步队列 CLH Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 用于条件队列 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } // 指向同步等待队列的头节点 private transient volatile Node head; // 指向同步等待队列的尾节点 private transient volatile Node tail; // 同步资源状态 private volatile int state; static final long spinForTimeoutThreshold = 1000L; } Unsafe应用解析??在看lock()方法之前,我们先来理解Unsafe。Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别,不安全操作的方法,如直接访问内存资源,自主管理内存资源等,这些方法在提升Java运行效率,增强Java语言底层资源操作能力方面起到了很大的作用,但是由于Unsafe类使用了Java语言拥有了类似C语言的指针一样的操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险,在程序中过度,不正确的使用Unsafe类会使得程序出错的概率变大,使得Java这种安全语言变得不再安全,因此Unsafe的使用一定要慎重 。 @CallerSensitive public static Unsafe getUnsafe() { Class var0 = Reflection.getCallerClass(); // 仅在引导类加载器 BootstrapClassLoad 加载才合法 if (var0.getClassLoader() != null) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } } 如何获取Unsafe实例呢?
public class UnsafeInstance { public static Unsafe reflectGetUnsafe() { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (Exception e) { e.printStackTrace(); } return null; } } Unsafe 功能介绍
// 获取给定地址值,忽略修饰限定符的访问限制,与此类似的操作还有getInt,getDouble,getLong,getChar等 public native Object getObject(Object var1, long var2); // 为给定地址设置值,忽略修饰限定符的访问限制,与此类似的操作还有putInt,putDouble,putLong,putChar 等 public native void putObject(Object var1, long var2, Object var4); public native byte getByte(long var1); // 为给定的地址设置byte类型的值(当且仅当内存地址为allocateMemory分配时,此方法的结果才是确定的) public native void putByte(long var1, byte var3); // 分配内存,相当于C++的malloc 函数 public native long allocateMemory(long var1); // 扩充内存 public native long reallocateMemory(long var1, long var3); // 在给定的内存块中设置值 public native void setMemory(Object var1, long var2, long var4, byte var6); // 内存拷贝 public native void copyMemory(Object var1, long var2, Object var4, long var5, long var7); // 释放内存 public native void freeMemory(long var1); ??通常,我们在Java中创建的对象都处于堆内存(heap)中,堆内存是由JVM 所管控的Java进程内存,并且它们遵循JVM内存管理机制,JVM会采用垃圾回收机制统一管理堆内存,与之相对的是堆外内存,存在于JVM管控之外的内存区域,java中对堆外内存的操作,依赖于Unsafe提供的操作堆外内存native方法 。
典型应用 ??下图为某个AtomicInteger对象自增操作前后的内存示意图,对象的基地址 baseAddress=“0x110000”,通过baseAddress+valueOffset得到value的内存地址 valueAddress=“0x11000c”;然后通过CAS进行原子性的更新操作,成功则返回,否则 继续重试,直到更新成功为止。 // 取消阻塞线程 public native void unpark(Object var1); // 阻塞线程 public native void park(boolean var1, long var2); // 获得对象锁(可重入锁) public native void monitorEnter(Object var1); // 释放对象锁 public native void monitorExit(Object var1); // 尝试获取对象锁 public native boolean tryMonitorEnter(Object var1); ??方法park ,unpark即可实现线程的挂起与恢复,将一个线程进行挂起是通过park方法实现的,调用park方法之后,线程将一直阻塞直到超过或中断等条件出现,unpark可以终止一个挂起的线程,使其恢复正常。 典型应用??Java锁和同步器框架的核心类AbstractQueuedSynchronizer,就是通过调用LockSupport.park()和LockSupport.unpark()实现线程的阻塞和唤醒的,而LockSupport的park,unpark方法实际上是调用Unsafe的park,unpark方式来实现。 内存屏障 // 内存屏障,禁止load操作重排序,屏障前的load操作不能被重排序到屏障后,屏障后的load操作不能被重新排序到屏障前 public native void loadFence(); // 内存屏障,禁止store操作重排序,屏障前的store操作不能被重新排序到屏障后,屏障后的store操作不能被重新排序到屏障前 public native void storeFence(); // 内存屏障,禁止load,store 操作重新排序 public native void fullFence(); 典型应用在Java 8中引入了一种锁的新机制——StampedLock,它可以看成是读写锁的一个改
??上面这些关于AQS的相关知识来源于 图灵课堂 杨过老师的资料,当然啦,我不是图灵课堂的托,我只是说,他们的课确实有独到的见解,如果你有钱,并且有时间,同时你也是一个奋发向上的人,但你感觉自己已经达到了瓶颈,也就是工作中什么问题都能解决,但是别人一问你,比如Hash Map 的底层实现,ConcurrentHashMap的底层实现,线程池的底层实现,Spring 源码等,好像都不知道,但自己去研究,发现又困难重重,没有什么进展 ,这种情况可以到网上找一些好的培训班,并不是培训班就一定能提升你的技能,但是至少师傅将你引进门了,修行看个人 。所以这篇博客呢?还是要致敬 图灵课堂 ,因为有了前面的这些基础知识,在理解后面的源码,你才看得下去,不然,你会觉得默名其妙。 话不多说,我们还是切入正题 。 非公平锁??接下来,我们进入lock()方法的分析 。 public void lock() { // 在ReentrantLock构造函数中,默认sync为NonfairSync sync.lock(); } static final class NonfairSync extends Sync { // 先来看看非公平锁的lock()方法实现 final void lock() { // CAS 操作将state的值由0设置为1,state = 0 ,当前锁没有被其他线程获取 if (compareAndSetState(0, 1)) // 独占模式下,设置当前占有锁的线程 setExclusiveOwnerThread(Thread.currentThread()); else // 如果CAS 操作失败,表示有其他线程占用锁,请看没有获取到锁的逻辑 acquire(1); } protected final boolean tryAcquire(int acquires) { // 非公平锁尝试获取锁的逻辑 return nonfairTryAcquire(acquires); } } private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } // 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。此操作具有易失性读写的内存语义。 // 参数: // 期望——期望值 // 更新- 新值 //回报: // 如果成功,则为true 。 false return 表示实际值不等于预期值。 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this // 从之前的基础知识 AtomicInteger对象自增操作前后的内存示意图中可以得知 // stateOffset为AbstractQueuedSynchronizer对象和其属性state的内存偏移地址 // 每次操作的就是对象地址 + stateOffset 的内存地址,expect 为期望内存的值 // update为更新之后的值,也就是说对象地址 + stateOffset 的内存地址的值为expect值,则用update替换掉expect, // 否则返回false ,大家要注意 ,这一步骤肯定是原子的 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } } ??下面就是线程没有获取到锁的逻辑处理 public final void acquire(int arg) { // 尝试获取锁,这里我们进入非公平锁的尝试获取锁的逻辑 ,当然,后面也有公平锁尝试获取锁的逻辑 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ??请看下面代码,非公平锁尝试获取锁的逻辑 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取state的值,因为state用volatile修饰,因此其他线程修改,当前线程立即可见 // 为了避免不必要的自旋与等待,因此在没有获得到锁的情况下,在这里再次获取state的值 int c = getState(); // 如果state的值为0,表示之前占用锁的线程已经释放掉锁了,此时可以再次参与锁竞争 if (c == 0) { // 再次申请将state的值设置为1,如果CAS操作成功,表示获取到锁 if (compareAndSetState(0, acquires)) { // 设置当前线程为占有锁的线程 setExclusiveOwnerThread(current); return true; } } // 如果当前占有锁的线程就是自身线程 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 默认state 不能小于0,按道理 ReentrantLock 不可能出现小于0的情况 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 直接设置state的值为nextc ,这里就是ReentrantLock可入锁的实现 // 这里对于state的赋值为什么不用CAS操作呢?首先state本身就是volatile修饰,禁止了指令重排序 // 另外一方面,在同一个线程中,不可能存在 同时 对state设置 值的情况,总会一先一后, // 在happen-before 规则的定义限制下,代码的逻辑语义是顺序执行的 setState(nextc); // 同一个线程可重入锁 return true; } // 如果再次获取不到锁,同时当前获取锁的线程不是自身线程,则返回false return false; } ??我相信此时此刻,大家肯定看到了大牛的思想所在了,在获取不到锁的情况下,再次获取锁,如果还获取不到,则看当前获取到锁的线程是否是自身线程,如果是自身线程,支持锁的重入,state = state + 1 ,如果还是获取不到锁,则返回false ,那么再次获取不到锁,怎么办呢?请接着看下面代码 。 private Node addWaiter(Node mode) { // 以独占模式创建一个节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 如果当前队列中尾节点不为空 if (pred != null) { // 当前新创建节点的前驱节点为 之前的尾部节点 node.prev = pred; // CAS操作将当前新创建的节点设置为队列的尾部节点 if (compareAndSetTail(pred, node)) { // 如果CAS操作成功,则将之前队列的尾节点的next 指向当前新创建的节点 pred.next = node; return node; } } // 如果将新创建节点设置为队列的尾节点失败,也就是同时有其他线程也是调用lock()方法, // 此时需要进行自旋,将当前节点设置为队列的尾部节点,而上面代码块,提供了一个快速设置当前节点为队列尾节点的过程 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; // 如果队列为空,通过CAS操作设置队列的头节点,设置头节点的过程也可能存在并发操作 // 因此写了一个死循环,但是有没有发现,队列的头节点并不是我们新创建的节点,而是一个new 了一个Node()节点 // 为什么要这么做呢?后面的代码再来分析 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 将当前节点的前驱节点设置为 之前的尾部节点,再进行CAS操作,将当前节点设置为尾部节点 if (compareAndSetTail(t, node)) { // 之前的尾部节点的next节点指向当前节点 t.next = node; return t; } } } } ??大家想过没有,为什么要加一个enq()方法呢?好像里面的代码差不多嘛 ,一方面为了保证当前节点一定会设置为队列的尾节点,因此在enq()方法中用了死循环,那么为什么不直接调用enq()方法即可,而在addWaiter()方法中加一层快速插入尾节点的操作呢? 聪明的小伙伴肯定发现了,因为在enq()方法中有一个t == null的判断,如果不在addWaiter()方法中加一层快速插入尾节点的操作,那么每次都需要判断 当头队列头节点是否存在的判断,这样影响性能,大家看到没有,大牛的代码就是这样为性能考虑。 ??从这里,如果当前线程没有获取锁,则创建一个node节点,加入到CLH队列尾部,加入到队列尾部又怎样呢?我们接着看代码 。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 如果当前节点的前驱节点为CLH队列的头节点,则调用tryAcquire()再次获取锁 if (p == head && tryAcquire(arg)) { // 如果获取到锁了,则设置当前节点为队列的头节点,当node的thread和prev设置为空 setHead(node); p.next = null; // p节点的next节点为空,则p节点成为孤立的节点,jvm 很快发现并回收掉 // 设置当前获取锁成功 failed = false; return interrupted; } // 如果当前线程所在的节点不是头节点的next节点 ,之前我们知道,Node节点是一个线程为空的节点 // 我们需要将当前线程park住 if (shouldParkAfterFailedAcquire(p, node) && // park住当前线程 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 取消锁的获取 cancelAcquire(node); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ??在park线程之前,我们先来看当前线程是否应该park private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 如果当前线程所在节点的前驱节点的waitStatus为-1 ,则当前线程可以park int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; // 如果当前线程的前驱节点的waitStatus大于0,则表示pred是被取消状态,则一直向前查找,直到 // 找到 waitStatus<=0 的节点为止,并用找到的pred指向当前node ,此时返回false。 // 这里做了两件事情,将node.prev 指向一个waitStatus <= 0 的pred ,同时将pred的next指向node // 如 A -> B -> C -> D , 同时 D -> C -> B -> A ,但是 B 和C 的Node的waitStatus 大于 0 ,则 // A->D ,D -> A ,此时虽然B.prev 还是指向A ,C的next 还是指向D,但也避免不了成为孤立节点的事实, // 因此B 和C 最终会被JVM回收掉。请看图1 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将当前节点的前驱节点设置为-1 ,当然本方法还是返回false ,也就是还是可以去抢锁的机会 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } ??不知道大家看明白Doug Lea写这个方法的意图没有,先我们不来分析waitStatus为什么会大于0 ,后面的代码再来分析,我们先来看Doug Lea写这个方法的意图写这个方法的意图。请看下图 public class LockSupportTest { public static void main(String[] args) throws Exception{ Thread t = new Thread(new Runnable() { @Override public void run() { LockSupport.park(this); System.out.println("xxxxxxxxxxxx"); } }); t.start(); Thread.sleep(2000); System.out.println("yyyyyyyyyyyyyyyyyy"); LockSupport.unpark(t); } } 结果输出 : yyyyyyyyyyyyyyyyyy xxxxxxxxxxxx 从结果可以看出 ,程序被park住了,只有程序调用unpack(t)方法,park()后的代码才会执行。 private final boolean parkAndCheckInterrupt() { // park住当前线程 LockSupport.park(this); // 加一个interrupted()方法有什么意思呢? return Thread.interrupted(); } 关于interrupt(),interrupted()和isInterrupted()这三个方法的关系,请看这篇博客,写得很详细了,如果不懂,可以去看看 public static void main(String[] args) throws Exception{ Thread t = new Thread(new Runnable() { @Override public void run() { LockSupport.park(this); System.out.println("111111111111"); // 只要当前线程被其他线程设置中断标识,则park()将无效 LockSupport.park(this); LockSupport.park(this); System.out.println("xxxxxxxxxxxx"); } }); t.start(); // t 线程被其他线程设置中断标识 t.interrupt(); Thread.sleep(2000); System.out.println("yyyyyyyyyyyyyyyyyy"); LockSupport.unpark(t); } 结果输出 : 111111111111 xxxxxxxxxxxx yyyyyyyyyyyyyyyyyy 只要当前线程被其他线程设置了中断标识,即使再多的park()方法调用也无效。如果想让park()标识生效,怎么办呢?请看下面代码 public static void main(String[] args) throws Exception{ Thread t = new Thread(new Runnable() { @Override public void run() { LockSupport.park(this); System.out.println("111111111111"); // 加一个清除中断标识的方法后,park(this)将再次被阻塞住 boolean isInterrupted = Thread.interrupted(); System.out.println(isInterrupted); LockSupport.park(this); System.out.println("xxxxxxxxxxxx"); } }); t.start(); // t 线程被其他线程设置中断标识 t.interrupt(); Thread.sleep(2000); System.out.println("yyyyyyyyyyyyyyyyyy"); LockSupport.unpark(t); } 111111111111 true yyyyyyyyyyyyyyyyyy xxxxxxxxxxxx 从上面程序执行结果来看,interrupt()给线程设置中断标识,如果中断标识一直在的话,则park(this)方法将无用, 此时调用线程的 Thread.interrupted()方法,则清除掉中断标识,使得park(this)方法的调用会再次生效, 注意的是isInterrupted()方法是不会清除中断标识的,而Thread.interrupted()会清除中断标识,并返回true, 当然如果中断标点被清除掉,再次调用Thread.interrupted()方法,将返回false。 当然看一下源码就知道 interrupted()方法实现如下 public static boolean interrupted() { // isInterrupted()方法的参数为ClearInterrupted,是否清除中断标识 return currentThread().isInterrupted(true); } 而isInterrupted()方法实现如下 , public boolean isInterrupted() { return isInterrupted(false); } 当然,我们在这里不要被误导了,调用线程的interrupted()方法,将抛出InterruptedException ,这是一个错误的理解 ??我相信,此时此刻,你对parkAndCheckInterrupt()方法应该有了新的理解,如果当前线程在park()过程中,其他线程调用了线程的interrupt()方法,此时线程将从等待中苏醒,并且设置acquireQueued()方法中的interrupted变量为true,再次走acquireQueued()方法中的for循环代码块,如果当前节点的pred为头节点,并且再次通过CAS去抢锁,如果抢到了,则将当前节点置为头节点并返回true(如果线程是通过其他线程调用unpark(t)方法,默认返回的是false,或者线程一进入acquireQueued()方法,还没有被park(),就获取到了锁,也是返回false ),如果线程在获取锁的过程中被中断过,则外层acquire()方法中需要调用Thread.currentThread().interrupt(); 来恢复中断标识(因为在parkAndCheckInterrupt() 方法中调用了Thread.interrupted()清除过中断标识,因此需要恢复 ),当然,如果线程被其他线程调用了interrupt()方法,但是在for()循环过程中 依然没有获取到锁,终究是无用,最终还是会调用park()方法阻塞住,但是有一点不同,只要被其他线程调用过interrupt()方法,最终acquireQueued()方法将返回true, 外层方法将调用Thread.currentThread().interrupt(); 来恢复中断标识。 private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // 跳过祖先节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 记录前驱节点的next节点,当前当前的前驱节点可能是node的prev,也可能不是。 // 如果不是,则就是上面代码块跳过的,如图1一样 Node predNext = pred.next; // 将当前节点的状态设置大于 1 ,方便其他节点跳过我们 node.waitStatus = Node.CANCELLED; // 如果当前节点是尾节点,则设置当前尾节点为pred节点 if (node == tail && compareAndSetTail(node, pred)) { // 用CAS 操作,设置前驱节点的next节点为空,这样,当前节点就成为孤立的节点,将被JVM回收掉 compareAndSetNext(pred, predNext, null); } else { // 如果当前节点不是尾节点 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 如果当前节点不是头节点,前驱节点的waitStatus 为-1 // 并且前驱节点的线程不为空(因为可能存在并发情况,之前的pred变成了head节点) pred.thread != null) { // 记录当前node的next节点 Node next = node.next; // 如果当前node的next节点没有被取消,则通过CAS设置pred的next为当前node的next if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果当前前驱节点是头节点 ,或者当前节点的所有前驱节点都被取消掉了 unparkSuccessor(node); } node.next = node; // 自己指向自己,加快node节点被JVM回收 } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 如果当前节点的next节点也是被取消状态,并且tail节点不为空,则从尾部向前遍历 // 找到离队头最近并且waitStatus 小于等于0的节点 ,并unpark()节点中的线程 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 当前节点的next节点中,找到第一个第一个waitStatus 小于等于0的节点,并unpark()节点中的线程 if (s != null) LockSupport.unpark(s.thread); }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 通知head节点的next 节点的线程unpark() unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 如果释放锁的不是当前获得锁的线程,则抛出异常,这一点需要注意,如果别人写一个 // 笔试题,你相信大部分人会晕 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 如果c == 0 ,则说明锁释放完全 ,设置当前占用锁的线程为null setExclusiveOwnerThread(null); } setState(c); return free; } ??从tryRelease方法中可以得知, 释放锁的线程一定和加锁的线程是同一线程,lock()方法调用了几次,那么unlock()也需要调用几次 ,不然锁不会释放掉。如果锁释放成功,则调用unparkSuccessor()方法,通知当前节点的next节点unpark(),线程unpark()之后,就再次获得抢锁的机会。 公平锁static final class FairSync extends Sync { final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果当前CLH队列中没有别的线程在排队,则进行锁竞争,如果队列中有其他线程,则当前线程 // 加到队列的尾部 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } // 判断当前CLH队列中是否有别的线程在排队 public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ?? 从上面的代码来看,上面的代码就是公平锁和非公平锁的最大区别。我们再回顾一下非公平锁的lock()方法。 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } ??上面加粗代码就是非公平锁的多出的代码,为什么这么做呢?聪明的小伙伴肯定一想就知道了,每个新调用lock()方法的线程,都会和队列中的被唤醒节点的线程进行锁竞争,现实生活中什么意思呢?非公平锁,就相当食堂排队打饭,后面外面新来一个小伙伴,先就跑到窗口去和排在队头的步伙伴同时抢饭,当然食堂阿姨将饭给谁是随机的,此时新来的小伙伴可能抢到饭,如果抢到饭,则离开了,如果没有抢到饭,则排到队列的尾部。 是不是每新来一个小伙伴都会和排在队列中的第一个小伙伴抢饭,这样就不公平了。但是我们不要误解,以为非公平锁,当锁空出来时,队列中所有的节点的线程都会去争抢锁,这是错误的想法。当然如果没有其他线程调用lock()方法来抢锁,排在队列中的线程是按队列的先后顺序来执行的。因为排在队列中的线程是按队列的先后顺序来执行的,也可以说,非公平锁有一定的公平性, ??那么公平锁是如何实现的呢?还是用食堂打饭来举例,新来一个小伙伴,看当前是否有其他人在排队,如果有,则排到队列的尾部,当然,如果同时来了两个人,在插入队列尾部时,因为是采用CAS操作,也可能先来的小伙伴插入到后来的小伙伴的尾部,这和CPU调度相关,公平锁也并不一定准确的按时间的先后顺序执行,只能说整体上按时间的先后执行。可以说,公平锁也并非那样公平 。 ??我们使用ReentrantLock,当然离不开Condition的使用,接下来,我们看ReentrantLock是如何实现Condition源码的。 public class ConditionTest { public static ReentrantLock reentrantLock = new ReentrantLock(); public final static Condition condition = reentrantLock.newCondition(); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { reentrantLock.lock(); condition.await(); System.out.println("await after run "); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { reentrantLock.lock(); condition.signal(); System.out.println("signal other thread "); } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } }).start(); } } ??在看源码实现之前,我们来看看reentrantLock.newCondition(),这个Condition到底是什么 ? final ConditionObject newCondition() { return new ConditionObject(); } ??newCondition()默认是创建的ConditionObject对象,而ConditionObject和Condition是什么关系呢? public class ConditionObject implements Condition, java.io.Serializable { // 条件队列的第一个节点 private transient Node firstWaiter; // 条件队列的最后一个节点 private transient Node lastWaiter; public ConditionObject() { } private static final int REINTERRUPT = 1; // 模式意味着在等待的过程中被中断将抛出 InterruptedException private static final int THROW_IE = -1; } ??但是我们在看条件队列前,还是要回顾一下之前杨过老师的条件队列图 await()方法??上面好像看不出什么东西,接下来,我们来看await()方法的源码实现。 public final void await() throws InterruptedException { // 如果在调用await()方法时,当前线程已经被设置了中断标识,将抛出InterruptedException异常 if (Thread.interrupted()) throw new InterruptedException(); // 添加当前节点到等待队列尾部 Node node = addConditionWaiter(); // 通知等待队列中的线程,unpark()它,并返回当前线程的state值 int savedState = fullyRelease(node); int interruptMode = 0; // 当前节点是否在同步队列中,从 addConditionWaiter()方法可以看出,第一次进入时,当前节点肯定不在同步队列中, // 而是在等待队列中,但当线程被唤醒后,当前Node一定在同步队列中,这个需要结合transferAfterCancelledWait()方法来看 while (!isOnSyncQueue(node)) { // 将调用wait()方法的线程阻塞住,并释放锁 LockSupport.park(this); // 正常调用signal()或signalAll()方法的,则interruptMode等于1或0 // 如果通过t.interrupt()让线程继续执行,则interruptMode = -1 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 获取锁资源,注意,如果外部线程通过调用t.interrupt();方法设置了中断标识,恢复当前线程的执行 // 但是在checkInterruptWhileWaiting()方法中,通过 Thread.interrupted()这一行代码, // 已经将中断标识已经被清除了,因此在acquireQueued()方法中,如果当前节点不是头节点 // 或者没有获取到锁,则在parkAndCheckInterrupt()方法中,仍然被park()住 // 区别在于,之前是在等待队列中被阻塞住,现在是在同步队列中被阻塞住 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 如果当前节点的nextWaiter不为空,处理掉所有非等待中的节点,当然自己节点也会被从等待队列中清除 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) // 根据 interruptMode 判断是否需要抛出异常 reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { // 如果有节点状态不是CONDITION,则移除它 unlinkCancelledWaiters(); t = lastWaiter; } // 创建新的节点,如果 firstWaiter 为空,则用首节点指向当前新创建的节点,否则 // 用尾部节点指向当前新创建的节点,并且当前新创建的节点为尾节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } ??下面就是移除队列中waitStatus状态不是CONDITION的节点。 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; }else{ trail = t; } t = next; } } ??上面的代码是写得非常好,也写得非常的精简,我也是看了半天才明白他的意思,上面的代码分为三种情况情况来考虑。
??请看下图 final int fullyRelease(Node node) { boolean failed = true; try { // 获取当前线程的state的值 int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) // 如果通知其他线程失败,则当前线程等待状态为取消状态 node.waitStatus = Node.CANCELLED; } } ??判断当前线程是否是同步队列 final boolean isOnSyncQueue(Node node) { // 如果当前node的waitStatus 为CONDITION ,或者当前node的前驱节点为空 // 则肯定不是同步队列,因为同步队列的状态不可能为CONDITION ,并且同步队列的头节点为一个线程为空的节点 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果node的next不为空,则肯定是同步队列,因为等待队列没有设置next属性 if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); } // 否则从尾部向前遍历整个同步队列,看当前节点是否在同步队列中存在 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } ??从上面的逻辑看出什么没有,如果当前节点在同步队列中,那么他的prev节点肯定不为空,因此增加了快速判断机制,我猜这也是为什么Doug Lea为什么设计同步队列有头节点,而等待队列没有头节点的原因吧。 ??下面我们来看,非正常unpark()调用后,线程恢复执行。 private int checkInterruptWhileWaiting(Node node) { // 如果当前线程被其他线程调用了 t.interrupt(); 方法 ,则进一步判断 // 否则返回0 return Thread.interrupted() ? // 如果返回true ,则表示 t.interrupt(); 成功了,否则表示 signal() 或signalAll()方法方法 // CAS 设置waitStatus成功 ,并将当前等待队列的node加入到了同步队列 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { // 将当前node的waitStatus设置由CONDITION变成0 // 如果设置成功,则将当前node加入到同步队列中 // 如果设置失败,肯定其他线程也调用了signal() 或signalAll()方法,已经将node的状态变成了0的状态 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 如果CAS 设置waitStatus的状态为0,设置成功,则将当前节点加入到同步队列的尾节点 // 并且返回true,则表示当前wait方法,是通过 t.interrupt(); 唤醒的,会抛出InterruptedException异常 enq(node); return true; } // 如果当前线程被其他线程调用了interrupt(); 方法,但同步其他线程也调用了signal() 或signalAll()方法 // 并且signal()方法通过CAS操作设置waitStatus状态成功。 while (!isOnSyncQueue(node)) // 如果当前Node被唤醒,但是还没有加入同步队列中,需要先释放CPU的时间片 // 直到同步队列已经加入了当前节点 Thread.yield(); return false; } private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // 如果是t.interrupt(); 使得当前线程恢复执行,则抛出InterruptedException if (interruptMode == THROW_IE) throw new InterruptedException(); // 即使其他线程调用了t.interrupt(); 使得当前线程恢复执行,但最终还是signal()或signalAll() // 竞争设置waitStatus的值成功,此时恢复中断标识即可,代码能执行到这里,肯定也是获取到锁资源的 else if (interruptMode == REINTERRUPT) // 恢复中断标识 selfInterrupt(); } ??我相信此时此刻,大家对wait()方法已经有了深刻的理解,但是还有一点不明白,为什么总是要调用selfInterrupt();方法恢复中断标识呢?聪明的小伙伴肯定想到了,如果wait()方法获得了锁继续执行,不能因为在wait()方法内部调用了Thread.interrupted() 方法将其他线程调用了t.interrupt();设置的中断标识给清除了吧,如果在wait()后面的代码块还需要这个中断标识怎么办呢?所以对于这种情况,Doug Lea也帮我们想到了,如果我在内部wait()方法或lock()方法内部调用了Thread.interrupted()方法清除过中断标识,那么在退出wait()或lock()方法之前调用 selfInterrupt();方法帮你恢复不就可以了不,这样也不影响后面程序执行逻辑。 signal()和signalAll()方法??接下来,我们来看signal()方法的执行。 public final void signal() { // 如果执行signal()方法的非当前线程,则抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 将等待队列的首节点 firstWaiter唤醒 if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //如果transferForSignal()方法返回false,则遍历整个等待队列 } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // 将当前node的waitStatus由CONDITION设置为0 // 为什么会有失败的情况呢? 我们在分析await()方法时分析过,如果其他线程在调用t.interrupt();方法时 // 又有另外的线程调用了signal()方法,此时就会存在CAS设置waitStatus状态的锁竞争 ,因此下面的代码可能有失败的情况 , // CAS操作失败,返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 通过CAS操作,将当前节点加入到队列的尾节点,返回的p为当前node的前驱节点 Node p = enq(node); int ws = p.waitStatus; // 如果前驱节点是取消状态或CAS设置waitStatus的状态失败(如此时前驱节点可能被设置为无效状态1 ) , // 则将当前节点线程 unpark() if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } ??接下来,我们来看signalAll()方法具体实现。 public final void signalAll() { // 如果调用signalAll()方法的非当前线程,则抛出IllegalMonitorStateException if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; // 遍历整个等待队列,将等待队列中所有的节点都加入到同步队列中 transferForSignal(first); first = next; } while (first != null); } ??我相信此时此刻,大家对condition的原理已经有了深刻理解了。既然深刻理解,那我们来看看下面几个面试题,当然是我自己想的,如果你来我们公司面试,说不定就会遇到 。 第一题目 public class ReentrantTestxx { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); public static void main(String[] args) throws Exception { new Thread(new Runnable() { @Override public void run() { try { lock.lock(); condition.await(); System.out.println("1"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }).start(); Thread.sleep(200); new Thread(new Runnable() { @Override public void run() { try { lock.lock(); condition.await(); System.out.println("2"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }).start(); Thread.sleep(200); new Thread(new Runnable() { @Override public void run() { try { lock.lock(); condition.signal(); System.out.println("3"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }).start(); } } c a 答案输出 c ,a ??为什么呢? 第一个子线程加入到等待队列中释放锁,等待了200毫秒,第二个子线程也加入到等待队列中,并释放锁,再过200毫秒,第三个子线程调用获得锁,并调用singal()方法,将等待队列中第一个节点的线程加入到同步队列,也就是第一个线程所在的节点加入同步队列 ,此时同步队列中并没有其他节点,因此unpark()掉第一个子线程,虽然unpark()了,因为第三个子线程调用singal()方法后并没有释放锁,所以在接下来的acquireQueued()方法中仍然会park()住,因此先打印c,线程3 unlock()后,通知同步队列中的首节点的next节点获取锁,此时第一个线程获得锁,打印出a,而第二个子线程的节点因为没有被其他线程signal()或signalAll(),所以一直还在等待队列中等待。所以b不会打印 。请看下图 public class ReentrantTestxx { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); public static void main(String[] args) throws Exception { Thread t1 = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); condition.await(); System.out.println("a"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); t1.start(); Thread.sleep(200); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); Thread.sleep(200); condition.signal(); System.out.println("b"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }); t2.start(); Thread.sleep(50); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); System.out.println("c"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }); t3.start(); } } 上面结果输出 b c a ??线程1加入到等待队列中,并释放锁,等待200毫秒,线程2启动,获得锁,等待200毫秒,此时线程3启动,但是线程3并没有获得锁,线程3被加入到同步队列的首节点的next节点,线程2睡200毫秒后,signal()其他线程,此时线程1从等待队列转移到同步队列,但是线程2依然没有释放锁,继续执行,打印b, 线程1和线程3只能继续park()住,最后线程2 unlock()代码,通知同步队列的首节点的next节点的线程获取锁,此时线程3先执行,打印c,线程3执行完毕,释放锁,通知同步队列中的线程1执行,此时打印a 。 public class ConditionTest2 { public static ReentrantLock reentrantLock = new ReentrantLock(); public final static Condition condition = reentrantLock.newCondition(); public static void main(String[] args) throws Exception { Thread t1 = new Thread(new Runnable() { @Override public void run() { try { reentrantLock.lock(); condition.await(); System.out.println("await after run "); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } }); t1.start(); Thread.sleep(100); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { reentrantLock.lock(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } condition.signal(); System.out.println("signal other thread "); } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } }); t2.start(); Thread.sleep(100); t1.interrupt(); } } signal other thread java.lang.InterruptedException at java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052) at com.snc.test.ConditionTest2$1.run(ConditionTest2.java:20) at java.lang.Thread.run(Thread.java:745) 代码始终输出上面的结果 为什么呢? 线程1获取到锁,调用await()方法,线程1被加入到等待队列中,此时睡眠100毫秒后,线程2 获得锁,立即睡眠2000毫秒,在这个过程中,主线程睡眠100秒后,调用线程1的interrupt()方法,此时线程1 unpark()将没用了,线程1被从阻塞队列转移到同步队列中, 但此时线程2并没有释放锁,因此线程1还是被park()住,先打印signal other thread ,线程2进入finally代码,调用unlock()方法释放锁,释放锁后将通知同步队列的节点竞争锁,此时线程1竞争到锁接着执行,因为是通过中断才使得线程得以恢复执行,此时interruptMode == THROW_IE,在reportInterruptAfterWait()方法中将抛出异常。 public class ReentrantTestxx2 { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); public static void main(String[] args) throws Exception { Thread t1 = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); condition.await(); System.out.println("a"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); t1.start(); Thread.sleep(200); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); condition.await(); System.out.println("b"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }); t2.start(); Thread.sleep(200); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); condition.signalAll(); System.out.println("c"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }); t3.start(); } } c a b 理解了之前的几个例子,再来看上面这个例子,是不是轻而易举了,线程1被加入到等待队列中,200毫秒后,线程2也被加入到等待队列中,200毫秒后,线程3获得锁,调用signalAll()方法 ,将等待队列中的线程1,线程2 依次转移到同步队列中,线程3执行完,通知同步队列中的线程1执行,执行完毕后,再通知同步队列的线程2执行,因此依次打印出c ,a ,b public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 如果nanosTimeout <= 0 时,则直接将等待队列的节点加入到同步队列中 // 这里分为两种情况, // 1.传入的nanosTimeout <=0 // 2. 在下面parkNanos(this, nanosTimeout)之后 ,重新计算nanosTimeout后小于0 // 上面两种情况 ,任何出现一种,则当前节点将从等待队列转移到同步队列中 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } // 当前线程阻塞 nanosTimeout 纳秒后自动唤醒 // 这里需要注意的一点是,如果nanosTimeout 为1000纳秒,CPU并不能准确的在1000 纳秒后恢复线程 // 可能在999纳秒返回 ,也可能是1001纳秒后返回 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); // 如果CPU在999纳秒后返回 ,则此时nanosTimeout = 1000 - 999 = 1 // nanosTimeout 依然大于0,此时需要再次进入循环,调用parkNanos(this, 1) // 直到 nanosTimeout <= 0 时,则将当前节点加入到同步队列中, // 从实现来看,肯定能保证需要等待时间 大于 等于nanosTimeout 纳秒后,才将当前节点加入到同步队列中 nanosTimeout -= now - lastTime; lastTime = now; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); // 返回在同步队列等待的时间 return nanosTimeout - (System.nanoTime() - lastTime); } 上面黑色加粗的代码,就是和wait()方法不一样的地方,从上面的代码来看呢?我也没有什么好说的了,觉得大牛就是大牛,写出来的代码就是既精简,又功能全面 。 ??这个方法和awaitNanos()实现大同小异,因此就不再赘述 。 public final boolean await(long time, TimeUnit unit){ ... } ??接下来,来看看awaitUninterruptibly()方法 。 public class ConditionTest3 { public static ReentrantLock reentrantLock = new ReentrantLock(); public final static Condition condition = reentrantLock.newCondition(); public static void main(String[] args) throws Exception { final Thread t1 = new Thread(new Runnable() { @Override public void run() { try { reentrantLock.lock(); condition.awaitUninterruptibly(); System.out.println(Thread.interrupted()); System.out.println("await after run "); } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } }); t1.start(); Thread.sleep(100); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { reentrantLock.lock(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } condition.signal(); System.out.println("signal other thread "); } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } } }); t2.start(); Thread.sleep(100); t1.interrupt(); } } signal other thread true await after run ??在这个例子中,发现程序并没有打印出异常,并正常从等待中唤醒。那awaitUninterruptibly()方法和await()方法有什么区别呢?请看下面代码 public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; // 当前线程退出while()的唯一办法就是其他线程调用signal()或signalAll()方法 // 将当前线程从等待队列转移到同步队列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 如果其他线程调用了t1.interrupt();方法,当前线程并不会像await()方法那样抛出异常 // 只是记录当前的interrupted值,方便后面恢复中断标识 if (Thread.interrupted()) interrupted = true; } // 当前线程从同步队列中获得锁, // 这里分3种情况, // 1.在调用acquireQueued()方法过程,被中断了,无论上面while()代码块中是否被中断,都需要恢复中断标识 // 2.acquireQueued()调用过程中没有被中断,但在上面while()循环中被中断过,interrupted也为true,也会恢复中断标识 // 3.acquireQueued()没有被中断过,并且在上面while()循环中也没有被中断,则不需要恢复中断标识 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } ReentrantReadWriteLock源码解析??在阅读ReentrantReadWriteLock源码之前,我们还是来看一个例子 public class ReentrantReadWriteLockTest { public static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); public static final Lock readLock = readWriteLock.readLock(); public static final Lock writeLock = readWriteLock.writeLock(); public static void main(String[] args) { Thread t1 = new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("xxx"); readLock.unlock(); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { writeLock.lock(); readLock.unlock(); } }); t1.start(); t2.start(); } } ??在看源码之前,我们先来了解读写锁的几个基本功能,如果有线程占用读锁,允许其他线程再次获取读琐,但不允许写锁获取,如果有写锁,不允许其他任何线程再获取锁。 允许锁降级,写锁可以转化为读锁,不允许锁升级,读锁不能升级为写锁,上面这几点是我们研究读写锁应该弄明白的。 ??接下来,我们继续来看锁的代码实现。 readLock.lock()static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = Thread.currentThread().getId(); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 将当前线程加入到同步队列中 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取state状态 int c = getState(); // 判断是否有写锁占用,如果有写锁占用exclusiveCount(c) !=0 if (exclusiveCount(c) != 0 && // 如果占用写锁的线程不是自身,则返回负1,并将当前线程加入到同步队列中 getExclusiveOwnerThread() != current) return -1; // 看当前是否有读锁占用 int r = sharedCount(c); // 如果读锁不需要阻塞 if (!readerShouldBlock() && // 判断当前获取读锁次数是不是少于 2 ^ 16 -1 ,也就是 65535 r < MAX_COUNT && // CAS设置c = c + 65536 ,为什么需要这么做,后面我们来分析 compareAndSetState(c, c + SHARED_UNIT)) { // 如果当前线程是第一个获取读锁的,设置firstReader为当前线程,firstReaderHoldCount持有 //线程数为1 ,为什么要给firstReader和firstReaderHoldCount赋值呢?主要是为了给调用方法提供api // 调用方可以通过方法获取当前线程获取读锁的次数 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 如果当前线程是第二个获取读锁的,当然第一个获取读锁的线程此时还没有释放读锁 // 此时我们需要用readHolds 来记录当前线程获取读锁的次数,可以看一下readHolds实际上是一个ThreadLocal变量 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); // 当前线程获取锁的次数 + 1 rh.count++; } // 返回1 ,表示当前线程已经获得了读锁了 return 1; } // 当前同步队列的第一个节点是写线程获取锁时 return fullTryAcquireShared(current); } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } // 如果同步队列的第一个节点不为空并且其nextWaiter != SHARED // 则表明读需要阻塞 final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } ??可能大家在看exclusiveCount()方法和sharedCount()方法有点晕哦,为什么c & (1 << 16 -1 ) 如果大于0 , 继然懂了整形变量怎样分配给相应的锁了,那我们来看看例子。 假如读锁获得锁,c = c + 2 ^ 16 = 0 + 2^ 16 = 65536 此时state 等于 0000 0000 0000 0001 0000 0000 0000 0000 那么 state & 2 ^ 16 -1 0000 0000 0000 0001 0000 0000 0000 0000 & 0000 0000 0000 0000 1111 1111 1111 1111 = 0000 0000 0000 0000 0000 0000 0000 0000 = 0 ,表示肯定没有写锁获得锁。 那么我们再来看一个例子,如果锁被写锁占用,则state = 1 ,对应int的二进制如下 0000 0000 0000 0001 0000 0000 0000 0001 与2 ^ 16 - 1相与 0000 0000 0000 0001 0000 0000 0000 0001 & 0000 0000 0000 0000 1111 1111 1111 1111 = 0000 0000 0000 0001 0000 0000 0000 0001 也就是1 & (2 ^ 16 - 1 ) = 1 > 0 ,此时应该明白了,如果 state & (2^16 -1 ),如果大于0,当前锁肯定被写锁占用, 如果等于0 ,不好说,可能当前锁被读锁占用,也可能没有没有任何线程占用锁(因为0 & (2 ^ 16 - 1 ) = 0 ), 因此,如果要判断当前锁有没有被线程占用,只能通过另外的方法 ,Doug Lea 用 (state >>> 16 )来实现,如果state 无 符号右移16大于0 ,则表示当前线程被读锁占用,并且得到的值就是读锁占用的次数 。如果线程获取了读锁两次 则 state = 0 + (2 ^ 16 ) + (2 ^ 16 ) = 2 ^ 17 ,对应的二进制如下 0000 0000 0000 0010 0000 0000 0000 0000 将2 ^ 17 右移16 位,得 0000 0000 0000 0000 0000 0000 0000 0010 = 2 是不是锁被获取了两次。 ??我相信此时此刻,大家应该明白Doug Lea的设计意图了,接下来,我们继续看fullTryAcquireShared()方法。 public class ReentrantReadWriteLockTest2 { public static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public static final Lock readLock = readWriteLock.readLock(); public static final Lock writeLock = readWriteLock.writeLock(); public static void main(String[] args) throws Exception{ final long startTime = System.currentTimeMillis(); Thread t1 = new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("aaaaaa"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("aaaaaa" + (System.currentTimeMillis() - startTime)); readLock.unlock(); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { writeLock.lock(); System.out.println("bbbbbb"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("bbbbbb"+ (System.currentTimeMillis() - startTime)); writeLock.unlock(); } }); Thread t3= new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("cccccc"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("cccccc" + (System.currentTimeMillis() - startTime)); readLock.unlock(); } }); t1.start(); Thread.sleep(100); t2.start(); Thread.sleep(100); t3.start(); } } 可能有很多人会想,上面例子应该是先输出aaaaaa ,紧接着输出cccccc ,过两秒后,再输出bbbbbb吧。 如果锁被读锁占用,此时接着继续有线程来获取读锁,应该获取得到的哈,按常理是如此,但实际上并非如此。 源码实际上,如果有一个线程获取读锁,接着有线程来获取写锁,写线程肯定获取不到锁,只能将当前线程加入到同步队列, 接着继续有线程来获取读锁,即使当前锁被读线程占用,但发现同步队列head节点的next节点有nextWaiter != SHARED 当前获取读锁的线程将被加入同步队列并阻塞。因此,本例中输出的结果如下。 aaaaaa aaaaaa2005 bbbbbb bbbbbb4007 cccccc cccccc6008 ??通过上面的例子,你已经明白为什么readerShouldBlock()方法会返回true了吧。 接着我们看fullTryAcquireShared()方法的实现。 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 如果等待队列的第一个节点已经获得了写锁,并且获得写锁的线程不是自身线程 // 则直接返回-1,将当前线程加入到同步队列中 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 同步队列的第一个节点是获取锁写的节点 } else if (readerShouldBlock()) { // 如果当前第一次获得读锁的线程是自身线程,则不需要阻塞 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 这里有另外一种考虑,如果当前线程并不是第一次获得读锁线程,但是在写线程获取锁之前 // 已经获得过读锁,也就是写线程在加入同步队列之前当前线程获取过读锁 ,则当前线程也不应该被阻塞 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 如果当前线程在写线程获取锁之前,从来没有获取过读锁,则当前线程需要加入到同步队列中 if (rh.count == 0) return -1; } } // 如果获取读锁的次数超过65536,则抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 当前读锁获取成功,对state 进行CAS操作 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); // 记录当前线程获取读锁的次数 rh.count++; cachedHoldCounter = rh; // cache for release } // 当前线程获取读锁成功 return 1; } } } ??当然不知道大家理解上面加粗注释的含义了, 我们还是先一个例子。 public class ReentrantReadWriteLockTest3 { public static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public static final Lock readLock = readWriteLock.readLock(); public static final Lock writeLock = readWriteLock.writeLock(); public static void main(String[] args) throws Exception{ final long startTime = System.currentTimeMillis(); Thread t1 = new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("aaaaaa"); sleep(2000); System.out.println("aaaaaa" + (System.currentTimeMillis() - startTime)); readLock.unlock(); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { writeLock.lock(); System.out.println("bbbbbb"); sleep(2000); System.out.println("bbbbbb"+ (System.currentTimeMillis() - startTime)); writeLock.unlock(); } }); Thread t3= new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("cccccc"); sleep(2000); System.out.println("cccccc" + (System.currentTimeMillis() - startTime)); readLock.lock(); System.out.println("再次获取cccccc"); sleep(2000); System.out.println("再次获取cccccc" + (System.currentTimeMillis() - startTime)); readLock.unlock(); readLock.unlock(); } }); t1.start(); Thread.sleep(100); t3.start(); Thread.sleep(100); t2.start(); } public static void sleep( long time ){ try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } } 执行结果: aaaaaa cccccc aaaaaa2007 cccccc2105 再次获取cccccc 再次获取cccccc4108 bbbbbb bbbbbb6111 ??这个例子什么意思呢?首先t1启动,过100毫秒, 此时t3也启动,此时t3线程获得共享锁,经过100毫秒后, private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 如果当前节点的前驱节点是head节点 if (p == head) { // 再次尝试获取共享锁 int r = tryAcquireShared(arg); // 如果大于 1 ,则表示共享锁获取成功 if (r >= 0) { // 如果当前节点的next 节点也是读锁,则通知它,也就是传递当前共享锁 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 看当前节点的前驱节点的waitStatus 是否等于-1,如果不等于,则将当前节点的前驱节点 // 状态设置为-1,再一次循环,如果等于-1,则当前线程park()住 ,之前分析过, // shouldParkAfterFailedAcquire()方法的设计,可能就是为了贼心不死 if (shouldParkAfterFailedAcquire(p, node) && // 如果当前线程的前驱节点的waitStatus = -1 ,则将当前线程park()住 parkAndCheckInterrupt()) // 用户恢复中断标识,用于被 锁包含的代码块中利用中断标点来处理业务逻辑 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ??接下来,我们看共享锁如何传递 private void setHeadAndPropagate(Node node, int propagate) { // 设置当前节点为头节点 Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; // 如果node的next不是读锁线程节点,则不会调用doReleaseShared()方法继续unpark() next node if (s == null || s.isShared()) // 释放共享锁 doReleaseShared(); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果当前头节点的状态为waitStatus 为-1,则将其设置为0 ,并通知他的next 节点所在线程unpark() if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 通知当前线程的next()节点所在线程unpark() unparkSuccessor(h); } // 如果h线程为0,则将其设置为 -1 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果h == head ,则退出循环 if (h == head) // loop if head changed break; } } ??上面这段代码,不多,从字面意思理解呢?也不难,但是真正的含义确令人费解,当然这段代码,我也是看了两天,才真正的明白其含义,先看下图 ??在上面的分析过程中,有没有发现Doug Lea的良苦用尽呢?
??代码看到这里,我相信你对大牛思想有所理解 。 ??我相信此时此刻,你对读锁的获取已经有了深刻的理解了,下面来分析读锁的释放 。 public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 如果当前线程是第一个获取读锁的线程 if (firstReader == current) { // 如果持有读锁的次数为1,是直接将第一个获取读锁的线程设置为空,主要是其他 // 地方到了firstReader == current的判断 if (firstReaderHoldCount == 1) firstReader = null; else // 否则当前线程持有读锁的次数减1 firstReaderHoldCount--; } else { // 如果当前线程不是每一个获取读锁的线程 HoldCounter rh = cachedHoldCounter; //从threadlocal中获取当前线程获取读锁的次数 if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 如果持有读锁的次数为1 ,释放锁时,则将当前线程从threadLocal变量中移除 // 这里可以看到大牛细腻的思想,remove()掉,减少内存的占用 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } // 当前线程持有锁的次数减少1 --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; // 使用死循环 加 CAS 操作,保证state 的正确性, // 因为你可能在一个线程中同时多次unlock()方法的调用 if (compareAndSetState(c, nextc)) return nextc == 0; } } ??上面的代码,上面的代码原理很简单,就是减少记录当前线程获取锁的次数,CAS 操作,减少state 变量,但是从细微处看到大牛的思想,我觉得这是必要的。接下来就是doReleaseShared()方法调用,上面做过详细分析,这里就不再赘述了。 public void lock() { sync.acquire(1); } public final void acquire(int arg) { // 如果写锁获取不成功,则将当前线程加入到同步队列中 //其他方法都一样,只有tryAcquire()方法有区别,下面我们来分析写锁tryAcquire()方法的实现 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); // 判断当前是否有线程已经获取了写锁 int w = exclusiveCount(c); // 如果c !=0 ,则表明已经有线程获得了锁 if (c != 0) { // 如果w == 0 ,则表明是有线程获取了读锁,但获取读锁的线程不是自身线程,则需要将当前线程加入到同步队列中 if (w == 0 || current != getExclusiveOwnerThread()) return false; // if (w > 0 && current == getExclusiveOwnerThread()) 才会走到如下代码 // 也就是说,获取写锁的是当前线程 if (w + exclusiveCount(acquires) > MAX_COUNT) // 如果获取写锁的次数超过2^16次方,则抛出异常 throw new Error("Maximum lock count exceeded"); // 设置state的状态,同样这里没有用CAS操作的原因,是因为获取写锁的线程是当前线程 setState(c + acquires); return true; } // 如果是非公平锁,则写锁不需要被阻塞,如果是公平锁,则当前同步队列中如果第一个节点的线程不是自身,则需要被阻塞 if (writerShouldBlock() || // 如果CAS 抢锁失败,也进入阻塞 !compareAndSetState(c, c + acquires)) return false; // 抢锁成功,设置获取锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } ??接下来,来看写锁的释放。 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 通知当前节点的next节点unpark(),当然next 的waitStatus 不能为1 (也就是取消状态) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // 如果释放锁的线程不是自身线程,将抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 如果写锁当前的state = 1,则将当前获取锁的线程置为空 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } ??上面的释放锁的代码和ReentrantLock基本是一样,也没有什么说的,只是有些需要注意,发现有些时间设置state状态用CAS操作,有些时候又是直接set ,大家发现规律没有,凡是存在多线程并发操作的,都用CAS,凡是能确定只有自身才能操作的代码,就用普通的state,这是需要注意的,因为CAS操作的性能肯定不如直接set操作,这也是作者在性能上的优化吧。 ??当然,我们再来看读锁的另外一个方法tryLock() final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } } ??上面方法的注释我就不写了,之前已经分析过了,大家发现和读锁的lock()方法有什么区别没有。 public class ReentrantReadWriteLockTest4 { public static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public static final Lock readLock = readWriteLock.readLock(); public static final Lock writeLock = readWriteLock.writeLock(); public static void main(String[] args) throws Exception{ final long startTime = System.currentTimeMillis(); Thread t1 = new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("aaaaaa"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("aaaaaa" + (System.currentTimeMillis() - startTime)); readLock.unlock(); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { writeLock.lock(); System.out.println("bbbbbb"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("bbbbbb"+ (System.currentTimeMillis() - startTime)); writeLock.unlock(); } }); Thread t3= new Thread(new Runnable() { @Override public void run() { // readLock.lock(); readLock.tryLock(); System.out.println("cccccc"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("cccccc" + (System.currentTimeMillis() - startTime)); readLock.unlock(); } }); t1.start(); Thread.sleep(100); t2.start(); Thread.sleep(100); t3.start(); } } 输出结果: aaaaaa cccccc aaaaaa2006 cccccc2206 bbbbbb bbbbbb4211 ??之前也有一个例子,但这个例子和之前的例子的唯一区别就是上面的加粗代码,只有一行代码不一样,输出结果就完全不一样,从源码来看读锁的lock()方法和tryLock()有什么区别呢?其实区别在于,lock()方法需要判断当前同步队列中的第一个节点是否有线程获取写锁,则需要判断当前获取读锁的线程在写锁获取之前是否有获取过读锁,如果获取了,则当前获取读锁成功,如果没有获取过,则需要将当前线程加入到同步队列中。 而tryLock()方法就不一样,只要锁没有被获取过或当前锁被读线程获取,当前读锁就获取成功,这样可能会导致一个问题,如果不断的有获取读锁的线程,将会导致获取写锁的线程处于饥饿中,在极端的情况下可能会永远获取不到锁。这一点需要注意的。 ??接下来,我们来看另外一个方法tryLock(long timeout, TimeUnit unit),这个方法的源码如下 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } ??上面加粗的代码就是和读锁lock()方法不同的地方,其他最主要的区别就是parkNanos(this, nanosTimeout)方法,如果过了nanosTimeout没有被其他的线程唤醒,将自动唤醒,并返回false,线程继续执行,如果在nanosTimeout毫秒内有其他线程唤醒了当前线程,则和普通的lock()没有什么区别。 final int getReadHoldCount() { // 如果当前没有读锁占用,则返回 0 ,下面的代码是提供捷径方式 if (getReadLockCount() == 0) return 0; Thread current = Thread.currentThread(); // 如果当前线程是第一个获取读锁的线程,则直接返回 firstReaderHoldCount即可 if (firstReader == current) return firstReaderHoldCount; HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == current.getId()) return rh.count; // 如果以上都不是,则到threadLocal中查找 int count = readHolds.get().count; // 如果当前线程的读锁获取次数剩余0时,将threadLocal变量移除,节省内存 if (count == 0) readHolds.remove(); return count; } final int getReadLockCount() { return sharedCount(getState()); } ??从上面来看,看当前线程获取读锁的次数,先看是否有线程获取读锁,如果没有则返回0,否则看firstReader 的值是否和当前线程相等,如果是,则获取firstReaderHoldCount的值返回 ,否则看cachedHoldCounter的线程id和当前线程id是否相等,如果相等,取cachedHoldCounter.count返回 ,否则到threadLocal中去查找,其实整个过程代码不难,难是作者的思想,相当于作者用了3级缓存,先到firstReader中查找,如果找到到,则到cachedHoldCounter中查找,如果还找不到,则到threadLocal中查找,虽然代码写得烦索,但是性能高,逻辑严谨,这是我们应该学习的。 ??ReentrantReadWriteLock的源码也就解析到这里了,我相信看到这里的小伙伴肯定有所收获,当然,如果我写得有问题,请给我的博客下方留言, 欢迎较正。 ??通过ReentrantReadWriteLock源码的分析,我们至少学习到了Doug Lea细腻的心思 ,严谨的代码,巧妙的构思,这些都是作为一个程序员应该学习的地方。 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 | -2024/11/24 7:15:57- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |