IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 大量mapper IO优化(使用多线程异步+CountDownLatch) -> 正文阅读

[Java知识库]大量mapper IO优化(使用多线程异步+CountDownLatch)

实战业务优化方案总结—主目录https://blog.csdn.net/grd_java/article/details/124346685

经过测试,优化前的接口需要20多秒才能返回结果,优化后一般只需要5秒左右,如果查询频繁会变成500ms左右(因为有缓存)。

  1. 我的场景是,单个查询接口,涉及大量IO操作,无法减少IO的数量。
  2. 每个IO都是去数据库查询IO,不考虑优化sql,使用其它方法优化接口响应速度。

当查询大报表,并且sql没有太大优化空间的情况下,该如何提升效率呢?

  1. 也就是说现在IO太多,但是也没办法,业务确实需要这么多的IO
  2. 单个IO的效率优化,已经没有太多优化空间了,也就是优化sql的方案不可取
  3. 那么问题就是如何让这些IO能快一点

一般这种情况,就可以考虑多线程了,而且大的报表一般是不用考虑高并发的。所以直接将内容都写到方法中,不定义到类中,就没有线程安全问题。因为没有高并发,也不用考虑栈溢出的问题(每个线程执行都会有一个,线程私有,生命周期与线程相同,是Java方法执行的线程内存模型,每个方法被执行时,Java虚拟机都会同步创建一个栈帧(Stack Frame)用于存储局部变量表、操作数栈、动态连接、方法出口等信息)

当然,用到多线程,就需要考虑很多线程同步问题,而且异步执行IO,如何最终整合结果,都是需要解决的问题

我采用的方案如下

  1. 使用submit()方法进行异步提交
  2. 使用Callable接口的call方法,不使用Runnable接口的run。习惯了,需要返回值的场景,我都统一用Callable,用其它的也行。
  3. Callable接口的call方法可以给我们一个Future类型的返回值(不是只有它可以给Future返回值),我们可以通过这个Future获取线程中代码的返回值
  4. 使用JUC工具类中的倒数门栓CountDownLatch来做线程同步
  5. 为了不重复的写线程代码,编写统一工具类,通过反射来进行代码的执行

想要实现的效果如下(我希望和原来直接调用mapper接口一样简单,并且还能将单线程同步IO,实现为多线程异步IO。)

  1. 没改之前的代码,直接调用mapper接口,但是只有执行完第一个接口,才能执行第二个,以此类推。
List<String> provinces = cmsReportMapper.querySaleProvince();
List<ConfirmationsDTO> confirmationsDTOS = cmsReportMapper.queryConfirmationsByMonth(currentYear);
List<ConfirmationsDTO> confirmationsDTOSTotal = cmsReportMapper.queryConfirmationsTotal(currentYear);
  1. 改之后的,可以发现,对于调用mapper几乎是一样的,只不过需要额外传输反射需要的东西和CountDownLatch倒数门栓对象。不过实现了异步,这些mapper将一起异步执行。
Future<Object> provincesFuture = statementThreadUtils.runOneIOLatchCountDown(cmsReportMapper, latch, "querySaleProvince", null);
Future<Object> confirmationsDTOSFuture = statementThreadUtils.runOneIOLatchCountDown(cmsReportMapper, latch, "queryConfirmationsByMonth", oneStringClass, currentYear);
Future<Object> confirmationsDTOSTotalFuture =statementThreadUtils.runOneIOLatchCountDown(cmsReportMapper,latch,"queryConfirmationsTotal",oneStringClass,currentYear);
  1. 完整使用的形式,可见实现起来还是很方便的。让所有IO都异步执行了。
