IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> java38-并发框架图-BlockingQueue-ForkJoin-Future-happensbefore- -> 正文阅读

[Java知识库]java38-并发框架图-BlockingQueue-ForkJoin-Future-happensbefore-

并发整体框架图

在这里插入图片描述

实现一个计数器(体现原子性)

方式1:使用count++

public class Counter{
	private int count;
	public void increase(){
		count++;
	}

问题
该代码可能在多线程条件下出现问题,因为count++是非原子的。
count++ 实际上等于三个操作:读数据,加1,写回数据。
解决
为了防止多线程下访问increase方法会报错,所以给increase方法加锁。
count变量修改了其他线程可能看不到,所以就加个volatile关键字吧。

方式2:加synchronized 和 volatile

public class LockCounter{
	private volatile count;
	public synchronized void increase(){
		count++;
	}
}

问题
加锁会影响效率,可以考虑使用原子操作类的形式。

方式3:AtomicInteger

public class AtomicCounter{
	private AtomicInteger count = new AtomicInteger(0);
	public void increase(){
		count.incrementAndGet();
	}
}

线程的并发和同步

区别同步异步
概念同步过程调用发出后,在没有得到结果之前,该调用不返回或不继续执行后续操作
多个线程同时访问同一个资源,一个线程访问结束之后,别的线程才能访问
异步过程调用后,调用者没有得到结果之前,就可以继续执行后续操作
调用结果,通过状态、通知和回调来通知调用者
操作必须执行到底才能执行其他操作操作可以同时执行
使用锁多个线程使用同一把锁多个线程在执行过程中使用不同的锁

Fork 和 Join

JUC中提供了Fork/Join的并行计算框架,用来处理分治的情况。
分治的思想 = 分而治之,把复杂的问题分解成相似的子问题,然后子问题再分子问题,知道问题分到很简单不必再划分为止,然后层层返回问题的结果。
分治分为两个阶段:分解任务 和 合并结果

第一个阶段:分解任务ForkJoinPool

public class ForkJoinPool extends AbstractExecutorService
把任务分解为一个个小任务直至小任务可以简单的计算返回结果;(Fork分解任务)
ForkJoinPool治理分治任务的线程池,它有多个任务队列(区别于ThreadPoolExecutor线程池),通过ForkJoinPool的invoke、submit、execute提交任务的时候会根据一定规则分配给不同的任务队列(该队列是双端队列)。
ForkJoinPool 有一个机制,当某个工作线程对应消费的任务队列空闲的时候它会去别的忙的任务队列的尾部分担任务过来执行,这种执行又被称之为窃取线程。因为这个性质,所以采用双端队列。
窃取线程如何保证不和被窃取任务的线程冲突。队列的绑定工作线程都从队列头部取任务进行执行,窃取线程会从别的队列的尾部取任务执行。
提交任务方法

提交任务方法说明
< T > T invoke(ForkJoinTask< T > task)指定给定的任务,完成后返回其结果
同步,有返回结果(会阻塞)
Future< ? > submit(Runnable task)提交一个可运行的任务执行,返回一个表示该任务的Future
异步,有返回结果(Future<>)
void execute(Runnable task)在将来的某个时刻执行给定的命令
异步,无返回结果

构造方法
ForkJoinPool(int parallelism) —— 使用指定的并行级别创建一个ForkJoinPool,使用所有其他的参数的默认值。
完整的参数介绍如下:

参数说明
int parallelism并行级别 = 也就是设置最大并发数
默认值:Runtime.availableProcessors()
ForkJoinPool.ForkJoinWorkerThreadFactory factory创建新线程的工厂
默认值 defaultForkJoinWorkerThreadFactory
Thread.UncaughtExceptionHandler Handler由于执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序
默认值 null
boolean asyncModetrue = 为从未连接的分叉任务简历本地先进先出调度模式;默认值false = 基于本地堆栈的模式
int corePoolSize核心线程数 = 保留在线程池中的线程数 ;默认值 = 并行级别数
int maximumPoolSize允许的最大线程数;默认256
int minimumRunnable未被连接组织的核心线程允许的最小数量;默认值是 1
Predicate< ? super ForkJoinPool > saturate未被连接阻止的核心线程允许的最小数量;默认情况下,当一个线程即将被阻止连接或ForkJoinPool.ManagedBlocker,但由于超过maximumPoolSize不能被替换,因此抛出RejectExecutionException
long keepAliveTime在线程终止之前自上次使用以来经过的时间;默认值 60
unitkeepAliveTime 参数的时间单位

双端队列
概念:
限定插入和删除操作在表的两端进行的线性表,两端分别称为端点1和端点2,具有队列和栈性质的数据结构。
普通队列:队列头部删除元素、队列尾部添加元素;
双端队列:队列头部添加元素、队列尾部删除元素。

第二个阶段:合并结果

把每个小任务的结果合并返回得到最终结果。(Join合并结果)
ForkJoinTask,分治任务,等同于Runnable。
抽象类,核心方法是fork和join。fork用来异步执行一个子任务,join会阻塞当前线程等待子任务返回。
ForkJoinTask有两个子类:RecursiveAction 和 RecursiveTask 都是抽象类,通过递归来执行分治任务。
每个子类都有compute抽象方法(任务执行的主要计算量),区别在于RecursiveAction没有返回值、RecursiveTast有返回值。

代码思路

1 创建ForkJoinPool,用于执行分治任务的线程池,parallelism = 并行级别,并发线程数
	ForkJoinPool forkjoinpool = new ForkJoinPool(int parallelism);
2 创建具体操作执行的类,需要继承ForkJoinTask类 或者其子类
	public class ForkJoinTaskxhjextends RecursiveTask{}
3 创建ForkJoinTaskxhj类的对象
	ForkJoinTaskxhj类 obj = new ForkJoinTaskxhj类(参数);
4 通过ForkJoinPool对象的invoke方法提交任务并执行
	forkjoinpool.invoke(obj);

案例应用:斐波那契数列

斐波那契数列:1-1-2-3-5-8-13-21-34…
公式:F(1) = 1,F(2) = 1,F(n) = F(n-2) + F(n-1) (n>=3,n为正整数)

package offer2.Test52;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Test2 {
    public static void main(String[] args) {
        // 治理分治任务的线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 最大并发数4
        // 要计算的斐波那契数列 的 第几个元素
        Fibonacci fibonacci = new Fibonacci(20);
        // 任务执行开始 的时间
        long startTime = System.currentTimeMillis();
        // 指定给定的任务,完成返回其结果
        // forkJoinPool对象.invoke方法的参数是ForkJoinTask类,则就是它的对象或者其后代对象
        // 本例中 fibonacci 是ForkJoinTask类的后代对象。
        Integer result = forkJoinPool.invoke(fibonacci);
        // 任务执行完毕 的时间
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
    }
// 定义Fibonacci数列方法。继承自RecursiveTask泛型类
// Fibonacci类的定义采用的是成员内部类的形式
    static  class Fibonacci extends RecursiveTask<Integer> {
        // 成员变量
        final int n;
        // 构造方法
        Fibonacci(int n) {
            this.n = n;
        }
        // compute抽象方法重写 任务的主要计算量
        @Override
        protected Integer compute() {
            if (n <= 1) {
                return n;
            }
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();
        }
    }
}

阻塞队列BlockingQueue

概念

