一、简介
1. 什么是Condition
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object 上),主要包括wait() 、wait(long timeout) 、notify() 以及notifyAll() 方法,这些方法与synchronized 同步关键字配合,可以实现等待/通知模式。Condition 接口也提供了类似Object 的监视器方法,与Lock 配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。——摘自《Java并发编程的艺术》
下面是Condition 与Object 的监视器方法的对比(摘自《Java并发编程的艺术》)
对比项 | Object Monitor Methods | Condition |
---|
前置条件 | 获取对象的锁 | 1.调用Lock.lock() 获取锁 2.调用Lock.newCondition() 获取Condition对象 | 调用方式 | 直接调用。 如Object.wait() | 直接调用。 如condition.await() | 等待队列的个数 | 一个 | 多个 | 当前线程释放锁并进入等待状态 | 支持 | 支持 | 当前线程释放锁并进入等待状态,且在等待状态中不响应中断 | 不支持 | 支持 | 当前线程释放锁并进入超时等待状态 | 支持 | 支持 | 当前线程释放锁并进入等待状态直到将来的某个时间 | 不支持 | 支持 | 唤醒等待队列中的一个线程 | 支持 | 支持 | 唤醒等待队列中的全部线程 | 支持 | 支持 |
2. Condition接口
我们来看一下Condition接口的定义
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
Condition 是一种广义上的条件队列。他为线程提供了一种更为灵活的等待/通知模式,线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition 必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
二、基本使用
以基本的B线程唤醒A线程为例
1. Object等待唤醒写法
public static void main(String[] args) throws InterruptedException {
Object obj=new Object();
Thread threadA = new Thread(()->{
System.out.println("A尝试获取锁...");
synchronized (obj){
System.out.println("A获取锁成功!");
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("A开始释放锁,并开始等待...");
obj.wait();
System.out.println("A被通知继续运行,直至结束。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread threadB = new Thread(()->{
System.out.println("B尝试获取锁...");
synchronized (obj){
System.out.println("B获取锁成功!");
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("B开始释放锁...");
obj.notify();
System.out.println("B随机通知lock对象的等待队列中某个线程!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadA.start();
TimeUnit.SECONDS.sleep(1);
threadB.start();
}
执行结果如下
A尝试获取锁...
A获取锁成功!
A开始释放锁,并开始等待...
B尝试获取锁...
B获取锁成功!
B开始释放锁...
B随机通知lock对象的等待队列中某个线程!
A被通知继续运行,直至结束。
Process finished with exit code 0
2. Condition等待唤醒写法
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread threadA = new Thread(()->{
System.out.println("A尝试获取锁...");
lock.lock();
try {
System.out.println("A获取锁成功!");
TimeUnit.SECONDS.sleep(1);
System.out.println("A开始释放锁,并开始等待...");
condition.await();
System.out.println("A被通知继续运行...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
System.out.println("A线程释放了锁,执行结束!");
}
});
Thread threadB = new Thread(()->{
System.out.println("B尝试获取锁...");
lock.lock();
try {
System.out.println("B获取锁成功!");
TimeUnit.SECONDS.sleep(3);
System.out.println("B开始释放锁...");
condition.signal();
System.out.println("B随机通知lock对象的等待队列中某个线程!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
System.out.println("B线程释放了锁,执行结束!");
}
});
threadA.start();
TimeUnit.SECONDS.sleep(1);
threadB.start();
}
执行结果如下
A尝试获取锁...
A获取锁成功!
A开始释放锁,并开始等待...
B尝试获取锁...
B获取锁成功!
B开始释放锁...
B随机通知lock对象的等待队列中某个线程!
B线程释放了锁,执行结束!
A被通知继续运行...
A线程释放了锁,执行结束!
Process finished with exit code 0
三、实现原理
1. ConditionObject类
ConditionObject 是Condition 在java并发中的具体的实现,由于Condition的操作需要获取相关的锁,而AQS则是同步锁的实现基础,所以ConditionObject 则定义为AQS的内部类。
1.1 类继承关系
ConditionObject 定义如下
public class ConditionObject implements Condition, java.io.Serializable {
}
实现Condition和Serializable接口。
1.2 类的属性
它主要包含如下属性
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
private transient Node lastWaiter;
2. 等待队列
每个ConditionObject 都包含一个FIFO队列,队列中的节点类型是AQS的内部类——Node类,每个节点包含着一个线程引用,该线程就是在该Condition对象上等待的线程。与CLH同步队列不同的是,Condition的等待队列是单向队列,即每个节点仅包含指向下一节点的引用,如下图所示
3. await方法解析
调用Condition的await()方法会使当前线程进入等待状态,同时会加入到Condition等待队列同时释放锁。await方法代码如下
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
我们来一步一步分析。首先进行响应中断的判断。如果没有中断异常抛出,则调用addConditionWaiter 方法将当前线程加入到等待队列中,方法如下
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
当前节点添加到等待队列成功后,会进行调用fullyRelease 方法完全释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
}
else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
释放锁成功后,会进行自旋,自旋中会不断判断节点是否在同步队列中,如果不在,如果不在,说明该线程还不具备竞争锁的资格,继续等待,直到检测到此节点在同步队列中,代码如下
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
我们先来看一下isOnSyncQueue 方法的实现
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
这里为啥要判断是否在CLH队列中呢?因为在执行通知操作时会将等待队列的节点转移到CLH队列中,表示这个节点可以被唤醒,任何在等待队列中需要被唤醒的节点都会在进入CLH队列中进行排队等待获取锁。至此是当前线程被挂起的所有流程,当其他线程通知该线程唤醒的时候,就会继续执行下去,剩下的代码是三个if判断,代码如下所示
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
-
第一个if判断:它调用acquireQueued 方法,该方法是一个自旋的过程,即每个线程进入同步队列中,都会自旋地观察自己是否满足条件且获取到同步状态,则就可以从自旋过程中退出,否则继续自旋下去(代码细节已经在之前AQS文章中详解过,这里不再赘述,详情参考文章:深入理解AQS实现原理),返回结果表示当前是否中断过,同时再判断interruptMode 是否不是THROW_IE ,如果不是则会将interruptMode 设置为REINTERRUPT ,即等待结束后自我中断。 -
第二个if判断:如果node的下一个等待者不为空,会清除所有状态为cancelled的节点。 -
第三个if判断:根据中断模式进行中断操作 private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
至此await的整个流程就走完了,我们来总结一下await的执行流程,前提获取到锁(这里忽略中断的细节操作)
- 1)将当前线程包装成节点插入到Condition的等待队列的队尾
- 2)释放锁
- 3)自旋挂起,当其他线程唤醒当前线程的时候,退出条件为节点已经被移到CLH队列中或当前节点对应的线程发生了中断,否则继续挂起
- 4)获取锁,进行后续的操作
4. signal方法解析
调用Condition 的signal() 方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。代码如下
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用该方法需要判断当前线程是否获取了锁,否则直接抛出异常,紧接着调用方法doSignal 唤醒队列中的头结点,代码如下
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
整个signal 的流程较为简单,我们来总结一下
- 判断当前线程是否是持有锁的线程,如果不是则抛出异常,否则继续执行
- 将Condition对应的等待队列的头结点移到到CLH队列中去
5. 总结
总的来说,Condition的实现就是在其中新增一个等待队列。当调用 await方法,就会将当前线程放入到这个等待队列的队尾,同时当前线程挂起,等待其他线程唤醒。当其他线程调用 signal 方法时,就会从 等待队列中取出一个线程并插入到CLH队列中,之后就是常规的AQS获取锁流程。
其余方法如
public final void awaitUninterruptibly(){...}
public final long awaitNanos(long nanosTimeout){...}
public final boolean await(long time, TimeUnit unit){...}
public final boolean awaitUntil(Date deadline){...}
public final void signalAll(){...}
这些方法的逻辑与await和signal基本差不多,只是增加了一些中断的忽略和超时判断,篇幅有限,这里就不赘述了,感兴趣的小伙伴可以自行研究。
|