Java CompletableFuture 使用测试
1 创建 CompletableFuture 对象
CompletableFuture 提供了四个静态方法用来创建 CompletableFuture 对象:
方法 | 返回结果 | 线程池 | 参数 |
---|
public static CompletableFuture<Void> runAsync(Runnable runnable) | 无返回结果 | 采用内部forkjoin线程池 | Runnable 接口。 | public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) | 无返回结果 | 自定义线程池 | Runnable 接口。 | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) | 有返回结果 | 采用内部forkjoin线程池 | Supplier<U> 函数式接口。 | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) | 有返回结果 | 自定义线程池 | Supplier<U> 函数式接口。 |
1.1 CompletableFuture.runAsync
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class UnitTest {
ThreadPoolExecutor executor = new ThreadPoolExecutor(15,
20,
10,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("a");
}, executor);
}
}
1.2 CompletableFuture.supplyAsync
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class UnitTest {
ThreadPoolExecutor executor = new ThreadPoolExecutor(15,
20,
10,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "a";
}, executor);
}
}
函数式接口名称 | 作用 | 主要方法 | 解释 |
---|
Function<T, R> | 方法 | R apply(T t) | 接受一个T类型参数,返回R类型 | Consumer<T> | 消费者 | void accept(T t) | 接受一个T类型参数,没有返回值。 | Supplier<T> | 生产者 | T get() | 不接受参数,但是提供一个返回值。 | Predicate<T> | 判断 | boolean test(T t) | 接受一个T类型参数,返回一个boolean类型值。 |
2 获取 CompletableFuture 结果
2.1 自动获取结果
获取结果方法 | 解释 |
---|
public T get() | 返回计算的结果,抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理。 | public T get(long timeout, TimeUnit unit) | 返回计算的结果,设置超时时间,抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理。 | public T getNow(T valueIfAbsent) | 如果已完成则返回结果或者抛出异常,否则返回给定的valueIfAbsent的值。 | public T join() | 返回计算的结果或者抛出一个unchecked异常,CancellationException,CompletionException 无需要用户手动处理。 |
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class UnitTest {
ThreadPoolExecutor executor = new ThreadPoolExecutor(15,
20,
10,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}, executor).thenApply(x -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "b";
}).thenCompose(e -> CompletableFuture.supplyAsync(e::toUpperCase));
System.out.println(future.get());
System.out.println(future.join());
System.out.println(future.getNow("C"));
System.out.println(future.get(1, TimeUnit.SECONDS));
}
}
AB
AB
AB
AB
2.2 主动获取结果
获取结果方法 | 解释 |
---|
public boolean complete(T value) | 当调用CompletableFuture.get() 被阻塞的时候,那么这个方法就是结束阻塞,并且get()获取当前设置的value。 | public boolean completeExceptionally(Throwable ex) | 当调用CompletableFuture.get() 被阻塞的时候,那么这个方法就是结束阻塞,并且抛出异常。 |
future.complete("直接使用这个结果")
future.completeExceptionally(new Throwable("运行超时"))
3 CompletableFuture 异步回调
3.1 thenAccept…
异步回调 | 参数 | 返回值 | 线程池 | 功能 |
---|
public CompletionStage<Void> thenAccept(Consumer<? super T> action) | 上一个任务的结果 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。 | public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) | 上一个任务的结果 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。 | public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) | 上一个任务的结果 | 无 | 自定义线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。 |
上一个任务需要有返回值
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).thenAcceptAsync(x -> {
System.out.println("thenRun");
}).thenApplyAsync(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "c";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
System.out.println(future.join());
}
3.2 thenRun…
异步回调 | 参数 | 返回值 | 线程池 | 功能 |
---|
public CompletionStage<Void> thenRun(Runnable action) | 无 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成后执行,不需要参数,无返回结果。 | public CompletionStage<Void> thenRunAsync(Runnable action) | 无 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成后执行,不需要参数,无返回结果。 | public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor) | 无 | 无 | 自定义线程池 | 上一个任务执行完成后执行,不需要参数,无返回结果。 |
上一个任务有无返回值均可
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).thenRun(() -> {
System.out.println("thenRun");
}).thenApplyAsync(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "c";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
System.out.println(future.join());
}
3.3 thenApply…
异步回调 | 参数 | 返回值 | 线程池 | 功能 |
---|
public <U> CompletableFuture<U> henApply(Function<? super T,? extends U> fn) | 上一个任务的结果 | 有 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。 | public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) | 上一个任务的结果 | 有 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。 | public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | 上一个任务的结果 | 有 | 自定义线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。 |
上一个任务需要有返回值
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).thenApplyAsync(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "c";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
System.out.println(future.join());
}
3.4 handle
异步回调 | 参数 | 返回值 | 线程池 | 功能 |
---|
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) | CompletableFuture 全部完成后的结果 | 有 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) | CompletableFuture 全部完成后的结果 | 有 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) | CompletableFuture 全部完成后的结果 | 有 | 自定义线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 |
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).exceptionally((thread) -> {
System.out.println(thread.getMessage());
return "ERROR";
}).thenApplyAsync(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "c";
}).handleAsync((a, b) -> {
System.out.println(a + "handle" + b);
return "K";
});
System.out.println(future.join());
}
3.5 whenComplete
异步回调 | 参数 | 返回值 | 线程池 | 功能 |
---|
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action) | CompletableFuture 全部完成后的结果 | 无 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | CompletableFuture 全部完成后的结果 | 无 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | CompletableFuture 全部完成后的结果 | 无 | 自定义线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 |
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).exceptionally((thread) -> {
System.out.println(thread.getMessage());
return "ERROR";
}).thenApplyAsync(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "c";
}).whenComplete((a, b) -> {
System.out.println(a + "handle" + b);
});
System.out.println(future.join());
}
3.6 exceptionally
exceptionally 方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中。
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).exceptionally((thread) -> {
System.out.println(thread.getMessage());
return "ERROR";
}).thenApplyAsync(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "c";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
System.out.println(future.join());
}
4 CompletableFuture 组合处理
4.1 thenCompose
thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,空则返回null,否则返回新的CompletableFuture实例。
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).exceptionally((thread) -> {
System.out.println(thread.getMessage());
return "ERROR";
}).thenApplyAsync(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "c";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
System.out.println(future.join());
}
4.2 thenCombine / thenAcceptBoth / runAfterBoth
组合处理 | 参数 | 返回值 | 前置条件 | 功能 |
---|
public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 前两个的 CompletableFuture 的结果 | 有返回值 | 前两个的 CompletableFuture 都完成后 | 两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的结果。 | public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | 前两个的 CompletableFuture 的结果 | 无返回值 | 前两个的 CompletableFuture 都完成后 | 两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的结果。 | public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) | 无参数 | 无返回值 | 前两个的 CompletableFuture 都完成后 | 两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的后修操作。 |
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "b";
});
CompletableFuture<String> future3 = future1.thenCombine(future2, (a, b) -> {
return a + b + "c";
});
CompletableFuture<Void> future4 = future1.thenAcceptBoth(future2, (a, b) -> {
System.out.println(a + b + "c");
});
CompletableFuture<Void> future5 = future1.runAfterBoth(future2, () -> {
System.out.println("c");
});
}
4.3 applyToEither / acceptEither / runAfterEither
组合处理 | 参数 | 返回值 | 前置条件 | 功能 |
---|
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) | 前两个的 CompletableFuture 中的一个完成的结果 | 有返回值 | 前两个的 CompletableFuture 有一个完成后 | 两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的结果。 | public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) | 前两个的 CompletableFuture 中的一个完成的结果 | 无返回值 | 前两个的 CompletableFuture 有一个完成后 | 两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的结果。 | public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) | 无 | 无返回值 | 前两个的 CompletableFuture 有一个完成后 | 两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的逻辑。 |
4.4 allOf / anyOf
组合处理 | 参数 | 返回值 | 前置条件 | 能获取完成结果 | 功能 |
---|
public static CompletableFuture<Void> allOf(CompletableFuture<?>… cfs) | CompletableFuture 任务数组 | 无 | CompletableFuture 任务数组 全部完成或者异常 | 不能 | 处理 CompletableFuture 任务数组 全部完成或者异常 | public static CompletableFuture<Object> anyOf(CompletableFuture<?>… cfs) | CompletableFuture 任务数组 | 无 | CompletableFuture 任务数组 任一完成或者异常 | 能 | 处理 CompletableFuture 任务数组 全部完成或者异常 |
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "b";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "c";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
CompletableFuture future4 = CompletableFuture.allOf(future1, future2, future3).whenCompleteAsync((a, b) -> {
if (null != b) {
System.out.println("出现异常");
} else {
System.out.println("全部 CompletableFuture 都完成");
}
});
CompletableFuture future5 = CompletableFuture.anyOf(future1, future2, future3).whenCompleteAsync((a, b) -> {
if (null != b) {
System.out.println("出现异常");
} else {
System.out.println("最先完成的 CompletableFuture 任务的结果为 " + a);
}
});
}
|