  • 阻塞队列与普通队列的区别:当队列是空的时候,从队列中获取元素的操作将会阻塞;当队列是满的,往队列里面添加元素的操作会被阻塞。
  • 是java.util.concurrent包提供的接口,常用于多线程编程中容纳任务队列。
  • 适用阻塞队列的好处:不需要关心什么阻塞线程、什么时候唤醒线程,阻塞队列都帮助我们考虑了。

常用实现类

类名说明
ArrayBlockingQueue数组结构组成的有界阻塞队列
LinkedBlockingQueue链表结构组成的有界阻塞队列
LinkedTransferQueue链表结构组成的无界阻塞队列,和SynchronousQueue类似,还含有非阻塞方式
LinkedBlockingDeque链表解耦组成的双向阻塞队列
SynchronousQueue不存储元素的阻塞队列,即直接提交给线程不保持它们 = 单个元素的队列
PriorityBlockingQueue支持优先级排序的无界阻塞队列
DelayQueue使用优先级队列实现的延迟无界阻塞队列

常用方法

方法类型抛出异常特殊值阻塞超时
插入add(e)
阻塞队列慢,再插入元素抛出IllegalStateException:Queue full…
offer(e)
成功返回true,失败返回false
put(e)
阻塞队列满,生产线程持续执行put操作,队列会阻塞生产线程直到put数据成功或响应中断退出
offer(e,time,unit) 阻塞队列满,队列会阻塞生产线程一定时间,超时后生产线程会退出
移除remove()
阻塞队列为空,执行remove操作会抛出NoSuchElementException
poll()
成功返回队列元素,队列没有则返回null
take()
阻塞队列为空,消费者线程从队列里take元素,队列会一直阻塞消费者线程直到队列可用
poll()
当队列为空,队列会阻塞消费者线程一段时间,超过时限后消费者线程退出
检查element()
阻塞队列为空,执行element会抛出NoSuchElementException异常
peek()
返回队列的第一个元素
不可用不可用

LinkedBlockingQueue

