前言
为什么会出现Java并发? 摩尔定律失效。
JVM线程模型
??编写的 .java 文件,经过编译后生成 .class 文件,然后经过 类加载子系统,送达到JVM中,所以 Java 字节码文件是运行在 JVM 中,而 JVM 是运行在 OS 上。继而可以知道,JVM 创建,回收线程时候,需要跟 OS 交互,交互必然涉及到 规范和协议。这就是 JVM 线程模型。
3种JMM线程模型
一对一,一对多,多对多。 UT-用户线程,LWP-轻量级进程,KLT-内核线程。
悲观锁
顾名思义,认为每次的操作都是线程不安全的。 在Java中,每个Object对象都拥有一把锁,这把锁放在了 对象头 中,记录了当前对象被哪个对象占用。 Java对象分为 3 部分:对象头,实例数据,对其填充。 对象头分为 2 部分:MarkWord, ClassPointer。
synchronized
可以用来保证线程安全,经过编译后生成了 monitorenter 和 monitorexit 两个字节码指令,底层是依赖的的 OS 的 mutex lock。 从JDK6开始,对象锁有了 4 种状态:无锁,偏向锁,轻量级锁,重量级锁。 无锁:判断MarkWord的锁标志位,01 的话,接着判断倒数第 3 个,即 判断是否是偏向锁,0 的话,代表当前对象的锁状态是 无锁。 偏向锁:判断MarkWord的锁标志位,01 的话,接着判断倒数第 3 个,即 判断是否是偏向锁,1 的话,代表当前对象的锁状态是 偏向锁。 轻量级锁:判断MarkWord的锁标志位,00 的话,代表当前对象的锁状态是 轻量级锁。 重量级锁:判断MarkWord的锁标志位,10 的话,代表当前对象的锁状态是 重量级锁。
乐观锁
认为每次操作都是安全的,例如CAS,如果发现空闲,就可以使用,如果被占用,那么继续尝试获取。
CAS
CompareAndSwap,比较并交换。CAS必须是原子性的。在 x86_64 架构下,通过 cmpxchg 指令支持;在 ARM 架构下,通过 LL/SC 来实现。
AQS
AQS是对CAS的一种封装和丰富,AQS引入了独占锁、共向锁等性质。
尝试设计
如果要设计一个组件,他需要是通用的,并且能够对竞争对资源同步管理,需要怎么做呢? 先来看看考虑哪些点呢? 1.既然要做一个组件,那么就需要具有通用性,因为上层业务总是变化,所以这个组件要保证对外接口的简单性和纯粹性。 2.既然CAS能够原子地对一一个值进行写操作,那么我可以将这个值(称为status) 作为竞争资源的标记位。在多个线程想要去修改共享资源时,先来读status, 如果status 显示目前共享资源空闲可以被获取,那么就赋予该线程写status的权限,当该线程原子地修改status成功后,代表当前线程占用了共享资源,并将status置为不可用,拒绝其他线程修改status,也就是拒绝其他线程获取共享资源。 3.怎么设计拒绝其他线程调用呢?尝试获取时就算获取不到,立马返回,还是继续等待直到获取成功呢?可以一直等待直到成功吗?应该是不能的,理由如下:1.通用组件对外调用时,内部实现应该简单纯粹;2.CPU一直空转,浪费资源。 当大量请求请求 lock() 时,如何对他们进行管理呢?可以放进队列中。队列头部的线程自旋的访问 status,其他线程挂起,这样就避免了其他线程的自旋消耗,当头部线程成功占用资源后,再唤醒后一个被挂起的线程人,让他自旋的访问 status。 OK,思路完毕,接下来是求证。
看源码意义
0.坚持。 1.耐心。不一定要通读,但是核心部分要精读。 2.为了理解原理和细节,从细节之处学习高手的思想
AbstractQueuedSynchronizer的内部实现
属性
private volatile int state;
??state 就是用来判断是否获取成功的标志位,诶,那为什么不是 boolean 类型的呢?这里就要谈到获取锁的两种状态,独占和共享。独占模式下,一个线程获取锁后,其他线程只能等待;共享模式下,一个线程获取锁后,其他想以共享模式获取锁的线程也可以一起访问资源,但是其他想以独占方式访问锁的线程就需要等待了。这就说明,共享模式下,是可以有多个线程共享同一个资源的,所以 state 用来表示线程占用数量,就是 int 类型了。 一句话表示:锁的获取可以是独占,也可以是共享,共享时,需要查看有几个线程正在共享资源,所以是 int 类型。
private transient volatile Node head;
private transient volatile Node tail;
CLH 双向队列,用来对等待的线程进行管理,是 FIFO 类型的。
内部类
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
?? 重点关注下,waitStatus 这个属性,这个一个枚举值,AQS工作时必然伴随着 waitStatus 值的变化,如果理解了 waitStatus 变化的时机,那对理解 AQS 有很大帮助。 ?? waitStatus 主要包含 4 种状态: 0:节点初始化默认值或节点已经释放 CANCELLED = 1:表示节点释放锁的请求已经取消了。 CONDITION = -2:表示当前节点正在等待某一个 Condition 对象。 PROPAGAT = -3: 传递共享模式下锁释放状态。 Node的方法只有 2 个,一个锁判断是不是共享,一个获取前置 Node。 ?? 还记得上面说,尝试获取锁时, 一开始想到的两种场景?不管能不能获取到,立即返回;获取锁失败;一直自旋,直到成功。看看源码呢?
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
?? 接下来一起来看看核心的和复杂的 acqire 方法。 ?? tryAcquire(arg) 的意思是,如果获取成功,那么 !tryAcquire(arg) 就是 false,说明已经获取锁成功了,根本不用参加排队,后面的判断就没必要看了。如果 !tryAcquire(arg) = true,接着执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法,这是一个嵌套方法,我们接着看 addWaiter(Node.EXCLUSIVE) 方法。 ?? 故名思义,这个方法的作用就是 将当前线程封装成一个 Node,然后加入等待队列,返回值即为该 Node。逻辑呢,也是非常简单的,首先是创建一个 Node 对象,之前也说过,这个队列是 FIFO 的,意味着是要插入队尾。但是我们需要考虑到多线程场景,即 假设多个线程正在调用 addWaite()。 ?? 新建 pred 节点引用,指向当前的尾节点: 1.将当前节点的 pre 指针指向 pred 节点(尾节点) 2.尝试通过 CAS 将当前节点置为尾节点, 如果返回 false,说明 pred 节点已经不再是尾节点,尾节点已经被其他线程修改,那么退出判断,调用 end(),准备重新进入队列。 如果返回 true,说明 CAS 之前, pred 依然是尾节点,CAS 成功使得当前节点变尾节点,若当前节点顺利成为尾节点,那么 pred 节点和当前 node 之间的相对位置已经确定,此时将 pred 节点的 next 指针指向当前 node, 是不会存在问题的。 ?? 由于是多线程环境,这里有 3 个细节容易忽略,也是该方法的重点: 1.当线程执行到第 13 行时,pred 引用指向的可能不再是尾节点,所以 CAS 失败。 2.如果 CAS 成功,诚然 CAS 具有原子性,但是 14,15 两行在执行时并不具有原子性,只不过此时 pred 节点和当前节点的相对位置已经确定,其他线程只是正在插入的新的尾节点,并不会影响到这里的操作,所以线程是安全的。 3.想要记住的是,当前后两个节点建立连接的时候,首先是后节点的 pred 指向前节点,当后节点成功成为尾节点后,前节点的 next 才指向 后节点。 ?? 如果理解了这些,我们来看 18 行,如果程序运行到了这一行,说明出现了 2 种情况之一: 1.队列是空 2.快速插入失败,想要进行完整流程的插入,这里所说的快速插入,指的就是14-17 行的逻辑,当并发线程较少的情况下,快速插入成功率很高,程序不用进入完整流程插入,效率会很高。 ?? 既然程序来到了 18 行,我们来看看完整流程的插入是什么样子的:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
?? 如果队列 tail == null,那么就尝试初始化,如果尾节点插入失败,那就一直重试,直到插入成功为止。一旦 addWaiter 成功之后,不能就这么不管了,猜测可能是:既然存在一个 FIFO 队列,那么可能会使用了 “生产-消费” 模式,有一个消费者不断的从头节点获取节点,出队节点种封装的线程拥有拿锁的权限。但是实际上 AQS 实际上并没这么做,而是在各个线程当中维护了当前 Node 的 waitStatus,根据不同的状态,程序作出不同的操作,通过调用 acquireQueue 方法,开始对 Node 的 waitStatus 进行跟踪维护。我们来继续看 acquireQueued 方法: ?? 首先,acquireQueue 方法内部定义了一个局部变量 failed,初始值为 true,默认失败。还有一个变量 interrupted,初始值为 false,意思是等待锁的过程中当前线程没有被中断。再来看看整个方法中,哪里用到了这些变量: 1.第 11 行,return 之前,failed 值会改成 falls,代表执行成功,并且返回 interrupted。 2.第 15 行,如果满足条件,interrupted 会被改成 true,最终在第 11 行返回。 3.第 18 行,finally 块,通过判断一个 failed 值来进行一个名为 cancelAcquire 的操作, 即 取消当前线程获取锁的行为。 ?? 那基本可以 acquireQueued 分成 3 部分: 1.第 7-11行,当前节点置为 head,说明当前节点有权限去尝试拿锁,这是一种约定。如果 tryAcquire 返回 true,代表拿到了锁就返回。 2.第13-15 行,if 包含 2 个方法,看名字(详细方法体后续再看)是首先判断当前线程是否需要挂起等待?如果需要,那么就挂起,并且判断外部是否调用线程中断;如果不需要,那么继续尝试拿锁。 3.第 18-19行,如果 try 块抛出非预期异常,那么取消当前线程获取锁的行为。 ?? 这里呢,有 3 点需要关注下: 1.一个约定:head 节点表示当前正在持有锁的节点。若当前节点的前置节点是 head,那么该节点就开始自旋的获取锁。一旦 head 节点释放,当前节点就能第一时间获取到。 2.shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法体细节。 3.interrupted变量最终被返回出去后,上层 acquire 方法判断该值,来判断是否调用当前线程中断。这里属于一种延迟中断机制。 ?? 我们来看第 2 点提到的两个方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
?? 若当前节点没有拿到锁的权限或者拿锁失败,那么将会进入 shouldParkAfterFailedAcquire 判断是否需要挂起(park),方法的参数是 pred Node 和 当前 Node 的引用。 ??
|