一、AQS原理
1.1 什么是AQS
1.1.1 粗识AQS
AQS(AbstractQueuedSynchronized)抽象队列同步器是rt.jar包下java.util.concurrent下的并发工具包中核心中的核心。其集成和实现关系如下: 可以看到,AQS的实现场景非常多,诸如信号量(限流)、可重入锁、可重入读写锁、线程池、Latch闭锁(同步协助类)等等。
1.1.2 AQS特质
-
AQS具备的特性
- 阻塞等待队列
- 共享锁、独占锁
- 公平、非公平锁
- 可重入(在部分实现中)
- 允许中断
-
AQS内部维护的属性 volatile int state
private volatile int state;
- state表示锁同步状态[独占锁表示是否占有,重入时自增。共享锁一般有其他count表示重入]
- state的三种访问方式:
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
- getState() 直接从内存中读取同步器状态值
- setState() 设置同步器状态值(一般是释放锁时使用)
- compareAndSetState() 尝试修改锁状态(抢占锁时使用)
-
AQS定义资源共享的方式(通过Node同步等待队列实现) static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
}
- Exclusive 排它锁也称独占锁,只有一个线程可以执行(ReentrantLock )
- Share 共享锁,多个线程可同时执行(Semaphore/CountDownLatch)
-
AQS定义两种队列
- 同步等待队列:用于维护获取锁失败的入队线程
- 条件等待队列:调用await()方法释放锁后,加入条件队列,等待条件唤醒再次争抢锁
-
AQS定义5个队列中节点状态 static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
}
-
自定义同步器主要实现方法(AQS已经完成等待队列的入队、出队),用户只需要定制共享资源变量state的获取与释放即可。
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
1.1.3 核心思想
AQS解决线程同步问题的思想和synchronized相同,都是来自于管程EMSA思想。
1.2 同步等待队列
同步等待队列也称CLH,双向链表结构的队列,FIFO先进先出。
public abstract class AbstractQueuedSynchronizer{
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
1.3 条件等待队列
AQS条件队列使用单向链表保存,用nextWaiter连接
- await()方法阻塞线程,进入条件队列
- 当前线程存在于同步队列的头节点,await()来阻塞
- signal()方法将条件队列节点转入同步等待队列
- unlock()方法唤醒同步等待队列节点
1.4 Condition接口详解
Condition接口用于条件队列的设置,在循环屏障和读写锁中应用广泛。
- await()方法, 释放锁 -> 阻塞当前线程 -> 向condition尾部插入节点
- signal()方法,将Condition队列的首节点移动到阻塞队列的尾部,然后唤醒同步等待队列的首节点去竞争锁。
Condition的用法在于通过满足某种条件后,让持有锁的当前线程condition.await()释放锁,并进入等待队列addConditionWaiter()。当某个条件达成后,再通过condition.signal();唤醒因调用Condition#await方法而阻塞的线程。从而达到线程间的条件协作。
public class ConditionTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 开始处理任务");
condition.await();
log.debug(Thread.currentThread().getName() + " 结束处理任务");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 开始处理任务");
Thread.sleep(2000);
condition.signal();
log.debug(Thread.currentThread().getName() + " 结束处理任务");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}
}
二、ReentrantLock详解
2.1 ReentrantLock的使用
特点:
- 可中断
- 可设置超时时间
- 可选择公平、非公平
- 支持多个条件变量
- 可重入
使用ReentrantLock加锁的编程范式,伪代码如下:
public class ReentrantLockDemo {
public static void main(String[] args){
ReentrantLock lock = new ReentrantLock();
ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
} finally{
lock.unlock();
}
}
}
2.2 ReentrantLock源码分析(执行流程图+源码配套解读)
public class ReentrantLockDemo {
private static int sum = 0;
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(()->{
lock.lock();
try {
for (int j = 0; j < 10000; j++) {
sum++;
}
} finally {
lock.unlock();
}
});
thread.start();
}
Thread.sleep(2000);
System.out.println(sum);
}
}
2.2.1 ReentrantLock加锁逻辑
核心代码逻辑如下
-
公平锁与非公平锁逻辑相似,但非公平锁在cas state失败入队前(同步等待队列),会多尝试几次cas -
addWaiter(Node.EXCLUSIVE) for循环保证一定能入队(进入同步等待队列) private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- acquireQueued(node, 1)) for循环保证一定能阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.2.2 ReentrantLock线程被唤醒后的逻辑
唤醒线程后,将同步等待队列当前节点设置为头节点,删除原头节点。开始执行临界区代码。
2.2.3 ReentrantLock线程解锁的逻辑
2.3 ReentrantLock的思考
- 公平锁和非公平锁哪个吞吐高?
毫无疑问,非公平锁的效率更高。从上面加锁逻辑图可以清晰的看到,非公平锁在acquire(1)尝试抢占锁(cas state的状态)前后,会有额外的2次cas。这个时段内,如果有其他线程退出锁,非公平锁可以不入队park()线程直接获取锁,执行临界区代码。
三、Semaphore详解
3.1 Semaphore
3.1.1 Semaphore基本概念
Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。
Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,当每个线程只争抢1个资源时,可以实现只能有n个线程同时获取信号量。
3.1.2 Semaphore是操作系统PV操作的实现
PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
①S减1;
②若S减1后仍大于或等于0,则进程继续执行;
③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V操作的主要动作是:
①S加1;
②若相加后结果大于0,则进程继续执行;
③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。
3.1.3 Semaphore常用方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public void acquire() throws InterruptedException {}
public boolean tryAcquire() {}
public void release() {}
3.2 应用场景(限流)
下面场景中,网吧只有3台电脑,5位用户先后来到,同时只允许3位用户使用电脑,其他人需要阻塞等待。
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore windows = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
windows.acquire();
System.out.println(Thread.currentThread().getName() + ": 老板,上机!");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
windows.release();
System.out.println(Thread.currentThread().getName() + ": 下机,走了!");
}
}
}).start();
}
}
}
#Print Result:
Thread-0: 老板,上机!
Thread-1: 老板,上机!
Thread-2: 老板,上机!
Thread-1: 下机,走了!
Thread-4: 老板,上机!
Thread-2: 下机,走了!
Thread-3: 老板,上机!
Thread-0: 下机,走了!
Thread-4: 下机,走了!
Thread-3: 下机,走了!
Process finished with exit code 0
3.3 Semaphore源码分析
-
加锁等待阻塞的入队逻辑 -
解锁逻辑
3.4 Semaphore限流的思考
四、CountDownLatch
4.1 CountDownLatch闭锁
4.1.1 CountDownLatch介绍
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。
4.1.2 常用方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void countDown() {
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
4.2 CountDownLatch应用场景
public class CountDownLatchTest2 {
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 +
ThreadLocalRandom.current().nextInt(1000));
System.out.println(Thread.currentThread().getName()
+ " finish task" + index);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.await();
System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
}
}
#Print Result:
Connected to the target VM, address: '127.0.0.1:57114', transport: 'socket'
Thread-0 finish task0
Thread-1 finish task1
Thread-3 finish task3
Thread-4 finish task4
Thread-2 finish task2
主线程:在所有任务运行完成后,进行结果汇总
Disconnected from the target VM, address: '127.0.0.1:57114', transport: 'socket'
-
场景二:多线程任务阻塞住,等待发令枪响,一起执行。 public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
countDownLatch.await();
String parter = "【" + Thread.currentThread().getName() + "】";
System.out.println(parter + "开始执行……");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);
countDownLatch.countDown();
}
}
#Print Result
【Thread-0】开始执行……
【Thread-4】开始执行……
【Thread-3】开始执行……
【Thread-1】开始执行……
【Thread-2】开始执行……
Process finished with exit code 0
4.3 CountDownLatch实现原理
- countDownLatch.countDown();表示任务已完成,将需要统计的线程数-1
- 当countDown()后发现最终的count==0,表示所有任务已经执行完毕。doReleaseShared() 唤醒被阻塞的主线程。
- countDownLatch.await(); 当主线任务中,发现count不等于0,需要等待子线任务完成。此时线程park()阻塞
4.4 CountDownLatch思考
- countDownLatch与Thread.join()区别
- CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
- CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
- 而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。
- CountDownLatch与CyclicBarrier的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
- CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现
五、CycliBarrier
5.1 CycliBarrier使用
5.1.1 CycliBarrier介绍
CycliBarrier的字面意思为回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
5.1.2 CycliBarrier常用方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier();
nextGeneration();
} finally {
lock.unlock();
}
}
5.2 CycliBarrier应用场景
类似于生活中汽车站,人满发车;或者LOL类游戏,满10人一组,开始游戏等。
5.3 CycliBarrier实现原理及源码
- CycliBarrier采用ReentrantLock+AQS实现锁屏障
- 多线程进入共享一把独占锁
- await()方法尝试加锁失败后,直接入队同步等待队列
- 指定state信号量自减到0时,doSignalAll()将所有条件等待队列的元素唤醒,开跑
- 信号量不为0时,加入到条件等待队列,并唤醒同步等到队列中的线程尝试获取锁
5.4 CycliBarrier思考
六、ReentrantReadWriteLock读写锁
6.1 ReentrantReadWriteLock
6.1.1 ReentrantReadWriteLock介绍
生产中,数据的读写更多情况下是读多写少。当多线程同时发生读操作,而没有写操作时,此时多线程间并发读是线程安全的;而当有线程对资源进行写操作时,其他线程则不应该继续对该资源进行读和写操作。ReentrantReadWriteLock则是针对这种场景,设计出来的读写分离的两把锁。读是共享锁,写是独占锁。
读写锁的特性:
-
锁公平性选择:默认非公平锁和可选公平锁。 -
可重入型:可锁和写锁均可重入。且按照锁特性线程获取读锁后,可继续获取读锁。线程获取写锁后,可以再次获取写锁或读锁。 -
锁降级:遵循获取写锁、在获取读锁最后释放写锁的次序,写锁将降级为读锁。 -
共用state信号量: private volatile int state
6.1.2 读写锁接口ReadWriteLock
ReadWriteLock是ReentrantReadWriteLock的父类,在ReentrantReadWriteLock中,分别创建共享读锁和独占写锁赋值。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
6.1.3 ReentrantReadWriteLock常用方法
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
6.1.4 锁降级
6.2 ReentrantReadWriteLock应用场景
适用于: 读多写少的场景 [读锁用共享锁,共享的最大值为MAX_COUNT=65535],多线程间不会存在排他。
对临界数据的读写如下:
public class ReadWriteLockTest {
public static void main(String[] args) throws InterruptedException {
final Queue queue = new Queue();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
queue.put(new Random().nextInt(10000));
}
}).start();
}
Thread.sleep(100);
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
queue.get();
}
}).start();
}
}
}
class Queue {
private Object data = null;
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
public void get() {
log.debug(Thread.currentThread().getName() + " be ready to read data!");
readLock.lock();
try {
Thread.sleep(1000);
log.debug(Thread.currentThread().getName() + " have read data :" + data);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
public void put(Object data) {
log.debug(Thread.currentThread().getName() + " be ready to write data!");
writeLock.lock();
try {
Thread.sleep(5000);
this.data = data;
log.debug(Thread.currentThread().getName() + " have write data: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
}
6.3 ReentrantReadWriteLock实现原理及源码分析
6.3.1 读写状态的设计&设计精髓
- 写状态,存于低16位。当需要对写状态+1[抢夺写锁时]
- S & 0x0000FFFF,低16位保留,高16位清零。读到S值。
- S+1即可
- 读状态,存于高16位。当需要对读状态+1时[读锁线程+1]
- S >>> 16(S右移16位,左侧高位补0),读到S值。
- S + (1<<<16) 即可(+ SHARED_UNIT = 65535)
- 读锁中,state高16位仅表示持有读锁的线程数,还需要额外的值存取重入次数,即HoldCounter。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
6.3.2 HoldCounter计数器
ThreadLocalHoldCounter静态内部类,重写ThreadLocal#initialValue()方法,每次生成HoldCounter,每个线程生成自己的tid,保存重入的count值。
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
6.2.4 写锁与读锁的获取与释放
实现类似于原先的独占锁和共享锁的获取与释放,特殊之处仅在于读与写共同操作同一个信号量state。当独占时,其他锁都将进入等待队列。
- 读写互斥
- 写写互斥
- 写锁支持同线程的重入
6.4 ReentrantReadWriteLock的思考
-
写锁 -> 读锁 -> 释放写锁,是如何降级为读锁的? -
当存在一个write线程,所有读操作都阻塞? 当有线程占用写锁时,state高低16位分别表示不同的锁状态。高16位表示共享锁状态,低16位表示独占写锁状态。 -
读写锁是怎样实现分别记录读写状态的? 通过位运算,分别获得高低两个值。 -
写锁是怎样获取和释放的?读锁是怎样获取和释放的? 后面有时间再画流程图
|