IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 并发编程——NWU_LK -> 正文阅读

[Java知识库]并发编程——NWU_LK

并发编程

并发和并行的区别

并发:CPU单核,多个线程操作同一资源,线程间切换快,同一时间只有一个线程执行
并行:CPU多核,多个线程同时执行。

Callable简介

callable和runable的区别就是其可以有返回值,可以抛出异常。其次,callable的启动方法是call方法,runable的启动方法是run方法。

实现了Callable接口的类,若要用Thread类启动,需要用FutureTask去适配,因为FutureTask实现了Runable接口。如下所示

class MyThread implements Callable<String>{
    @Override
    public String call() throws Exception {
        return "callable";
    }
}
public static void main(String args[]){
	FutureTask futureTask=new FutureTask(new MyThread());
    new Thread(futureTask).start();
    //因为结果需要等待,所以该方法可能会阻塞
    String s = (String)futureTask.get();
}

若多个线程同时将FutureTask作为参数启动时,只有第一个线程会执行call方法,执行完毕后将返回值保存在futureTask的outcome中,其他的线程会直接从outcome中获取返回值。

Lcok锁(java.util.concurrent.locks.Lock)

ReentrantLock是该接口的实现类。该类可以创建公平锁和非公平锁,默认为公平锁。公平锁就是先来后到,所有线程都有机会获得锁,不会饿死,由于所有线程都会经历阻塞态,因此唤醒阻塞线程的开销会很大;非公平锁就是所有的线程拼运气,谁运气好,谁就获取到锁,可以减少CPU唤醒线程的开销,整体的吞吐效率会高,但缺点是可能会有线程长时间甚至永远获取不到锁,导致饿死。

  • lock():获取锁,不可以被打断
  • unlock():释放锁
  • lockInterruptibly():获取可打断锁,阻塞时可以被其他线程打断
  • tryLock(Time):尝试获取锁,指定时间内获取不到的话结束阻塞,阻塞状态可以被打断
synchronized 和 lock锁的区别
  1. synchronized 是java内置的关键字,lock是一个java类

  2. synchronized 无法获取锁的状态,lock 可以判断是否获得了锁

  3. synchronized 会自动释放锁,lock不会自动释放锁,需要自己手动释放锁,如果不释放锁,会造成死锁

  4. synchronized 线程一(获得锁,然后锁阻塞了),线程二(等待,然后还是傻傻的等待),lock锁就不一定会等待下去

  5. synchronized 可重入锁,不可以被中断,非公平,lock 可重入锁,可判断锁,可以自行设置是否为公平锁

  6. synchronized 适合少量的代码同步问题,lock适合大量的代码同步问题

使用Lock实现生产者消费者模型
  1. 编写资源类
class Data{
    private Lock lock=new ReentrantLock();
    Condition condition = lock.newCondition();
    private int number=1;
    public void increment() throws InterruptedException {
        lock.lock();
        while (number!=0){
            condition.await();
        }
        number++;
        System.out.print("加法,number="+number);
        condition.signalAll();
        lock.unlock();
    }
    public void decrement() throws InterruptedException {
        lock.lock();
        while (number==0){
            condition.await();
        }
        number--;
        System.out.print("减法,number="+number);
        condition.signalAll();
        lock.unlock();
    }
}

此处资源类中的两个同步方法中,不用if用while的原因可以通过一个例子来理解。拿两个加法线程A、B来说,比如A先执行,执行时调用了wait方法,那它会等待,此时会释放锁,那么线程B获得锁并且也会执行wait方法,两个加线程一起等待被唤醒。此时减线程中的某一个线程执行完毕并且唤醒了这俩加线程,那么这俩加线程不会一起执行,其中A获取了锁并且加1,执行完毕之后B再执行。如果是if的话,那么A修改完num后,B不会再去判断num的值,直接会给num+1。如果是while的话,A执行完之后,B还会去判断num的值,因此就不会执行。

  1. 编写生产者和消费者