  • 使用LinkedBlockingQueue< E > 实现线程同步。
    LinkedBlockingQueue< E > 是一个基于已连接节点的,范围任意的阻塞队列。
    该队列按照先进先出排序元素。
    队列的头部是在队列中时间最长的元素,队列的尾部是在队列中时间最短的元素,新元素插入到队列的尾部。
    获取队列元素的操作只会获取头部元素,如果队列满了或者为空会进入阻塞状态。
  • 常用方法:
方法名说明
LinkedBlockingQueue()创建一个容量为Integer.MAX_VALIE的LinkedBlockingQueue
put( E e )在队尾添加一个元素,如果队列满则阻塞当前线程,直到队列有空位
size()返回列表中的元素个数
take()移除并返回头部元素,如果队列空则阻塞当前线程,直到取到元素为止

Future

概述

在JUC包中。
public interface Future< V >
Future模式很好的解决了那些需要返回值的异步调用。
Future表示异步计算的结果。提供方法来检查计算是否完成,等待其完成,并检索计算结果。只有当计算完成之后,才能使用get方法检索结果;如果需要则阻塞,直到准备就绪。
Future模式本质上是代理模式的一种实际应用。
经常使用Future的场景:计算密集场景、处理大数据量、远程方法调用;
当执行一个长时间运行的任务时,使用Future就可以让我们暂时去执行其他任务,等长任务执行完毕后再返回其结果。

Future模式组成部分

组成说明
Main系统启动,调用Client发出请求
Client返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData
Data返回数据的接口
FutureDataFuture数据,构造快,虚拟数据,需要装配RealData(订单)
RealData真实数据,构造慢(午餐)

图示:
在这里插入图片描述

简单案例

package offer2.Test53.tast1;

// 创建Data
public interface Dataxhj {
    // 抽象类
    public String getResult();
}

// 创建继承自Dataxhj接口的真实实现类
public class RealDataXhj implements Dataxhj{
    // protected受保护的变量;
    protected final String result;
    // 带参构造方法
    public RealDataXhj(String para){
        StringBuffer sb = new StringBuffer(para);
        // 假设这里很慢,构造一个RealDataXhj不是一个容易的事
        result = sb.toString();
    }
    @Override
    public String getResult() {
        return result;
    }
}

// future模式的核心
public class FutureData implements Dataxhj {
    // 创建RealDataXhj对象
    protected RealDataXhj realDataXhj = null;
    protected boolean isReady = false;

