前言
Java锁机制2.0 已经讲到了CAS,可是CAS存在了一些问题:
- 虽然Java的底层已经在unsafe类中封装了
compareAndSwap 方法,支持了对于系统CAS原语的调用,但是对于上层业务来说,怎样才能无感地进行调用? - 此外我们在业务代码中往往想要同步的是某项被竞争的资源,这种被竞争的资源常常以对象的形式进行封装,而CAS只是能够原始地去修改内存上的一个值,那么该如何利用CAS去同步对象呢?
而这就需要进一步的抽象。
目前Java已经提供了CAS的能力,那么该如何利用它来开发一个通用的,能够对竞争资源进行同步管理的框架?
- 通用性,下层实现透明的同步机制,同时与上层业务解耦
- 利用CAS,原子地修改共享标记位
CAS能原子地对一个值进行修改,那么就将这个值作为竞争资源的标志位,在多个线程想要去修改共享资源的时候,先来读这个标记位,如果标记位显示目前的共享资源时空闲的,就可以被获取,那么久赋予该线程写权限,同时将这个标记位置设置为不可用,阻碍其它线程的调用——利用CAS的原子性。 - 等待队列(阻碍其他线程的调用)
有两种业务场景:一种线程可能指向快速地去尝试一下获取共享资源,如果获取不到也没关系,它会进行其他的处理;另一种业务线程它可能一定要获取共享资源才能进行下一步处理,如果当前没有获取到,它会继续等待。 针对第一种场景:直接返回true或者false就可以了。 针对第二种场景:如果让其一直轮询访问会不断地占用CPU的时间片,在高并发场景或者CPU密集型的场景下一定会降低系统的性能。并且实现框架的目的就是为了简化上层的操作,屏蔽复杂度,如果框架还需要上层进行业务处理,就增加了上层业务开发的难度。所以第二种场景不能直接返回,可以设计一个队列来将这些想要获取共享资源的线程进行排队,一旦共享资源空闲下来,那么这个队列里的线程就可以依次去进行获取。 注意:FIFO队列中的头结点是虚节点,只是用来占位,第二个节点才是真正要去拿锁的节点,节点拿到锁之后就会变成头结点,然后头结点就会出队。
上面框架的思想和 java.util.concurrent.locks 中的 AQS(AbstractQueuedSynchronizer) 类似,JUC中的很多工具以及目前一些主流的开源中间件都大量使用了AQS。
1.为啥要用并发编程?
由于计算机芯片发展遇到了瓶颈(CPU发展遇到瓶颈),为了使用最低的成本开发出运行效率最高的软件,就诞生了并发编程(压榨CPU)。
为了拆分计算机处理的任务,并将其发布到多个廉价的CPU上同时执行,这样就能低成本地提高软件执行的效率。 但并非所有的处理任务都是独立的,当多个任务在执行期间想要进行读取同一块内存值时,就出现了一个问题:我们该如何对这多个任务进行同步管理,以此来保证程序不会出错,同时好要保证高性能的目标,找到最优的解决方法? 而这,就是并发编程的意义。
2.看一看AQS的源码
1.AQS的成员变量
为什么这个标记位不用boolean类型?boolean占用的空间更小,只要1bit,而且能保证二义性。
这里就要说到线程获取锁的两种模式:独占和共享。
- 独占模式:一旦被占用,其他线程都不能占用,只能等待。
- 共享模式:一旦被占用,其他共享模式下的线程仍能占用,从而一起访问线程资源。
如此,在共享模式下,可能会存在多个线程正在共享资源,所以 state 需要表示线程占用的数量,因此使用 int
另外两个节点的作用:如果一个线程在当前时刻没有获取到共享资源,他可以选择进行排队,而这个队列是一个 FIFO 的双向链表
队列里的节点有两种模式:独占和共享,虽然这两者表现得意义不一样,但在底层的处理逻辑上几乎没有太大差别,故而此处就只讲独占模式。
队列中的Node类
2.AQS的方法
尝试获取锁(修改标记位),立即返回 → tryAcquire
获取锁(修改标记位),愿意进入队列等待 → acquire
tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- int:代表对state的修改,boolean:代表是否成功获得锁
- 方法体只抛出一个异常:AQS的子类必须重写这个方法,否则就抛出不支持该操作的异常
设计意义:给上层调用开放空间,上层可以重写自定义业务逻辑
子类自定义业务逻辑:
public class TestTA extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (arg != 1){
return false;
}
if (getState() == 1){
return false;
}
return compareAndSetState(0,1);
}
}
在上层调用tryAcquire 成功时获取锁,此时可以对相应的共享资源进行操作,使用完之后再进行释放。 如果获取失败,且上层用户不想等待,则可以直接进行处理,如果选择等待锁,就可以直接调用acquire 方法,而不用自己去进行复杂的排队处理。
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- final:不能重写,只能直接调用此方法 → 此方法一定能得到锁
- if判断:当
tryAcquire 得到锁,!tryAcquire为false,直接跳出判断条件,不用再执行后面的判断条件;反之,就执行acquireQueued 方法,进行排队等待锁
先来看看acquireQueued 中的addWaiter
为当前线程和给定模式创建和排队节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
方法作用:将当前线程封装成一个node,然后加入等待队列,返回值为当前节点。
接着点开 enq —— 完整的入队方法
将节点插入队列,必要时进行初始化。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
看了上述两个方法的代码,问题来了:为什么不直接enq 进行完整的入队操作呢?而是先去尝试一下快速入队?二者相比,完全入队只多了一个判断为空的操作
作者应该是为了追求极致的效率
acquireQueued
配合release 方法是对线程进行挂起和响应,以此来实现队列的先入先出
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 可以看出,只有线程抛出了异常 failed 才会抛出为 true ,此时才会进行
cancelAcquire(node); //将Node的waitStatus设置为CANCEL以及其他的一些清理操作 - 自旋操作内容:如果当前节点的前置节点是头结点,且当前线程尝试获取锁成功了,就直接返回;
shouldParkAfterFailedAcquire 返回true的两个情况:此节点是头结点 或者 此节点不是头结点,但是获取锁失败 为什么要用parkAndCheckInterrupt 判断线程是否需要挂起,而不是直接去自旋呢? 自旋→CPU→消耗性能→还没有轮到出队的线程挂起,再在适合的时间唤醒
TIPS:Java中断: 作用于线程对象,并不会直接促成线程挂起,而是会根据Thread当前的活动状态产生不同的效果。 注意: 一个线程通过wait ,sleep 等方法进行挂起,那么再调用interrupt 将会直接抛出异常,而使用LockSupport.park() 挂起,就不会抛出异常而是会更改此线程内部的一个中断值。
public class TestDemo1 {
public static void main(String[] args) {
Thread a = new Thread(new Runnable() {
@Override
public void run() {
while (true){
System.out.println("a...");
}
}
});
a.start();
a.interrupt();
System.out.println("a线程中断");
}
}
class TestDemo2{
public static void main(String[] args) {
Thread b = new Thread(new Runnable() {
@Override
public void run() {
while (true){
System.out.println("b...");
}
}
});
try {
b.wait();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
b.interrupt();
}
}
接下来看看 shouldParkAfterFailedAcquire ,有哪些线程需要被挂起?
通过前置节点的 waitStatus 判断
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
最后来看看 parkAndCheckInterrupt 对当前线程进行挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
总结:
- 如果当前线程所在节点处于头结点后面一个,那么将会不断尝试拿锁,除非拿锁成功,否则进行判断是否需要挂起
- 如何判断是否需要挂起?
如果当前线程所在的节点之前除了head还有其他节点,并且 waitStatus 为 SINGLE ,那么当前节点就需要挂起 这样就能保证head之后就只有一个节点在通过CAS来获取锁,队列中的其他线程都已经被挂起或正在被挂起→最大限度避免无用的自旋消耗CPU
问题: 何时去唤醒这么一大堆被挂起的线程?
在一个线程使用完了共享资源并且要释放锁的时候,才去唤醒其他正在等待锁的线程。
release
开放给上层自定义调用的方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
尝试唤醒锁成功,就去唤醒其他在等待队列中的节点
unparkSuccessor
为了唤醒Head后面的Node,使其自旋地获取锁
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
被唤醒的线程会继续执行之前 acquireQueued 方法,进行自旋尝试获取锁,这样就形成了一个能够良好工作的闭环。
|