public static void main(String[] args) {
        Data2 data=new Data2();
        new Thread(()->{
            for (int i=1;i<=10;i++){
                try {
                    data.increment();
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者1").start();
        new Thread(()->{
            for (int i=1;i<=10;i++){
                try {
                    data.increment();
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者2").start();
        new Thread(()->{
            for (int i=1;i<=10;i++){
                try {
                    data.decrement();
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者1").start();
        new Thread(()->{
            for (int i=1;i<=10;i++){
                try {
                    data.decrement();
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者2").start();
    }
使用传统方法实现顺序执行线程
  1. 创建打印类
class MyPrint{
    private int flag;
    private int loopNumber;

    public MyPrint(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
    }

    public void print(String str, int flag,int nextFlag){
        for (int i=0;i<loopNumber;i++){
            synchronized (this){
                while (flag!=this.flag){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.print(str);
                this.flag=nextFlag;
                this.notifyAll();
            }
        }
    }
}
  1. 测试
public static void main(String[] args) {
        MyPrint waitSet=new MyPrint(1,10);
        new Thread(()->{
            waitSet.print("a",1,2);
        },"t1").start();
        new Thread(()->{
            waitSet.print("b",2,3);
        },"t2").start();
        new Thread(()->{
            waitSet.print("c",3,1);
        },"t3").start();
    }
使用Lock实现顺序执行线程
  1. 创建资源类
class Data{
    private int number=1;
    private Lock lock=new ReentrantLock();
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();

    public void printA() throws InterruptedException {
        lock.lock();
        while (number!=1){
            condition1.await();
        }
        number++;
        System.out.println("printA:"+number);
        condition2.signal();
        lock.unlock();
    }
    public void printB() throws InterruptedException {
        lock.lock();
        while (number!=2){
            condition2.await();
        }
        number++;
        System.out.println("printB:"+number);
        condition3.signal();
        lock.unlock();
    }
    public void printC() throws InterruptedException {
        lock.lock();
        while (number!=3){
            condition3.await();
        }
        number=1;
        System.out.println("printB:"+number);
        condition1.signal();
        lock.unlock();
    }
}
  1. 测试
    @Test
    public void test(){
        Data data=new Data();
        new Thread(()->{
            try {
                for (int i=0;i<5;i++){
                    data.printA();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(()->{
            try {
                for (int i=0;i<5;i++){
                    data.printB();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(()->{
            try {
                for (int i=0;i<5;i++){
                    data.printC();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

synchronized理解锁

当同步方法是普通方法时,锁定是对象,当同步方法是静态方法时,锁的是类。
因此,一个对象的两个普通同步方法不会同时执行,先获得锁的执行完毕后另一个才会执行。详情可以参考8锁现象。

8锁现象(狂神说)

JAVA内存模型(JMM)

java内存模型定义了多线程读写共享数据时,对数据的可见性、有序性和原子性的规则和保障。

synchronized: 既可以保证可见性,也可以保证原子性,因此,线程中有同步方法时,那么可以防止去工作缓存中获取变量的值,保证了可见性

volatile: 它可以用来修饰成员变量和静态变量,可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是直接操作主存。保证可见性,但不保证原子性,适用于一个写线程,多个读线程的情况。其次,volatile可以避免指令重排。因此用双重锁实现单例模式时,可以在静态对象上加一个volatile关键字,这样就不会造成指令重排(因为初始化的语句和给静态变量赋值的语句可能会指令重排,造成未初始化完就把对象返回了),使得返回的对象可能是完整的对象。

保证可见性的原理:在加了volatile修饰的变量的读操作之前会加读屏障,使得之后的代码中的变量都是从主存中读取的,并且不会发生指令重排;写操作之后会加写屏障,使得写屏障之前的变量都是要写入主存的,并且不会发生指令重排

happens-before规则: 规定了哪些写操作对于其他读操作可见,他是可见性与有序性的一套规则总结。

  • 一个线程对volatile变量的写对接下来要访问该变量的读操作是可见的
  • 一个线程解锁之前对变量进行了写,该变量对接下来加锁(与解锁的对象为同一对象)的其他线程是可见的。
  • 线程t2对变量进行了写操作,线程t1打断线程t2,对于其他得知线程t2被打断的线程,可以访问当线程t2修改的那个变量
  • 线程start前对变量的写对该线程可见

无锁并发

CAS: 体现了一种乐观锁的思想,当fi中的旧值和变量值相等时,会将变量修改为结果值

while(true){
	int 旧值=共享变量
	int 结果=旧值+1
	if(compareAndSwap(旧值,结果)){

	}
}

缺点:

  • 循环会耗时
  • 一次性只能保证一个共享变量的原子性
  • ABA问题:即可能另一个线程将变量修改后又修改了回来,那么该操作对于本线程是不可见的

获取共享变量时,为了保证变量的可见性,需要使用volatile修饰,结合volatile和CAS可以实现无锁并发,适用于竞争不激烈,多核CPU的场景下。JUC中的原子类默认实现了自旋锁。

原子整数:

  • AtomicInteger:原子整数类
  1. incrementAndGet:自增,等价++i
  2. getAndIncrement:自增,等价于i++
  3. addAndGet(num):自增num
  4. get:得到值
  5. decrementAndGet:自增,等价–i
  6. getAndDecrement:自增,等价于i–
  7. getAndUpdate:修改
  • AtomicLong
  • AtomicBoolean

原子引用:保证引用的安全性

  • AtomicReference:原子引用类,将引用传入进去,存在ABA问题
  • AtomicMarkableReference:避免了ABA问题,用一个boolean值记录是否被更改过
  • AtomicStampedReference:避免了ABA问题,构造方法加入了版本号,每有一个线程修改了原子引用后都会修改版本号

原子数组:保证数组每个元素的安全性

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

字段更新器:保证对象的属性是线程安全的

  • AtomicReferenceFiledUpdater
  • AtomicIntegerFiledUpdater
  • AtomicLongFiledUpdater

对应的属性必须配合volatile

class Student{
    volatile String name;
    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                '}';
    }
}
public static void main(String[] args) {
        Student student=new Student();
        AtomicReferenceFieldUpdater updater=AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");
        System.out.println(updater.compareAndSet(student,null,"asas"));
        System.out.println(student);
    }

累加器

  • LongAdder
  • DoubleAdder

AQS(AbstractQueuedSynchronizer)

实现的是阻塞式的锁,其中的state属性表示资源的状态,判断是否获取了锁。独占模式表示只允许一个线程访问,共享模式允许多个线程访问。提供了FIFO等待队列,类似于Monitor的EntryList;条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor的WaitSet。

  • 获取锁:tryAcquire(arg):返回boolean,获取不到锁时可以入队阻塞,阻塞原理是park和unpark
  • 释放锁:tryRelease(arg):让阻塞线程恢复运行

ReentrantLock原理:内部调用AQS的方法实现叫
在这里插入图片描述

Lock锁不可打断原理:即其他线程将其打断后,仍然会驻留在AQS队列中,等获得锁以后方能继续运行。而可打断锁的原理是被打断后会抛出异常

非公平锁原理:新来个线程直接抢占锁,不会检查AQS队列是否有等待的线程节点;而公平锁的话会检查AQS队列,如果有节点在排序的话

条件变量原理:创建一个ConditionObject对象,将要等待的线程以链表的形式接起来,然后线程释放锁(重入锁也直接减为0),接着AQS中的等待队列中的第一个线程竞争锁。当先线程调用signal唤醒线程时会将ConditionObject的第一个线程唤醒,调用signalAll时会将所有的线程唤醒,并将唤醒的线程加入到AQS的等待队列中

集合不安全

ArrayList和LinkedList是不安全的,多个线程同时添加的话会出异常(ConcurrentModificationException)。为了解决这种现象,可以有以下几种解决办法

  1. 用Vector替代ArrayList和LinkedList
  2. 用Collections的方法synchronizedList生成一个安全的list
  3. 用CopyOnWriteArrayList类替代ArrayList,与vector相比只有写的方法加了锁

HashSet和TreeSet也是不安全的,和ArrayList一样,也可以解决该问题。

  1. 用Collections的方法synchronizedSet生成一个安全的Set
  2. 用CopyOnWriteSet类替代HashSet和TreeSet

HashMap和TreeMap也是不安全的,解决方案如下。

  1. 用Collections的方法synchronizedMap生成一个安全的map
  2. 用ConcurrentHashMap替代HashMap和TreeMap
三大常用辅助类
  • CountDownLatch(减法计数器)
    相当于一个计数器的功能,创建一个CountDownLatch对象后,构造函数要求赋予一个初值(数字),然后调用CountDownLatch的countDown()方法实现减1的操作。countDown还有await()方法,该方法用来判断countDown的值是否变为了0,如果变为0才会执行await()下面的代码。

    CountDownLatch count=new CountDownLatch(6);
    for (int i=0;i<6;i++){
        new Thread(()->{
            count.countDown();
        }).start();
    }
    count.await();
    
  • CyclicBarrier(加法计数器)
    相对于一个加法计数器,每调用一次await()方法,内部就会+1,直到加到设置的阈值,那么就会执行构造函数中的方法。

    CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
        System.out.println("加到7了,执行");
    });
    for (int i=0;i<7;i++){
        new Thread(()->{
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
    }
    
  • Semaphore(限制计数器)
    同一时间只能让有限个线程获得资源,以下的例子就是在同一时间内只能有三个线程执行。在访问高峰时,让请求线程阻塞,高峰过去再释放即可,只适合限制单机限制线程数。
    在这里插入图片描述

    Semaphore semaphore=new Semaphore(3);
    for (int i=1;i<=6;i++){
        new Thread(()->{
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName());
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                semaphore.release();
            }
        },String.valueOf(i)).start();
    }
    
ReadWriteLock读写锁

读写锁详情

BlockingDeque阻塞队列

队列满了之后,写操作会阻塞,队列空的话,读操作会阻塞。
在这里插入图片描述

方式抛出异常不抛异常阻塞等待超时等待
入队add(e)offer(e)put(e)offer(e,time,TimeUnit)
出队remove(e)poll(e)take(e)poll(e,time,TimeUnit)
查看队首元素element()peek()--

SynchronousQueue

这是BlockingDeque的一个实现类,该类不存储任何元素,相当于只能在存的时候就直接取,否则会造成阻塞。

BlockingQueue<String> queue=new SynchronousQueue<>();
new Thread(()->{
    try {
        queue.put("1");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();
new Thread(()->{
    try {
        System.out.println(queue.take());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();
线程池

线程池的好处:

  • 降低资源你的消耗
  • 提高响应的速度
  • 方便管理
  • 线程复用、可以控制最大并发数、管理线程

创建线程池的方法:

  1. Executors.newSingleThreadExecutor():创建只有一个线程的线程池,阻塞队列为LinkedBlockingQueue,无界队列。即时当前线程出异常,线程池会创建一个新的线程接着执行剩余任务
  2. Executors.newFixedThreadPool(5):创建固定数量线程的线程池
  3. Executors.newCachedThreadPool():创建任意的线程池,阻塞队列使用SynchronousQueue,当任务执行完毕后,空闲一分钟释放线程

这三个方法最终都调用了ThreadPoolExecutor(…)方法,如下

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize, 
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

参数介绍

参数名称说明
corePoolSize核心线程数,核心线程会一直存活,即使没有任务需要执行
maximumPoolSize线程池中能拥有最多线程数
keepAliveTime空闲线程存活时间
unit时间单位
workQueue用于缓存任务的阻塞队列,当任务数量大于核心线程数加阻塞队列大小时会创建新的线程
threadFactory线程池创建线程使用的工厂
handler线程池对拒绝任务的处理策略 ,比如当任务数大于最大线程数+阻塞队列大小的时候会触发拒绝任务的处理。一共四种策略,分别为AbortPolicy(默认策略,会抛出异常)、CallerRunsPolicy(哪来的任务让哪里处理)、DiscardPolicy(丢掉任务,不抛出异常)、DiscardOldestPolicy(队列满了会尝试竞争,不会抛出异常)

用线程池启动线程:

线程池对象.execute(()->{});

方法介绍:

  • execute(Runnable):执行任务
  • submit(futureTask):执行任务,返回值为future对象,通过get方法可以拿到线程中的返回值
  • invokeAll(Collection<callable>):执行一组集合中的任务
  • invokeAll(Collection<callable>,time,timeUnit):执行一组集合中的任务,带超时
  • invokeAny(Collection<callable>):执行一组集合中的任务,返回最先执行完的线程返回值,其他线程停止
  • shutdown():打断线程,没执行完的线程执行完自动结束
  • shutdownNow:立刻结束所有线程

如何定义线程数的最大值:

  • CPU密集型:电脑核数为几就是几,保持效率最高。获取cpu的核数:Runtime.getRuntime().availableProcessors()
  • IO密集型:判断程序中耗IO的任务数量,让最大线程数大于这个值

传统延时任务:Timer,缺点是串行执行,如果在执行时其中某个任务失败,那么会导致后面的任务也执行失败

Timer timer=new Timer();
TimerTask timerTask1=new TimerTask() {
    @Override
    public void run() {
        System.out.println("asas");
    }
};
TimerTask timerTask2=new TimerTask() {
    @Override
    public void run() {
        System.out.println("222");
    }
};
timer.schedule(timerTask1,5000);//延时时间
timer.schedule(timerTask2,5000);

线程池延时任务:克服了timer的缺点

ScheduledExecutorService pool= Executors.newScheduledThreadPool(2);
        pool.schedule(()->{
            System.out.println("hahah");
        },1, TimeUnit.SECONDS);
        pool.schedule(()->{
            System.out.println("hahah");
        },1, TimeUnit.SECONDS);

线程池定时任务

ScheduledExecutorService pool= Executors.newScheduledThreadPool(2);
//每隔2s执行一次,任务开始后计算间隔时间,如果任务时长大于间隔时长,那么上一个任务执行完毕后马上会执行下一个任务
pool.scheduleAtFixedRate(()->{  
    System.out.println("hahah");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},1,2,TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(()->{ //每隔2s执行一次,任务执行完毕后才计算间隔时间
    System.out.println("hahah");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},1,2,TimeUnit.SECONDS);
四大函数式接口
  • 函数式接口:Function,实现apply方法,传入一个参数,返回一个参数
  • 断定式接口:Predicate,实现test方法,传入一个参数,返回boolean类结果
  • 消费型接口:Consumer,实现accept方法,只有输入参数,没有返回值
  • 供给型接口:Supplier,实现get方法,没有参数,只有返回值
Stream流式计算

已知User类,有如下五个用户,返回id是偶数的、年龄大于22的、按照年龄排序、并且将用户名转化为大写、只输出第一个用户的用户名

/*

 */
User user1 = new User("1","user1",11);
User user2 = new User("2","user2",21);
User user3 = new User("3","user3",22);
User user4 = new User("4","user4",31);
User user5 = new User("6","user5",41);
List<User> users = Arrays.asList(user1, user2, user3, user4, user5);
users.stream()
        .filter(user -> Integer.parseInt(user.getId()) % 2 == 0)
        .filter(user->user.getAge()>22)
        .sorted((u1,u2)-> u2.getAge()-u1.getAge())
        .map((u)-> u.getName().toUpperCase())
        .limit(1)
        .forEach(System.out::println);
ForkJoin

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:

  1. 任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
  2. 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
    在这里插入图片描述

ForkJoinTask

使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:

  • RecursiveAction,用于没有返回结果的任务
  • RecursiveTask,用于有返回结果的任务
class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 20;
    int start;
    int end;
    SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算:
            long sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        // 任务太大,一分为二:
        int middle = (end + start) / 2;
        SumTask subTask1 = new SumTask(start, middle);
        SumTask subTask2 = new SumTask(middle+1, end);
        invokeAll(subTask1, subTask2);
        Long subResult1 = subTask1.join();
        Long subResult2 = subTask2.join();
        return subResult1 + subResult2;
    }
}

ForkJoinPool

ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)

ForkJoinPool pool = new ForkJoinPool();
SumTask task=new SumTask(1,500);
Future<Long> result = pool.submit(task);
System.out.println(result.get());
异步回调CompletableFuture

拿callable接口来说,可以获取返回值,那么有没有一种方法可以异步的获取返回值呢?可以使用线程池的submit方法来获取future对象,然后调用get方法取值。

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务,Task类实现了callable接口:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

但是以上的方法有所缺陷,因为get方法会引起主线程阻塞。从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。使用的模板如下:

// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
    return queryCode("中国石油");
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
    return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
    System.out.println("price: " + result);
});
// 如果执行异常:
cfFetch.exceptionally((e) -> {
    e.printStackTrace();
    return null;
});

其中:

  • 创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象,即只有返回值
  • 继续一个CompletableFuture是通过thenApplyAsync实现的,它需要实现function类型接口,有参数有返回值。
  • 完成时,CompletableFuture会调用Consumer对象
  • 异常时,CompletableFuture会调用Function对象

任务合并:即将两个目的相同的任务合并为一个任务,其中的anyOf表示只要有一个执行成功即可,allOf表示每个都成功才可。语法如下:

CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQuery1, cfQuery2);

在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-09 18:09:53  更:2022-04-09 18:11:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 6:07:48-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码