/**
     * 用给定的初始值,创建一个新的线程池 =======>>>>>>可使用全局配置的线程池,推荐定义处理IO密集型的专用线程池
     * @param corePoolSize 核心线程数量
     * @param maximumPoolSize 最大线程数量 (线程等待时间与线程CPU时间之比 + 1)* CPU数目
     * @param keepAliveTime 当线程数大于核心线程数量时,空闲的线程可生存的时间
     * @param unit 时间单位
     * @param workQueue 任务队列,只能包含由execute提交的Runnable任务
     * @param threadFactory 工厂,用于创建线程给线程池调度的工厂,可以自定义
     * @param handler 拒绝策略(可以自定义,JDK默认提供4种),当线程边界和队列容量已经满了,新来线程被阻塞时使用的处理程序
     */
    //《Java并发编程实战》 IO 密集型计算场景;;;;线程数 = CPU 核心数 * (1 + IO 耗时/ CPU 耗时)
    //此系统,IO耗时为400,CPU耗时为4 ===== 101 * 核心数 ===== 101*8
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,808,10,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(50),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
    /**20220421-yinzhipeng=====>>>>>>>>>优化响应速度 ===>> 优化前 800ms ====>>>> 优化后 200ms**/
        @Override
    public List<ConfirmationsDTO> queryConfirmationsBySaleProvince(String currentYear) {
        LinkedList<ConfirmationsDTO> result  = new LinkedList<>();

        try{
            //倒数门栓,用于同步线程
            final CountDownLatch latch = new CountDownLatch(3);
            //报表线程工具类
            StatementThreadUtils statementThreadUtils = new StatementThreadUtils(threadPoolExecutor);
            //用于反射,代表要调用的Mapper接口,参数列表为一个String
            Class[] oneStringClass = {String.class};

//            List<String> provinces = cmsReportMapper.querySaleProvince();
//            provinces.add("其他");
//            List<ConfirmationsDTO> confirmationsDTOS = cmsReportMapper.queryConfirmationsByMonth(currentYear);
//            List<ConfirmationsDTO> confirmationsDTOSTotal = cmsReportMapper.queryConfirmationsTotal(currentYear);
            Future<Object> provincesFuture = statementThreadUtils.runOneIOLatchCountDown(cmsReportMapper, latch, "querySaleProvince", null);
            Future<Object> confirmationsDTOSFuture = statementThreadUtils.runOneIOLatchCountDown(cmsReportMapper, latch, "queryConfirmationsByMonth", oneStringClass, currentYear);
            Future<Object> confirmationsDTOSTotalFuture =statementThreadUtils.runOneIOLatchCountDown(cmsReportMapper,latch,"queryConfirmationsTotal",oneStringClass,currentYear);
            //=================latch.await();=========================//
            latch.await();//等待所有异步都完成,倒数门栓的个数count=0,执行后面代码
            List<String> provinces = (List<String>)provincesFuture.get();
            provinces.add("其他");
            List<ConfirmationsDTO> confirmationsDTOS = (List<ConfirmationsDTO>)confirmationsDTOSFuture.get();
            List<ConfirmationsDTO> confirmationsDTOSTotal =(List<ConfirmationsDTO>)confirmationsDTOSTotalFuture.get();
            //==================latch.await();========================//
            //逻辑处理代码几百行,略。。。。。。。。。。。。。。。。。
            provinces.forEach(province -> {}
                
            result.addFirst(confirmation);
            return result;
        }catch (Exception e){
            e.printStackTrace();
            log.error("cms确认书件数统计表SystemError-->"+e.getMessage());
            return result;
        }


    }

具体工具类是如何封装的呢?就是简单的异步代码,用反射调用了mapper接口
在这里插入图片描述

当然,还提供了批量处理的工具方法,使用效果如下
在这里插入图片描述

String minSignatureDate = cmsReportMapper.queryMinSignatureDate();
String currentDate = DateUtils.currentDate();
if(StringUtils.isEmpty(endTime)){
    endTime = currentDate;
}
if(StringUtils.isEmpty(startTime)){
    startTime = minSignatureDate;
}

//倒数门栓,用于同步线程
final CountDownLatch latch = new CountDownLatch(9);
//报表线程工具类
StatementThreadUtils statementThreadUtils = new StatementThreadUtils(threadPoolExecutor);
//要调用的接口的参数表和参数,重复率很高,所以统一指定
Class[] threeString = {String.class, String.class, String.class};
Class[] twoString = {String.class, String.class};
Object[] threeArgs = {null, startTime, endTime};
Object[] twoArgs = {startTime, endTime};

//2021特殊企划件保费省份分布
List<PremiumProvinceDTO> areaSpecialPremium2021 = new ArrayList<>();
//2021延迟申请件保费省份分布
List<PremiumProvinceDTO> areaDelayPremium2021 = new ArrayList<>();

//2021特殊企划件趸交、3年期、五年期、10年期保费省份分布  目前只有三年期和十年期
List<ConfirmationPremiumDTO> specialPlanPremiumYearByTime = new ArrayList<>();
//2021延迟申请件趸交、3年期、五年期、10年期保费省份分布
List<ConfirmationPremiumDTO> delayPremiumYearByTime = new ArrayList<>();
//===============批量异步=====================下面是上面注释的异步submit代码的替换新式,所需代码量更少==============================================//
//要调用的mapper接口,和方法
Object[] mappers = {
        cmsReportMapper, cmsReportPremiumMapper, cmsReportPremiumMapper,
        cmsReportPremiumMapper, cmsReportPremiumMapper, cmsReportPremiumMapper,
        cmsReportPremiumMapper, cmsReportPremiumMapper, cmsReportPremiumMapper};
String[] mapperMethodNames = {
        "querySaleProvince", "queryAreaNormalPremiumTotal", "queryAreaSpecialPremium",
        "queryNormalPremiumYearByTime", "queryAreaSpecialPremium2021", "querySpecialPremiumYearByTime",
        "querySpecialPlanPremiumYearByTime", "queryAreaDelayPremium2021", "queryDelayPremiumYearByTime"};
//要调用的mapper对应方法的参数表
Class[][] methodArgsArr = {
        null, threeString, threeString,
        twoString, twoString, twoString,
        twoString, twoString, twoString};
//要调用mapper的方法实际要传的参数
Object[][] argsArr = {
        null, threeArgs, threeArgs,
        twoArgs, twoArgs, twoArgs,
        twoArgs, twoArgs, twoArgs};
//每个接口异步执行完成后,对应返回结果Future的key===可以不指定,默认用接口方法名作为key
String[] keys = {
        "provincesFuture", "areaNormalPremiumTotalFuture", "areaSpecialPremiumFuture",
        "normalPremiumYearByTimeFuture", "areaSpecialPremium2021Future", "specialPremiumYearByTimeFuture",
        "specialPlanPremiumYearByTimeFuture", "areaDelayPremium2021Future", "delayPremiumYearByTimeFuture"};
//进行批量异步
HashMap<String, Future> stringFutureHashMap =
        statementThreadUtils.runOneIOLatchCountDownMap(keys, mappers, latch, mapperMethodNames, methodArgsArr, argsArr);
//===============latch.await();=================//
latch.await();
List<String> provinces = (List<String>)stringFutureHashMap.get("provincesFuture").get();
provinces.add("其他");
//正常件保费省份分布
List<PremiumProvinceDTO> areaNormalPremiumTotal = (List<PremiumProvinceDTO>)stringFutureHashMap.get("areaNormalPremiumTotalFuture").get();
//特殊件(刨除2021年)保费省份分布
List<PremiumProvinceDTO> areaSpecialPremium = (List<PremiumProvinceDTO>)stringFutureHashMap.get("areaSpecialPremiumFuture").get();
//正常件保费 趸交、3年期、五年期、10年期
List<ConfirmationPremiumDTO> normalPremiumYearByTime = (List<ConfirmationPremiumDTO>)stringFutureHashMap.get("normalPremiumYearByTimeFuture").get();
//特殊件保费(刨除2021年) 趸交、3年期、五年期、10年期querySpecialPremiumYearByTime
List<ConfirmationPremiumDTO> specialPremiumYearByTime = (List<ConfirmationPremiumDTO>)stringFutureHashMap.get("specialPremiumYearByTimeFuture").get();
//如果endTime and startTime 均大于2021-12-31
areaSpecialPremium2021.addAll((List<PremiumProvinceDTO>)stringFutureHashMap.get("areaSpecialPremium2021Future").get());
areaDelayPremium2021.addAll((List<PremiumProvinceDTO>)stringFutureHashMap.get("areaDelayPremium2021Future").get());
specialPlanPremiumYearByTime.addAll((List<ConfirmationPremiumDTO> )stringFutureHashMap.get("specialPlanPremiumYearByTimeFuture").get());
delayPremiumYearByTime.addAll((List<ConfirmationPremiumDTO>)stringFutureHashMap.get("delayPremiumYearByTimeFuture").get());
//================latch.await();================//

工具类的源码如下

import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.*;
/**
 * 20220421==>>>yinzhipeng
 * 报表专用线程工具类
 * 主要用于优化 IO 密集型计算场景
 */
public class StatementThreadUtils {
    ThreadPoolExecutor threadPoolExecutor = null;
    public StatementThreadUtils(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    /**
     * 一个用倒数门栓的IO执行(一个异步线程中,只负责一个IO,需要花费一个门栓)
     * @param o 要执行IO的对象
     * @param latch 倒数门栓
     * @param method 要执行的io方法
     * @param methodArgs 要执行的io方法的参数,没有就是null
     * @param args 方法的参数
     */
    public Future<Object> runOneIOLatchCountDown(Object o,CountDownLatch latch, String method,Class[] methodArgs,Object... args){
        Future<Object> submit = threadPoolExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                Object result = null;
                try{
                    Method method1 = o.getClass().getMethod(method,methodArgs);
                    result = method1.invoke(o, args);
//                    return result;
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                    return result;
                }
            }
        });
        return submit;
    }
    /**
     * 多个用倒数门栓的IO执行(就是一个异步中,执行多个IO操作,多个IO只用一个门栓count-1和一个线程)
     * @param objs 要执行IO的对象
     * @param latch 倒数门栓
     * @param methods 要执行的io方法
     * @param methodArgsArr 要执行的io方法的参数,没有就是null
     * @param argsArr 方法的参数
     * @return
     */
    public Future<Object> runManyIOLatchCountDown(Object[] objs,CountDownLatch latch, String[] methods,Class[][] methodArgsArr,Object[][] argsArr){
        Future<Object> submit = threadPoolExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ArrayList<Object> results = new ArrayList<>();
                try{
                    for (int i = 0; i < objs.length; i++) {
                        Object result = null;
                        Method method1 = objs[i].getClass().getMethod(methods[i],methodArgsArr[i]);
                        result = method1.invoke(objs[i], argsArr[i]);
                        results.add(result);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                    return results;
                }
            }
        });
        return submit;
    }
    /**
     * 一个用倒数门栓的IO执行-----批量---返回List(同样是一个线程异步一个IO,只不过这个是批量的,可以用数组把需要异步的IO都传输过来)
     * @param objs 要执行IO的对象
     * @param latch 倒数门栓
     * @param methods 要执行的io方法
     * @param methodArgsArr 要执行的io方法的参数,没有就是null
     * @param argsArr 方法的参数
     * @return ArrayList<Future>
     */
    public ArrayList<Future> runOneIOLatchCountDownList(Object[] objs,CountDownLatch latch, String[] methods,Class[][] methodArgsArr,Object[][] argsArr){
        ArrayList<Future> futures = new ArrayList<>();
        for (int i = 0; i < objs.length; i++) {
            int finalI = i;
            Future<Object> submit = threadPoolExecutor.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    Object result = null;
                    try{
                        Method method1 = objs[finalI].getClass().getMethod(methods[finalI],methodArgsArr[finalI]);
                        result = method1.invoke(objs[finalI], argsArr[finalI]);
//                    return result;
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        latch.countDown();
                        return result;
                    }
                }
            });
            futures.add(submit);
        }
        return futures;
    }
    /**
     * 一个用倒数门栓的IO执行-----批量---返回Map
     * @param objs 要执行IO的对象
     * @param latch 倒数门栓
     * @param methods 要执行的io方法
     * @param methodArgsArr 要执行的io方法的参数,没有就是null
     * @param argsArr 方法的参数
     * @return HashMap<String, Future>
     */
    public HashMap<String, Future> runOneIOLatchCountDownMap(Object[] objs,CountDownLatch latch, String[] methods,Class[][] methodArgsArr,Object[][] argsArr){
        return runOneIOLatchCountDownMap(methods,objs,latch,methods,methodArgsArr,argsArr);
    }
    /**
     * 一个用倒数门栓的IO执行-----批量---返回Map----指定key版
     * @param keys 异步执行后,保存到Map中的key。
     * @param objs 要执行IO的对象
     * @param latch 倒数门栓
     * @param methods 要执行的io方法
     * @param methodArgsArr 要执行的io方法的参数,没有就是null
     * @param argsArr 方法的参数
     * @return HashMap<String, Future>
     */
    public HashMap<String, Future> runOneIOLatchCountDownMap(String[] keys,Object[] objs,CountDownLatch latch, String[] methods,Class[][] methodArgsArr,Object[][] argsArr){
        HashMap<String, Future> futuresMap = new HashMap<>();
        for (int i = 0; i < objs.length; i++) {
            int finalI = i;
            Future<Object> submit = threadPoolExecutor.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    Object result = null;
                    try{
                        Method method1 = objs[finalI].getClass().getMethod(methods[finalI],methodArgsArr[finalI]);
                        result = method1.invoke(objs[finalI], argsArr[finalI]);
//                    return result;
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        latch.countDown();
                        return result;
                    }
                }
            });
            futuresMap.put(keys[finalI],submit);
        }
        return futuresMap;
    }

    /**
     * 获得真正的处理对象,可能多层代理.
     */
    public static <T> T realTarget(Object target) {
        if (Proxy.isProxyClass(target.getClass())) {
            MetaObject metaObject = SystemMetaObject.forObject(target);
            return realTarget(metaObject.getValue("h"));
        }
        return (T) target;
    }
}
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-23 10:43:02  更:2022-04-23 10:46:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 4:23:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码