    // 设置RealDataXhj对象
    public synchronized void setRealDataXhj(RealDataXhj realDataXhj){
        if(isReady){
            return;
        }
        this.realDataXhj = realDataXhj;
        isReady = true;
        // RealDataXhj已经被注入,通知getResult,唤醒线程
        notifyAll();
    }

    // 等待RealDataXhj构造完成
    @Override
    public synchronized String getResult() {
        while(!isReady){
            try {
                // 等待 直到RealDataXhj被注入
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realDataXhj.result;
        // RealDataXhj类中的result成员变量
    }
}

// 从Client得到DataXhj
public class Client {
   public Dataxhj request(final String queryStr){
       // 创建FutureData对象
       final FutureData future = new FutureData();
       // 创建线程并启动
       new Thread(){
           @Override
           public void run() {
               // RealData构建很慢,在单独的线程中进行
               RealDataXhj realDataXhj = new RealDataXhj(queryStr);
               // setRealData的时候会notify等待这个future上的对阿宁
               future.setRealDataXhj(realDataXhj);
           }
       }.start();
       System.out.println(future.getResult());
       // FutureData会立即返回,不会等待RealDataXhj被构造完
       return future;
   }
}

public class Mainxhj {
    public static void main(String[] args) {
        Client client = new Client();
        //这里会立即返回,因为得到的是FutureData而不是RealData
        Dataxhj dataxhj = client.request("xhjname");
        System.out.println("请求完毕");
        try {
            //这里可以用一个sleep代替了对其他业务逻辑的处理
            //在处理这些业务逻辑的过程中,RealData被创建,从而充分利用了等待时间
            Thread.sleep(2000);
        } catch (InterruptedException e) {
//            e.printStackTrace();
        }
        //使用真实的数据,如果到这里数据还没有准备好,getResult()会等待数据准备完,再返回
        System.out.println("数据 = " + dataxhj.getResult());
    }
}
问题:2022/5/3
输出结果 dataxhj.getResult显示内容为空。
个人理解输出结果是:数据 = xhjname
解决:RealData的构造方法并没有使用传入参数para,new StringBuilder(para) 改为这种形式就好了

java中的Future模式

JDK内部有一个Future接口,类似之前案例中的订单。
常用方法:

方法名说明
boolean cannel(boolean manInterruptIfRunning)尝试取消执行此任务
V get()等待计算完成,然后检索其结果
V get(long timeout,TimeUnit unit)如果需要等待最多在给定的时间计算完成,然后索引其结果
boolean isCancelled()如果此任务在正常完成之前被取消,返回true
boolean isDone()如果此任务完成,则返回true

案例:2022/5/3不是很理解

package offer2.Test53.tast1;

// 创建Data
public interface Dataxhj {
    // 抽象类
    public String getResult();
}

// 创建继承自Dataxhj接口的真实实现类
public class RealDataXhj implements Dataxhj, Callable {
    // protected受保护的变量;
    protected final String result;
    // 带参构造方法
    public RealDataXhj(String para){
        StringBuffer sb = new StringBuffer(para);
        // 假设这里很慢,构造一个RealDataXhj不是一个容易的事
        result = sb.toString();
    }
    @Override
    public String getResult() {
        return result;
    }

    @Override
    public Object call() throws Exception {
        return result;
    }
}

public class Test2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个固定长度的线程池
        ExecutorService executor = Executors.newFixedThreadPool(1);
        final Future<?> future = executor.submit((Callable) new RealDataXhj("xhj"));
        System.out.println("请求完毕,数据准备中");
        try {
            // 仍旧可以做额外的数据操作,这是使用sleep代替
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("数据 = " + future.get());
    }
}

happens-before

happens-before是JMM最核心的概念。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-05-04 07:25:22  更:2022-05-04 07:25:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 0:40:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码