IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Java CompletableFuture 使用测试 -> 正文阅读

[Java知识库]Java CompletableFuture 使用测试

1 创建 CompletableFuture 对象

CompletableFuture 提供了四个静态方法用来创建 CompletableFuture 对象:

方法返回结果线程池参数
public static CompletableFuture<Void> runAsync(Runnable runnable)无返回结果采用内部forkjoin线程池Runnable 接口。
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)无返回结果自定义线程池Runnable 接口。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)有返回结果采用内部forkjoin线程池Supplier<U> 函数式接口。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)有返回结果自定义线程池Supplier<U> 函数式接口。

1.1 CompletableFuture.runAsync

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class UnitTest {

    ThreadPoolExecutor executor = new ThreadPoolExecutor(15, // 核心线程池大小
            20, // 最大线程池大小
            10, // 线程最大空闲时间
            TimeUnit.MILLISECONDS, // 线程最大空闲时间单位
            new ArrayBlockingQueue<>(100), // 线程等待队列
            Executors.defaultThreadFactory(), // 线程创建工厂
            new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略

    @Test
    public void testCompletableFuture() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("a");
        }, executor);
    }

}

1.2 CompletableFuture.supplyAsync

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class UnitTest {

    ThreadPoolExecutor executor = new ThreadPoolExecutor(15, // 核心线程池大小
            20, // 最大线程池大小
            10, // 线程最大空闲时间
            TimeUnit.MILLISECONDS, // 线程最大空闲时间单位
            new ArrayBlockingQueue<>(100), // 线程等待队列
            Executors.defaultThreadFactory(), // 线程创建工厂
            new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略

    @Test
    public void testCompletableFuture() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "a";
        }, executor);
    }

}
函数式接口名称作用主要方法解释
Function<T, R>方法R apply(T t)接受一个T类型参数,返回R类型
Consumer<T>消费者void accept(T t)接受一个T类型参数,没有返回值。
Supplier<T>生产者T get()不接受参数,但是提供一个返回值。
Predicate<T>判断boolean test(T t)接受一个T类型参数,返回一个boolean类型值。

2 获取 CompletableFuture 结果

2.1 自动获取结果

获取结果方法解释
public T get()返回计算的结果,抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理。
public T get(long timeout, TimeUnit unit)返回计算的结果,设置超时时间,抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理。
public T getNow(T valueIfAbsent)如果已完成则返回结果或者抛出异常,否则返回给定的valueIfAbsent的值。
public T join()返回计算的结果或者抛出一个unchecked异常,CancellationException,CompletionException 无需要用户手动处理。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class UnitTest {

    ThreadPoolExecutor executor = new ThreadPoolExecutor(15, // 核心线程池大小
            20, // 最大线程池大小
            10, // 线程最大空闲时间
            TimeUnit.MILLISECONDS, // 线程最大空闲时间单位
            new ArrayBlockingQueue<>(100), // 线程等待队列
            Executors.defaultThreadFactory(), // 线程创建工厂
            new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略

    @Test
    public void testCompletableFuture() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }, executor).thenApply(x -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x + "b";
        }).thenCompose(e -> CompletableFuture.supplyAsync(e::toUpperCase));
        System.out.println(future.get());
        System.out.println(future.join());
        System.out.println(future.getNow("C"));
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }

}
AB
AB
AB
AB

2.2 主动获取结果

获取结果方法解释
public boolean complete(T value)当调用CompletableFuture.get() 被阻塞的时候,那么这个方法就是结束阻塞,并且get()获取当前设置的value。
public boolean completeExceptionally(Throwable ex)当调用CompletableFuture.get() 被阻塞的时候,那么这个方法就是结束阻塞,并且抛出异常。
future.complete("直接使用这个结果")
future.completeExceptionally(new Throwable("运行超时"))

3 CompletableFuture 异步回调

3.1 thenAccept…

异步回调参数返回值线程池功能
public CompletionStage<Void> thenAccept(Consumer<? super T> action)上一个任务的结果采用内部forkjoin线程池上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action)上一个任务的结果采用内部forkjoin线程池上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)上一个任务的结果自定义线程池上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。
上一个任务需要有返回值
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).thenAcceptAsync(x -> {
        System.out.println("thenRun");
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}

3.2 thenRun…

异步回调参数返回值线程池功能
public CompletionStage<Void> thenRun(Runnable action)采用内部forkjoin线程池上一个任务执行完成后执行,不需要参数,无返回结果。
public CompletionStage<Void> thenRunAsync(Runnable action)采用内部forkjoin线程池上一个任务执行完成后执行,不需要参数,无返回结果。
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor)自定义线程池上一个任务执行完成后执行,不需要参数,无返回结果。
上一个任务有无返回值均可
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).thenRun(() -> {
        System.out.println("thenRun");
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}

3.3 thenApply…

异步回调参数返回值线程池功能
public <U> CompletableFuture<U> henApply(Function<? super T,? extends U> fn)上一个任务的结果采用内部forkjoin线程池上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)上一个任务的结果采用内部forkjoin线程池上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)上一个任务的结果自定义线程池上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。
上一个任务需要有返回值
 @Test
 public void testCompletableFuture() throws Exception {
     CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "a";
     }).thenApplyAsync(x -> {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return x + "c";
     }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
     System.out.println(future.join());
 }

