Phaser
可重用的同步屏障,功能类似于 CyclicBarrier 和 CountDownLatch,但支持更灵活的使用。
主要机制
Registration(注册机制)
与其他屏障的情况不同,在Phaser 上注册同步的线程数量可能会随着时间而变化。 可以随时注册任务(使用register 、bulkRegister 或建立初始所需线程数量的构造函数),并且可以选择在任何到达时取消注册(使用arriveAndDeregister )。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。 (但是,您可以通过子类化此类来引入此类簿记。)
动态性-支持屏障所需线程的数量
Synchronization(同步机制)
像CyclicBarrier 一样,Phaser 可能会被反复等待。 方法arriveAndAwaitAdvance 具有类似于CyclicBarrier.await 的效果。 每一代Phaser都有一个相关的阶段编号(phase)。 阶段编号从零开始,当所有线程到达屏障时前进,在达到Integer.MAX_VALUE 后重置0。 阶段编号的使用可以通过任何注册方可以调用的两种方法,在到达阶段和等待其他阶段时独立控制操作:
重入性-一个Phaser可以支持多个阶段同步操作
Arrival(到达机制)
方法arrive 和到arriveAndDeregister 记录到达。 这些方法不会阻塞,而是返回相关的到达阶段编号; 即,到达应用的Phaser 的相位编号。 当给定阶段的最后一方到达时,将执行一个可选操作并且该阶段前进。 这些动作由触发阶段推进的一方执行,并通过覆盖方法onAdvance(int, int) 进行安排,该方法也支持控制终止。 覆盖此方法与向CyclicBarrier 提供屏障操作类似,但更灵活。
Waiting(等待机制)
方法awaitAdvance 需要一个指示到达阶段编号的参数,并在Phaser 前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier 的类似构造不同,即使等待线程被中断,方法awaitAdvance 也会继续等待。 可中断和超时版本也可用,但在任务等待中断或超时时遇到的异常不会改变Phaser 的状态。 如有必要,您可以在这些异常的处理程序中执行任何相关的恢复,通常是在调用forceTermination 之后。 在ForkJoinPool 中执行的任务也可以使用Phaser ,这将确保在其他人被阻塞等待阶段推进时执行任务有足够的并行性。
Termination(终止机制)
Phaser 可以进入终止状态,可以使用方法isTerminated 进行检查。 终止后,所有同步方法立即返回,无需等待提前,返回值为负数。 同样,在终止时尝试注册也无效。 当onAdvance 的调用返回 true 时触发终止。 如果取消注册导致注册方的数量变为零,则默认实现返回 true。 如下图所示,当Phaser 控制具有固定迭代次数的动作时,通常可以方便地覆盖此方法以在当前阶段数达到阈值时导致终止。 方法forceTermination 也可用于突然释放等待线程并允许它们终止。
Tiering(分层结构)
Phaser 可以分层(即以树型结构构建)以减少争用。 可以改为设置具有大量参与方的Phaser ,否则这些Phaser 将经历沉重的同步争用成本,以便子Phaser 组共享一个共同的父级。 这可能会大大增加吞吐量,即使它会产生更大的每次操作开销。
在分层Phaser 树中,自动管理子移相器与其父级的注册和注销。 每当子移相器的注册方数量变为非零时(如在Phaser(Phaser, int) 构造函数、register 或bulkRegister 中建立的那样),子Phaser 将向其父移相器注册。 每当注册方的数量由于调用到达和取消注册而变为零时,子Phaser 就会从其父Phaser 中注销。
层次性-将多个Phaser 树形结构组织起来,通过牺牲操作的开销增加吞吐量。
Monitoring(监控)
虽然同步方法只能由注册方调用,但Phaser 的当前状态可以由任何调用者监视。 在任何给定时刻,总共有getRegisteredParties 方,其中getArrivedParties 已到达当前阶段 (getPhase )。 当剩余的 (getUnarrivedParties ) 方到达时,阶段前进。 这些方法返回的值可能反映瞬态状态,因此通常对同步控制没有用处。 方法toString 以一种便于非正式监控的形式返回这些状态查询的快照。
组成
内部类QNode
static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread;
QNode next;
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}
public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
nanos = deadline - System.nanoTime();
}
if (nanos <= 0L) {
thread = null;
return true;
}
}
return false;
}
public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
成员变量
private volatile long state;
private final Phaser parent;
private final Phaser root;
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
state状态位的含义,如下图 所有状态更新都通过 CAS 执行,除了子相位器的初始注册(即具有非 null 父级的子相位器)。在这种(相对罕见的)情况下,我们在首次向其父级注册时使用内置同步来锁定。子相位器的相位可以滞后于其祖先的相位,直到它被实际访问——参见方法 reconcileState。
在Phaser 中使用奇偶两个单向链表来实现阻塞队列,降低操作的冲突。
构造函数
使用给定的父级和已注册的未到达方数量创建一个新的移相器。 当给定的父节点不为空并且给定的参与方数量大于零时,此子移相器将向其父节点注册。
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
核心方法
方法名 | 描述 |
---|
int register() | 向此移相器添加一个新的未到方。 如果正在进行的onAdvance 调用正在进行中,则此方法可能会在返回之前等待其完成。 如果此移相器有父移相器,并且此移相器之前没有注册方,则此子移相器也向其父移相器注册。 如果此移相器终止,则注册尝试无效,并返回负值。 | int bulkRegister(int parties) | 向此移相器批量添加新的未到方。 如果正在进行的 onAdvance 调用正在进行中,则此方法可能会在返回之前等待其完成。 如果此移相器有父级,并且给定的参与方数量大于零,并且此移相器之前没有注册方,则此子移相器也向其父级注册。 如果此移相器终止,则注册尝试无效,并返回负值。 | int arrive() | 当前线程到达此移相器,无需等待其他人到达。返回arrival phase number。 未注册方调用此方法属于使用错误。 但是,此错误可能仅在此移相器上的某些后续操作(如果有)时导致 IllegalStateException。 | int arriveAndDeregister() | 到达此移相器并从中注销,而无需等待其他人到达。返回arrival phase number。 取消注册减少了在未来阶段推进所需的参与方数量。 如果此移相器有父级,并且注销导致此移相器的参与方为零,则此移相器也会从其父级中注销。 未注册方调用此方法属于使用错误。 但是,此错误可能仅在此移相器上的某些后续操作(如果有)时导致 IllegalStateException。 | int arriveAndAwaitAdvance() | 到达此移相器并等待其他移相器。 等效于awaitAdvance(arrive()) 。 如果您需要等待中断或超时,您可以使用awaitAdvance 方法的其他形式之一通过类似的结构来安排它。 如果您需要在抵达时取消注册,请使用awaitAdvance(arriveAndDeregister()) 。 未注册方调用此方法属于使用错误。 但是,此错误可能仅在此移相器上的某些后续操作(如果有)时导致 IllegalStateException。 | int awaitAdvance(int phase) | 等待此移相器的相位从给定的相位值前进,等待给定phase数,返回下一个 arrival phase number 如果当前相位不等于给定的相位值或此移相器终止,则立即返回。 | int awaitAdvanceInterruptibly(int phase) throws InterruptedException | 等待此移相器的阶段从给定的阶段值前进,等待给定phase数,返回下一个 arrival phase number,响应中断 如果在等待期间被中断,则抛出 InterruptedException, 如果当前阶段不等于给定的阶段值或此移相器终止,则立即返回。 | int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException | 等待此移相器的阶段从给定的阶段值或给定的超时时间过去, 如果在等待期间中断,则抛出 InterruptedException, 如果当前阶段不等于给定的阶段值或此移相器终止,则立即返回。 | void forceTermination() | 强制此移相器进入终止状态。 注册方的数量不受影响。 如果此移相器是分层移相器集的成员,则该集中的所有移相器都将终止。 如果此移相器已终止,则此方法无效。 此方法可用于在一个或多个任务遇到意外异常后协调恢复。 | int getPhase() | 返回当前阶段编号。 最大阶段数是 Integer.MAX_VALUE,之后它会从零重新开始。 终止时,阶段编号为负数,在这种情况下,可以通过 getPhase() + Integer.MIN_VALUE 获得终止之前的主要阶段。 | int getRegisteredParties() | 返回在此移相器上注册的参与方数量。 | int getArrivedParties() | 返回已到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值是无意义的和任意的。 | int getUnarrivedParties() | 返回尚未到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值是无意义的和任意的。 | Phaser getParent() | 返回此移相器的父级,如果没有则返回 null。 | Phaser getRoot() | 返回此移相器的根祖先,如果它没有父移相器,则与此移相器相同。 | boolean isTerminated() | 如果此移相器已终止,则返回 true。 | boolean onAdvance(int phase, int registeredParties) | 在即将到来的相位超前时执行操作并控制终止的可覆盖方法。 此方法在推进此移相器的一方到达时调用(当所有其他等待方都处于休眠状态时)。 如果此方法返回 true,则此移相器将在前进时设置为最终终止状态,随后对 isTerminated 的调用将返回 true。 调用此方法引发的任何(未经检查的)异常或错误都会传播到试图推进此移相器的一方,在这种情况下不会发生推进。 此方法的参数提供了当前转换占主导地位的移相器的状态。从 onAdvance 内部调用此移相器的到达、注册和等待方法的效果是未指定的,不应依赖。 如果此移相器是一组分层移相器的成员,则仅在每次前进时为其根移相器调用 onAdvance。 为了支持最常见的用例,当注册方的数量由于一方调用到达和取消注册而变为零时,此方法的默认实现返回 true。您可以通过覆盖此方法以始终返回 false 来禁用此行为,从而在未来注册时继续进行: Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } } |
方法说明
register() 与bulkRegister(int parties) 在验证参与线程数之后,核心都是调用int doRegister(int registrations) 实现。
int doRegister(int registrations)
private int doRegister(int registrations) {
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
if (counts != EMPTY) {
if (parent == null || reconcileState() == s) {
if (unarrived == 0)
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
else if (parent == null) {
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) {
if (state == s) {
phase = parent.doRegister(1);
if (phase < 0)
break;
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
}
break;
}
}
}
}
return phase;
}
reconcileState()
协调状态
private long reconcileState() {
final Phaser root = this.root;
long s = state;
if (root != this) {
int phase, p;
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
((phase < 0) ? (s & COUNTS_MASK) :
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
s = state;
}
return s;
}
internalAwaitAdvance(int phase, QNode node)
阻塞等待phase到下一代
private int internalAwaitAdvance(int phase, QNode node) {
releaseWaiters(phase-1);
boolean queued = false;
int lastUnarrived = 0;
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) {
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) {
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable())
break;
else if (!queued) {
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase)
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null;
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase);
}
releaseWaiters(phase);
return p;
}
releaseWaiters(int phase)
private void releaseWaiters(int phase) {
QNode q;
Thread t;
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
arrive() 和arriveAndDeregister() 都是表示到达了变相器,都不需要等他其他线程到达后,才能继续执行。主要由doArrive(int adjust) 实现。
doArrive(int adjust)
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK;
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) {
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
arriveAndAwaitAdvance()
到达等待推进
public int arriveAndAwaitAdvance() {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK;
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT);
releaseWaiters(phase);
return nextPhase;
}
}
}
int awaitAdvance(int phase)
public int awaitAdvance(int phase) {
final Phaser root = this.root;、
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
forceTermination()
public void forceTermination() {
final Phaser root = this.root;
long s;
while ((s = root.state) >= 0) {
if (UNSAFE.compareAndSwapLong(root, stateOffset,
s, s | TERMINATION_BIT)) {
releaseWaiters(0);
releaseWaiters(1);
return;
}
}
}
案例
动态修改线程数+重入性(单个Phaser多个阶段)
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserDemo {
private static Random random = new Random(System.currentTimeMillis());
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
ExecutorService e = Executors.newFixedThreadPool(5);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
for (int i = 0; i < 4; i++) {
e.submit(() -> {
try {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
}
phaser.arriveAndAwaitAdvance();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
phaser.register();
for (int i = 0; i < 5; i++) {
e.submit(() -> {
try {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
}
phaser.arriveAndAwaitAdvance();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "now phase=" + phaser.getPhase() + ", exit.");
phaser.forceTermination();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "Phaser Termination Status is " + phaser.isTerminated() + ", exit.");
e.shutdown();
}
}
执行结果
[17:58:26-main] go to phase=0.
[17:58:26-pool-1-thread-1] phase=0 start.
[17:58:26-pool-1-thread-2] phase=0 start.
[17:58:26-pool-1-thread-4] phase=0 start.
[17:58:26-pool-1-thread-3] phase=0 start.
[17:58:26-pool-1-thread-4] phase=0 end.
[17:58:26-pool-1-thread-3] phase=0 end.
[17:58:28-pool-1-thread-1] phase=0 end.
[17:58:30-pool-1-thread-2] phase=0 end.
[17:58:30-main] go to phase=1.
[17:58:30-pool-1-thread-4] phase=1 start.
[17:58:30-pool-1-thread-3] phase=1 start.
[17:58:30-pool-1-thread-1] phase=1 start.
[17:58:30-pool-1-thread-2] phase=1 start.
[17:58:30-pool-1-thread-2] phase=1 end.
[17:58:30-pool-1-thread-5] phase=1 start.
[17:58:31-pool-1-thread-4] phase=1 end.
[17:58:32-pool-1-thread-5] phase=1 end.
[17:58:33-pool-1-thread-1] phase=1 end.
[17:58:34-pool-1-thread-3] phase=1 end.
[17:58:34-main] now phase=2, exit.
[17:58:34-main] Phaser Termination Status is true, exit.
Phaser主要用来解决什么问题?
Phaser与CyclicBarrier和CountDownLatch的区别是什么?
如果用CountDownLatch来实现Phaser的功能应该怎么实现?
Phaser运行机制是什么样的?
|