一 概述
Phaser 是 JDK1.7 推出的,一个可重用的同步障碍,与 CyclicBarrier,CountDownLatch 功能类似,但是它支持更灵活的用法。
先简单说明这个类的作用。假设有一个大工程,可以分为多个阶段,每一个阶段有多个人参与,并且每一个阶段需要参与的所有人都完成这个阶段的事情,才可以进入下一个阶段,然后所有人又继续做下一个阶段的事,直到所有阶段都完成,当然这途中每个人都可以随时退出,整个工程也可以中途终止。
二 Phaser 的使用
例如某天陈皮约小美,小雪去他家吃饭。这个事情可以分为三个阶段,第一阶段去超市买食材,第二阶段炒菜,第三阶段吃饭,假设每一个阶段完成后才能继续下一个阶段。
首先定义阶段器类,继承 Phaser 重写 onAdvance 方法来对每一个阶段进行不同的操作。
package com.chenpi;
?
import java.util.concurrent.Phaser;
?
public class DiningPhaser extends Phaser {
?
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" +
registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" +
registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" +
registeredParties);
return false;
default:
return true;
}
}
}
接下来定义参与的任务,编写每一个任务在每一个阶段需要干的事情。
定义陈皮的任务:
package com.chenpi;
?
import java.util.concurrent.Phaser;
?
public class ChenPiTask implements Runnable {
?
private Phaser phaser;
?
public ChenPiTask(Phaser phaser) {
this.phaser = phaser;
}
?
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" 买好猪肉了...");
phaser.arriveAndAwaitAdvance();
?
System.out.println(Thread.currentThread().getName() +
" 炒好猪肉了...");
phaser.arriveAndAwaitAdvance();
?
System.out.println(Thread.currentThread().getName() +
" 吃饱了...");
phaser.arriveAndAwaitAdvance();
}
}
定义小美的任务:
package com.chenpi;
?
import java.util.concurrent.Phaser;
?
public class XiaoMeiTask implements Runnable {
?
private Phaser phaser;
?
public XiaoMeiTask(Phaser phaser) {
this.phaser = phaser;
}
?
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" 买好白菜了...");
phaser.arriveAndAwaitAdvance();
?
System.out.println(Thread.currentThread().getName() +
" 炒好白菜了...");
phaser.arriveAndAwaitAdvance();
?
System.out.println(Thread.currentThread().getName() +
" 吃饱了...");
phaser.arriveAndAwaitAdvance();
}
}
定义小雪的任务:
package com.chenpi;
?
import java.util.concurrent.Phaser;
?
public class XiaoXueTask implements Runnable {
?
private Phaser phaser;
?
public XiaoXueTask(Phaser phaser) {
this.phaser = phaser;
}
?
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" 买好鲍鱼了...");
phaser.arriveAndAwaitAdvance();
?
System.out.println(Thread.currentThread().getName() +
" 炒好鲍鱼了...");
phaser.arriveAndAwaitAdvance();
?
System.out.println(Thread.currentThread().getName() +
" 吃饱了...");
phaser.arriveAndAwaitAdvance();
}
}
最后,编写测试类,进行测试验证。
package com.chenpi;
?
public class ChenPiMain {
?
public static void main(String[] args) {
?
DiningPhaser diningPhaser = new DiningPhaser();
diningPhaser.bulkRegister(3);
?
Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
thread1.setName("陈皮");
thread1.start();
?
Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
thread2.setName("小美");
thread2.start();
?
Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
thread3.setName("小雪");
thread3.start();
}
}
最后,启动服务,显示结果如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦...
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦...
小美 吃饱了...
陈皮 吃饱了...
小雪 吃饱了...
第三阶段,吃完饭啦...
二 Phaser 详解
通过以上简单例子已知道 Phaser 的作用了。其实它的作用不止这些。
可以动态调整注册任务的数量(最大注册的任务数量为 65535)。任务可以在任何时间注册(register 和 bulkRegister 方法,或者以构造方法形式注册初始任务数量),也可以在任何到达时注销注册的任务(arriveAndDeregister 方法)。
package com.chenpi;
?
import java.util.concurrent.Phaser;
?
public class XiaoXueTask implements Runnable {
?
private Phaser phaser;
?
public XiaoXueTask(Phaser phaser) {
this.phaser = phaser;
}
?
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" 买好鲍鱼了...");
phaser.arriveAndAwaitAdvance();
?
System.out.println(Thread.currentThread().getName() +
" 炒好鲍鱼了...");
phaser.arriveAndDeregister();
System.out.println(Thread.currentThread().getName() +
"有事先走了");
}
}
我们修改小雪这个任务,她干完第二阶段的事情就有事先走了,即注销任务。结果如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
小雪有事先走了
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦!总共参与人数:2
小美 吃饱了...
陈皮 吃饱了...
第三阶段,吃完饭啦!总共参与人数:2
注册和注销只影响内部计数,内部没有记录具体的注册任务,所以不能查询哪个任务是否已注册。但是我们可以编写 Phaser 的子类来实现记录注册的具体任务。
package com.chenpi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
public class DiningPhaser extends Phaser {
private List<Runnable> registeredTask = new ArrayList<>();
public int register(Runnable task) {
registeredTask.add(task);
return super.register();
}
public List<Runnable> getRegisteredTask() {
return registeredTask;
}
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" +
registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" +
registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" +
registeredParties);
return false;
default:
return true;
}
}
}
测试类中,每一个任务单独注册并记录。
package com.chenpi;
public class ChenPiMain {
public static void main(String[] args) {
DiningPhaser diningPhaser = new DiningPhaser();
Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
thread1.setName("陈皮");
diningPhaser.register(thread1);
thread1.start();
Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
thread2.setName("小美");
diningPhaser.register(thread2);
thread2.start();
Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
thread3.setName("小雪");
diningPhaser.register(thread3);
thread3.start();
System.out.println("注册的任务:" +
diningPhaser.getRegisteredTask());
}
}
启动服务,打印了在阶段器中注册的任务。
陈皮 买好猪肉了...
注册的任务:[Thread[陈皮,5,main], Thread[小美,5,main], Thread[小雪,5,main]]
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦!总共参与人数:3
陈皮 吃饱了...
小雪 吃饱了...
小美 吃饱了...
第三阶段,吃完饭啦!总共参与人数:3
对于同步性质,像 CyclicBarrier 一样,Phaser 可以重复等待。Phaser 的 arriveAndAwaitAdvance 方法的作用类似于 CyclicBarrier 的 await 方法。
每一个 Phaser 对象都会关联一个阶段数。这个数从0开始,当所有注册的任务都到达每一个阶段的时候,这个数就递增一次。特别地,如果这个数到达 Integer.MAX_VALUE 后就会重新变回到 0。
arrive 和 arriveAndDeregister 方法记录到达,这 2 个方法不会阻塞,它们会返回已到达的阶段数。
arrive 方法表示当前任务已到达某个阶段,但是不会等待其他任务到达此阶段。arriveAndDeregister 方法表示当前任务已到达某个阶段,并且注销任务。
在每一个阶段中,当所有任务都到达的时候,onAdvance 方法会被最后一个触发阶段到达的任务执行,然后进入下一个阶段。onAdvance 方法可以控制一个 Phaser 的终止,如果我们的 Phaser 对象是继承 Phaser 的子类,可以重写 onAdvance 方法,在每一个阶段到达时这个方法就会被调用从而在每一个阶段做我们想做的事情。
package com.chenpi;
import java.util.concurrent.Phaser;
public class DiningPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName() +
" 调用了onAdvance方法");
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" +
registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" +
registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" +
registeredParties);
return false;
default:
return true;
}
}
}
我们在 onAdvance 方法中打印当前线程,结果表明确实是最后一个触发阶段到达的任务执行 onAdvance 方法,如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
小雪 调用了onAdvance方法
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
小美 调用了onAdvance方法
第二阶段,炒菜完成啦!总共参与人数:3
小美 吃饱了...
陈皮 吃饱了...
小雪 吃饱了...
小雪 调用了onAdvance方法
第三阶段,吃完饭啦!总共参与人数:3
phaser 可以随时被终止。当终止时,所有同步方法(例如 arriveAndAwaitAdvance )会立即返回而不用阻塞等待,并且返回一个负数。同样地,被终止后无法再注册任务。isTerminated 方法可以判断是否已经终止。
phaser 可以在 onAdvance 方法中返回 true 来达到终止的效果。例如我们继承 Phaser 编写的子类可以重写此方法,当到达某个阶段后返回 true 来终止阶段器。
package com.chenpi;
import java.util.concurrent.Phaser;
public class DiningPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName() +
" 调用了onAdvance方法");
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" +
registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" +
registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" +
registeredParties);
return false;
default:
return true;
}
}
}
默认的 onAdvance 方法实现是当注册的任务注销到 0 的时候返回 true,源码如下:
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
phaser 的 forceTermination 方法其实也可以强制终止阶段器。还是以上述例子,小美干完第一阶段的事情后,觉得这样等来等去太费时间了,所以终止这个阶段器。
package com.chenpi;
import java.util.concurrent.Phaser;
public class XiaoMeiTask implements Runnable {
private Phaser phaser;
public XiaoMeiTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" 买好白菜了...");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() +
" 炒好白菜了...");
phaser.forceTermination();
System.out.println(Thread.currentThread().getName() +
" 吃饱了...");
phaser.arriveAndAwaitAdvance();
}
}
再执行程序,结果第一阶段后,后续的任务执行就不再按阶段来了,结果如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
小雪 调用了onAdvance方法
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
小美 炒好白菜了...
小美 吃饱了...
小雪 吃饱了...
陈皮 炒好猪肉了...
陈皮 吃饱了...
Phaser 支持层次结构,可以通过构造函数 Phaser(Phaser parent) 和 Phaser(Phaser parent, int parties) 创建一个树形结构的阶段器。这样可以减轻在一个 Phaser 上注册过多的任务而导致的竞争,从而提升吞吐量,缺点是会增加单个操作的开销。
你会发现 arriveAndAwaitAdvance 方法没有抛出 InterruptedException,即使当前线程被中断,这个方法也不会返回,而是继续等待。所以如果希望在等待时可中断,或者可超时,则可以选择使用以下方法:
awaitAdvanceInterruptibly(int phase)
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
下面罗列一些其他常用方法:
int getArrivedParties()
Phaser getParent()
final int getPhase()
int getRegisteredParties()
int getUnarrivedParties()
boolean isTerminated()
|