Phaser
可重用的同步屏障,功能类似于 CyclicBarrier 和 CountDownLatch,但支持更灵活的使用。
主要机制
Registration(注册机制)
与其他屏障的情况不同,在Phaser上注册同步的线程数量可能会随着时间而变化。 可以随时注册任务(使用register、bulkRegister或建立初始所需线程数量的构造函数),并且可以选择在任何到达时取消注册(使用arriveAndDeregister)。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。 (但是,您可以通过子类化此类来引入此类簿记。)
动态性-支持屏障所需线程的数量
Synchronization(同步机制)
像CyclicBarrier一样,Phaser可能会被反复等待。 方法arriveAndAwaitAdvance具有类似于CyclicBarrier.await的效果。 每一代Phaser都有一个相关的阶段编号(phase)。 阶段编号从零开始,当所有线程到达屏障时前进,在达到Integer.MAX_VALUE后重置0。 阶段编号的使用可以通过任何注册方可以调用的两种方法,在到达阶段和等待其他阶段时独立控制操作:
重入性-一个Phaser可以支持多个阶段同步操作
Arrival(到达机制)
方法arrive和到arriveAndDeregister记录到达。 这些方法不会阻塞,而是返回相关的到达阶段编号; 即,到达应用的Phaser的相位编号。 当给定阶段的最后一方到达时,将执行一个可选操作并且该阶段前进。 这些动作由触发阶段推进的一方执行,并通过覆盖方法onAdvance(int, int) 进行安排,该方法也支持控制终止。 覆盖此方法与向CyclicBarrier提供屏障操作类似,但更灵活。
Waiting(等待机制)
方法awaitAdvance需要一个指示到达阶段编号的参数,并在Phaser前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier的类似构造不同,即使等待线程被中断,方法awaitAdvance也会继续等待。 可中断和超时版本也可用,但在任务等待中断或超时时遇到的异常不会改变Phaser的状态。 如有必要,您可以在这些异常的处理程序中执行任何相关的恢复,通常是在调用forceTermination之后。 在ForkJoinPool中执行的任务也可以使用Phaser,这将确保在其他人被阻塞等待阶段推进时执行任务有足够的并行性。
Termination(终止机制)
Phaser可以进入终止状态,可以使用方法isTerminated进行检查。 终止后,所有同步方法立即返回,无需等待提前,返回值为负数。 同样,在终止时尝试注册也无效。 当onAdvance的调用返回 true 时触发终止。 如果取消注册导致注册方的数量变为零,则默认实现返回 true。 如下图所示,当Phaser控制具有固定迭代次数的动作时,通常可以方便地覆盖此方法以在当前阶段数达到阈值时导致终止。 方法forceTermination也可用于突然释放等待线程并允许它们终止。
Tiering(分层结构)
Phaser可以分层(即以树型结构构建)以减少争用。 可以改为设置具有大量参与方的Phaser,否则这些Phaser将经历沉重的同步争用成本,以便子Phaser组共享一个共同的父级。 这可能会大大增加吞吐量,即使它会产生更大的每次操作开销。
在分层Phaser树中,自动管理子移相器与其父级的注册和注销。 每当子移相器的注册方数量变为非零时(如在Phaser(Phaser, int)构造函数、register或bulkRegister中建立的那样),子Phaser将向其父移相器注册。 每当注册方的数量由于调用到达和取消注册而变为零时,子Phaser就会从其父Phaser中注销。
层次性-将多个Phaser树形结构组织起来,通过牺牲操作的开销增加吞吐量。
Monitoring(监控)
虽然同步方法只能由注册方调用,但Phaser的当前状态可以由任何调用者监视。 在任何给定时刻,总共有getRegisteredParties方,其中getArrivedParties已到达当前阶段 (getPhase)。 当剩余的 (getUnarrivedParties) 方到达时,阶段前进。 这些方法返回的值可能反映瞬态状态,因此通常对同步控制没有用处。 方法toString以一种便于非正式监控的形式返回这些状态查询的快照。
组成
内部类QNode
static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread;
QNode next;
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}
public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
nanos = deadline - System.nanoTime();
}
if (nanos <= 0L) {
thread = null;
return true;
}
}
return false;
}
public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
成员变量
private volatile long state;
private final Phaser parent;
private final Phaser root;
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
state状态位的含义,如下图  所有状态更新都通过 CAS 执行,除了子相位器的初始注册(即具有非 null 父级的子相位器)。在这种(相对罕见的)情况下,我们在首次向其父级注册时使用内置同步来锁定。子相位器的相位可以滞后于其祖先的相位,直到它被实际访问——参见方法 reconcileState。
在Phaser中使用奇偶两个单向链表来实现阻塞队列,降低操作的冲突。
构造函数
使用给定的父级和已注册的未到达方数量创建一个新的移相器。 当给定的父节点不为空并且给定的参与方数量大于零时,此子移相器将向其父节点注册。
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
核心方法
| 方法名 | 描述 |
|---|
| int register() | 向此移相器添加一个新的未到方。 如果正在进行的onAdvance调用正在进行中,则此方法可能会在返回之前等待其完成。 如果此移相器有父移相器,并且此移相器之前没有注册方,则此子移相器也向其父移相器注册。 如果此移相器终止,则注册尝试无效,并返回负值。 | | int bulkRegister(int parties) | 向此移相器批量添加新的未到方。 如果正在进行的 onAdvance 调用正在进行中,则此方法可能会在返回之前等待其完成。 如果此移相器有父级,并且给定的参与方数量大于零,并且此移相器之前没有注册方,则此子移相器也向其父级注册。 如果此移相器终止,则注册尝试无效,并返回负值。 | | int arrive() | 当前线程到达此移相器,无需等待其他人到达。返回arrival phase number。 未注册方调用此方法属于使用错误。 但是,此错误可能仅在此移相器上的某些后续操作(如果有)时导致 IllegalStateException。 | | int arriveAndDeregister() | 到达此移相器并从中注销,而无需等待其他人到达。返回arrival phase number。 取消注册减少了在未来阶段推进所需的参与方数量。 如果此移相器有父级,并且注销导致此移相器的参与方为零,则此移相器也会从其父级中注销。 未注册方调用此方法属于使用错误。 但是,此错误可能仅在此移相器上的某些后续操作(如果有)时导致 IllegalStateException。 | | int arriveAndAwaitAdvance() | 到达此移相器并等待其他移相器。 等效于awaitAdvance(arrive())。 如果您需要等待中断或超时,您可以使用awaitAdvance方法的其他形式之一通过类似的结构来安排它。 如果您需要在抵达时取消注册,请使用awaitAdvance(arriveAndDeregister())。 未注册方调用此方法属于使用错误。 但是,此错误可能仅在此移相器上的某些后续操作(如果有)时导致 IllegalStateException。 | | int awaitAdvance(int phase) | 等待此移相器的相位从给定的相位值前进,等待给定phase数,返回下一个 arrival phase number 如果当前相位不等于给定的相位值或此移相器终止,则立即返回。 | | int awaitAdvanceInterruptibly(int phase) throws InterruptedException | 等待此移相器的阶段从给定的阶段值前进,等待给定phase数,返回下一个 arrival phase number,响应中断 如果在等待期间被中断,则抛出 InterruptedException, 如果当前阶段不等于给定的阶段值或此移相器终止,则立即返回。 | | int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException | 等待此移相器的阶段从给定的阶段值或给定的超时时间过去, 如果在等待期间中断,则抛出 InterruptedException, 如果当前阶段不等于给定的阶段值或此移相器终止,则立即返回。 | | void forceTermination() | 强制此移相器进入终止状态。 注册方的数量不受影响。 如果此移相器是分层移相器集的成员,则该集中的所有移相器都将终止。 如果此移相器已终止,则此方法无效。 此方法可用于在一个或多个任务遇到意外异常后协调恢复。 | | int getPhase() | 返回当前阶段编号。 最大阶段数是 Integer.MAX_VALUE,之后它会从零重新开始。 终止时,阶段编号为负数,在这种情况下,可以通过 getPhase() + Integer.MIN_VALUE 获得终止之前的主要阶段。 | | int getRegisteredParties() | 返回在此移相器上注册的参与方数量。 | | int getArrivedParties() | 返回已到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值是无意义的和任意的。 | | int getUnarrivedParties() | 返回尚未到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值是无意义的和任意的。 | | Phaser getParent() | 返回此移相器的父级,如果没有则返回 null。 | | Phaser getRoot() | 返回此移相器的根祖先,如果它没有父移相器,则与此移相器相同。 | | boolean isTerminated() | 如果此移相器已终止,则返回 true。 | | boolean onAdvance(int phase, int registeredParties) | 在即将到来的相位超前时执行操作并控制终止的可覆盖方法。 此方法在推进此移相器的一方到达时调用(当所有其他等待方都处于休眠状态时)。 如果此方法返回 true,则此移相器将在前进时设置为最终终止状态,随后对 isTerminated 的调用将返回 true。 调用此方法引发的任何(未经检查的)异常或错误都会传播到试图推进此移相器的一方,在这种情况下不会发生推进。 此方法的参数提供了当前转换占主导地位的移相器的状态。从 onAdvance 内部调用此移相器的到达、注册和等待方法的效果是未指定的,不应依赖。 如果此移相器是一组分层移相器的成员,则仅在每次前进时为其根移相器调用 onAdvance。 为了支持最常见的用例,当注册方的数量由于一方调用到达和取消注册而变为零时,此方法的默认实现返回 true。您可以通过覆盖此方法以始终返回 false 来禁用此行为,从而在未来注册时继续进行: Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } } |
方法说明
register()与bulkRegister(int parties)在验证参与线程数之后,核心都是调用int doRegister(int registrations)实现。
int doRegister(int registrations)
private int doRegister(int registrations) {
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
if (counts != EMPTY) {
if (parent == null || reconcileState() == s) {
if (unarrived == 0)
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
else if (parent == null) {
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) {
if (state == s) {
phase = parent.doRegister(1);
if (phase < 0)
break;
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
}
break;
}
}
}
}
return phase;
}
reconcileState()
协调状态
private long reconcileState() {
final Phaser root = this.root;
long s = state;
if (root != this) {
int phase, p;
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
((phase < 0) ? (s & COUNTS_MASK) :
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
s = state;
}
return s;
}
internalAwaitAdvance(int phase, QNode node)
阻塞等待phase到下一代
private int internalAwaitAdvance(int phase, QNode node) {
releaseWaiters(phase-1);
boolean queued = false;
int lastUnarrived = 0;
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) {
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) {
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable())
break;
else if (!queued) {
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase)
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null;
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase);
}
releaseWaiters(phase);
return p;
}
releaseWaiters(int phase)
private void releaseWaiters(int phase) {
QNode q;
Thread t;
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
arrive()和arriveAndDeregister() 都是表示到达了变相器,都不需要等他其他线程到达后,才能继续执行。主要由doArrive(int adjust)实现。
doArrive(int adjust)
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK;
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) {
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
arriveAndAwaitAdvance()
到达等待推进
public int arriveAndAwaitAdvance() {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK;
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT);
releaseWaiters(phase);
return nextPhase;
}
}
}
int awaitAdvance(int phase)
public int awaitAdvance(int phase) {
final Phaser root = this.root;、
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
forceTermination()
public void forceTermination() {
final Phaser root = this.root;
long s;
while ((s = root.state) >= 0) {
if (UNSAFE.compareAndSwapLong(root, stateOffset,
s, s | TERMINATION_BIT)) {
releaseWaiters(0);
releaseWaiters(1);
return;
}
}
}
案例
动态修改线程数+重入性(单个Phaser多个阶段)
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserDemo {
private static Random random = new Random(System.currentTimeMillis());
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
ExecutorService e = Executors.newFixedThreadPool(5);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
for (int i = 0; i < 4; i++) {
e.submit(() -> {
try {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
}
phaser.arriveAndAwaitAdvance();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
phaser.register();
for (int i = 0; i < 5; i++) {
e.submit(() -> {
try {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
}
phaser.arriveAndAwaitAdvance();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "now phase=" + phaser.getPhase() + ", exit.");
phaser.forceTermination();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "Phaser Termination Status is " + phaser.isTerminated() + ", exit.");
e.shutdown();
}
}
执行结果
[17:58:26-main] go to phase=0.
[17:58:26-pool-1-thread-1] phase=0 start.
[17:58:26-pool-1-thread-2] phase=0 start.
[17:58:26-pool-1-thread-4] phase=0 start.
[17:58:26-pool-1-thread-3] phase=0 start.
[17:58:26-pool-1-thread-4] phase=0 end.
[17:58:26-pool-1-thread-3] phase=0 end.
[17:58:28-pool-1-thread-1] phase=0 end.
[17:58:30-pool-1-thread-2] phase=0 end.
[17:58:30-main] go to phase=1.
[17:58:30-pool-1-thread-4] phase=1 start.
[17:58:30-pool-1-thread-3] phase=1 start.
[17:58:30-pool-1-thread-1] phase=1 start.
[17:58:30-pool-1-thread-2] phase=1 start.
[17:58:30-pool-1-thread-2] phase=1 end.
[17:58:30-pool-1-thread-5] phase=1 start.
[17:58:31-pool-1-thread-4] phase=1 end.
[17:58:32-pool-1-thread-5] phase=1 end.
[17:58:33-pool-1-thread-1] phase=1 end.
[17:58:34-pool-1-thread-3] phase=1 end.
[17:58:34-main] now phase=2, exit.
[17:58:34-main] Phaser Termination Status is true, exit.
Phaser主要用来解决什么问题?
Phaser与CyclicBarrier和CountDownLatch的区别是什么?
如果用CountDownLatch来实现Phaser的功能应该怎么实现?
Phaser运行机制是什么样的?
|