3.4 handle

异步回调参数返回值线程池功能
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)CompletableFuture 全部完成后的结果采用内部forkjoin线程池CompletableFuture 全部完成或者抛出异常后,处理结果。
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)CompletableFuture 全部完成后的结果采用内部forkjoin线程池CompletableFuture 全部完成或者抛出异常后,处理结果。
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)CompletableFuture 全部完成后的结果自定义线程池CompletableFuture 全部完成或者抛出异常后,处理结果。
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).handleAsync((a, b) -> {
        System.out.println(a + "handle" + b);
        return "K";
    });
    System.out.println(future.join());
}

3.5 whenComplete

异步回调参数返回值线程池功能
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)CompletableFuture 全部完成后的结果采用内部forkjoin线程池CompletableFuture 全部完成或者抛出异常后,处理结果。
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)CompletableFuture 全部完成后的结果采用内部forkjoin线程池CompletableFuture 全部完成或者抛出异常后,处理结果。
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)CompletableFuture 全部完成后的结果自定义线程池CompletableFuture 全部完成或者抛出异常后,处理结果。
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).whenComplete((a, b) -> {
        System.out.println(a + "handle" + b);
    });
    System.out.println(future.join());
}

3.6 exceptionally

exceptionally 方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中。

@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}

4 CompletableFuture 组合处理

4.1 thenCompose

thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,空则返回null,否则返回新的CompletableFuture实例。

@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}

4.2 thenCombine / thenAcceptBoth / runAfterBoth

组合处理参数返回值前置条件功能
public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)前两个的 CompletableFuture 的结果有返回值前两个的 CompletableFuture 都完成后两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的结果。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)前两个的 CompletableFuture 的结果无返回值前两个的 CompletableFuture 都完成后两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的结果。
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)无参数无返回值前两个的 CompletableFuture 都完成后两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的后修操作。
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "b";
    });
    // thenCombine
    CompletableFuture<String> future3 = future1.thenCombine(future2, (a, b) -> {
        return a + b + "c";
    });
    // thenAcceptBoth
    CompletableFuture<Void> future4 = future1.thenAcceptBoth(future2, (a, b) -> {
        System.out.println(a + b + "c");
    });
    // runAfterBoth
    CompletableFuture<Void> future5 = future1.runAfterBoth(future2, () -> {
        System.out.println("c");
    });
}

4.3 applyToEither / acceptEither / runAfterEither

组合处理参数返回值前置条件功能
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)前两个的 CompletableFuture 中的一个完成的结果有返回值前两个的 CompletableFuture 有一个完成后两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的结果。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)前两个的 CompletableFuture 中的一个完成的结果无返回值前两个的 CompletableFuture 有一个完成后两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的结果。
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)无返回值前两个的 CompletableFuture 有一个完成后两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的逻辑。

4.4 allOf / anyOf

组合处理参数返回值前置条件能获取完成结果功能
public static CompletableFuture<Void> allOf(CompletableFuture<?>… cfs)CompletableFuture 任务数组CompletableFuture 任务数组 全部完成或者异常不能处理 CompletableFuture 任务数组 全部完成或者异常
public static CompletableFuture<Object> anyOf(CompletableFuture<?>… cfs)CompletableFuture 任务数组CompletableFuture 任务数组 任一完成或者异常处理 CompletableFuture 任务数组 全部完成或者异常
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "b";
    });
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    // allOf
    CompletableFuture future4 = CompletableFuture.allOf(future1, future2, future3).whenCompleteAsync((a, b) -> {
        if (null != b) {
            System.out.println("出现异常");
        } else {
            System.out.println("全部 CompletableFuture 都完成");
        }
    });
    // anyOf
    CompletableFuture future5 = CompletableFuture.anyOf(future1, future2, future3).whenCompleteAsync((a, b) -> {
        if (null != b) {
            System.out.println("出现异常");
        } else {
            System.out.println("最先完成的 CompletableFuture 任务的结果为 " + a);
        }
    });
}
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-04 15:22:29  更:2022-03-04 15:24:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 10:58:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码