IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 2021-09-02 -> 正文阅读

[Java知识库]2021-09-02

@hello,很久没更了,最近上班太忙了,还有楼主太懒。。。。今天给大家分享一篇多线程批量处理数据库数据的解耦解决方案。。文章有点糙。。望谅解。

多线程批量处理任务(解耦,提高效率,保证事务一致性)

知识点

1.aop
2.线程池
3.线程通信
4.事务

案例:批量导入更新数据库数据,当用户导入上千条时,由于业务复杂,最终的执行sql相当于数据条数的十倍以上,更新时长超过80s最终超时异常。

我的优化方案(有更好的方案分享给博主哟,欢迎指正):
1-异步处理,提示用户稍候查看,后台处理;
2-同步处理,并行处理,更新脚本拼接,只做一次数据库链接更新,减少创建链接销毁链接的开销。因为特殊业务场景,不允许第一种方案,我选择了第二种,最终实现的效果由80s降低到10s以内。
整体思路:
		1.拦截批量处理方法
	    2.在环绕通知中,调用任务处理方法mainThreadRun();
	    3.通过BatchDealWithAnt注解的参数index,找到批量处理集合
	    4.通过BatchDealWithAnt注解的参数minMath,判断是否需要对方法中批量处理的list集合进行切割,不需要则直接调用proceed方法执行放行。
		5.如果满足批量处理条件,根据cpu核心数确定任务执行线程数nThreads。
		6.创建线程池,事务回滚对象BatchWithRollBack(这里最好用对象),声明主线程计数器(1)、子线程计数器(nThreads)
		7.拆分任务,组装参数
		8.自定义线程对象,实现Callable接口,重写call()方法,在call()方法中调用proceed方法,并传入切割过后的参数,返回值类型可以根据业务自定
		9.将线程任务提交给线程池
		10.任务完全提交后,主线程调用子线程计数器的await方法等待所有子线程完成任务
		11.子线程完成后,调用子线程计数器的countDown方法,计数-1,然后调用主线程计数器的await方法,等待主线程放行
		12.子线程全部执行完成,主线程根据await()方法的返回值判断是否回滚,如果回滚,BatchWithRollBack对象设置为ture,并执行主线程计数器的countDown方法放行
		13.此时子线程放行,判断回滚对象BatchWithRollBack是否回滚,如果是,则抛出异常
		最终结果:任何一个子线程处理失败,全部回滚,当且仅当所有线程执行成功,事务才全部提交
		注意:子任务必须加入事务,主线程等待所有子任务完成时间不能太长,否则导致线程占用资源时间过长,数据库锁住时间过长!

第一步,自定义注解、拦截器

//自定义注解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface BatchDealWithAnt {

    /**
     * 超时时间/秒(超过时间抛出异常,默认5s)
     **/
    int timeout() default 5;

    /**
     * 超过多少条数据多线程执行
     **/
    int minMath() default 200;

    /**
     * 需要批量处理的参数大型集合下标,默认为0
     **/
    int index() default 0;
}

//自定义拦截器

@Aspect
@Component
@Slf4j
public class BatchDealWithAspectj {

    /**
     * 批量处理子任务接口
     *
     **/
    @Resource
    private BatchWithService batchWithService;


    /**
     * 批量处理拦截切入标识
     **/
    @Pointcut("@annotation(com....BatchDealWithAnt)")
    public void batchDealWithPointCut(){

    }

    /**
     * @Description: 方法环绕
     * @Author: ChenGang
     * @Date: 2021/8/31 15:45
     **/
    @Around(value = "batchDealWithPointCut()")
    public Object batchDealWithAround(ProceedingJoinPoint joinPoint){
        //声明返回值
        Object res = null;
        //获取方法签名
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        //获取方法对象
        Method method = null;
        try {
            method = joinPoint.getTarget().getClass().getMethod(joinPoint.getSignature().getName(), methodSignature.getParameterTypes());
        } catch (NoSuchMethodException e) {
            throw new BaseException("=======批量处理异常!");
        }
        String methodName = method.getName();
        String className = joinPoint.getTarget().getClass().getSimpleName();
        BatchDealWithAnt batchDealWithAnt = method.getAnnotation(BatchDealWithAnt.class);
        try {
            //执行处理逻辑
            res =  mainThreadRun(joinPoint,batchDealWithAnt,className,methodName);

        } catch (Throwable throwable) {
            throwable.printStackTrace();
            log.error("=======批量处理:{}--{}异常!",className,methodName);
            throw new BaseException("=======批量处理:"+className + "--"+methodName+"异常!");
        }
        return res;
    }


    /**
     * @Description: 任务分配逻辑
     * @Author: ChenGang
     * @Date: 2021/8/31 15:40
     **/
    public Object mainThreadRun(ProceedingJoinPoint joinPoint,BatchDealWithAnt batchDealWithAnt,String className,String methodName) throws Throwable {
        Object res = null;
        //获取参数列表
        Object[] args = joinPoint.getArgs();
        Object tasks = args[batchDealWithAnt.index()];
        if (tasks instanceof List){
            List tasksList = (List) tasks;
            if (tasksList.size() < batchDealWithAnt.minMath()){
                log.info("=======批量处理:{}--{}低于处理阈值,正常执行......",className,methodName);
                res = joinPoint.proceed(args);
                return res;
            }
            if (batchDealWithAnt.timeout() > 30){
                log.info("=======批量处理:{}--{}超过最大等待时间,正常执行......",className,methodName);
                res = joinPoint.proceed(args);
                return res;
            }

            log.info("=======批量处理:为{}--{}高于处理阈值,预分配任务中......",className,methodName);

            //每条线程最小处理任务数
            int perThreadHandleCount = 1;
            //线程池的默认最大线程数
            int cupNum = Runtime.getRuntime().availableProcessors();
            int nThreads = (int)Math.floor(cupNum*0.7*2) ;
            int taskSize = tasksList.size();

            perThreadHandleCount = taskSize % nThreads == 0 ? taskSize / nThreads : taskSize / nThreads + 1;
            nThreads = taskSize % perThreadHandleCount == 0 ? taskSize / perThreadHandleCount : taskSize / perThreadHandleCount + 1;

            log.info("=======批量处理,共创建 {}个线程",nThreads);
            //监控主线程
            CountDownLatch mainLatch = new CountDownLatch(1);
            //监控子线程
            CountDownLatch threadLatch = new CountDownLatch(nThreads);
            //必须要使用对象,如果使用变量会造成线程之间不可共享变量值
            BatchWithRollBack rollBack = new BatchWithRollBack(false);
            //创建线程池
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("批量处理:" + className + " - " + methodName + "-pool-").build();
            ThreadPoolExecutor fixedThreadPool = new ThreadPoolExecutor(
                    nThreads,
                    nThreads,
                    batchDealWithAnt.timeout(),
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(nThreads),
                    threadFactory,
                    new ThreadPoolExecutor.AbortPolicy());
            List<Future<Boolean>> futures = Lists.newArrayList();
            //根据子线程执行结果判断是否需要回滚
            BlockingDeque<Boolean> resultList = new LinkedBlockingDeque<>(nThreads);
            //给每个线程分配任务
            for (int i = 0; i < nThreads; i++) {
                int lastIndex = (i + 1) * perThreadHandleCount;
                List subList = tasksList.subList(i * perThreadHandleCount, lastIndex >= taskSize ? taskSize : lastIndex);
                //组装子任务参数
                Object[] params = new Object[args.length];
                for (int j = 0; j < args.length; j++) {
                    if (j == batchDealWithAnt.index()){
                        params[batchDealWithAnt.index()] = subList;
                        continue;
                    }
                    params[j] = args[j];
                }
                //创建子任务,丢给线程池处理
                BatchWithCallable threadHandleTaskCallable = new BatchWithCallable(mainLatch, threadLatch, rollBack,joinPoint,params,resultList);
                Future<Boolean> future = fixedThreadPool.submit(threadHandleTaskCallable);
                futures.add(future);
            }
            try {
                //等待所有子线程执行完毕
                boolean await = threadLatch.await(batchDealWithAnt.timeout(), TimeUnit.SECONDS);
                //如果超时,直接回滚
                if (!await) {
                    log.debug("==========批量处理线程执行超时");
                    rollBack.setRollBack(true);
                } else {
                    log.info("批量处理,线程执行完毕,共 {}个线程",nThreads);
                    //查看执行情况,如果有存在需要回滚的线程,则全部回滚
                    for (int i = 0; i < nThreads; i++) {
                        Boolean result = resultList.take();
                        log.debug("==========子线程返回结果result: "+result);
                        if (!result) {
                            /** 有线程执行异常,需要回滚子线程. */
                            rollBack.setRollBack(true);
                        }
                    }
                }
            } catch (InterruptedException e) {
                log.error("等待所有子线程执行完毕时,出现异常");
                rollBack.setRollBack(true);
                e.printStackTrace();
                throw new MsgException("等待所有子线程执行完毕时,出现异常,整体回滚");
            } finally {
                //子线程再次开始执行
                mainLatch.countDown();
                log.info("关闭线程池,释放资源");
                fixedThreadPool.shutdown();
            }
            if (rollBack.getRollBack()){
                throw new BaseException("有线程执行异常,主线程回滚");
            }
            log.info("======>批量处理成功");
        } else {
            log.error("批量处理:{}--{}参数异常,正常执行......",className,methodName);
            res = joinPoint.proceed(args);
        }
        return res;
    }


/**
 * @Description: 批量处理多线程实现
 * @Author: ChenGang
 * @Date: 2021/8/31 15:25
 **/
class BatchWithCallable implements Callable<Boolean> {

    /**
     * 主线程监控
     */
    private CountDownLatch mainLatch;
    /**
     * 子线程监控
     */
    private CountDownLatch threadLatch;
    /**
     * 是否回滚
     */
    private BatchWithRollBack rollBack;


    /**
     * 切点对象,通过此对象执行方法
     */
    private ProceedingJoinPoint joinPoint;


    /**
     * 方法执行参数
     */
    private Object[] args;

    /**
     *
     * 子线程执行结果判断
     **/
    private BlockingDeque<Boolean> resultList;



    public BatchWithCallable(CountDownLatch mainLatch, CountDownLatch threadLatch, BatchWithRollBack rollBack,ProceedingJoinPoint joinPoint,Object[] args,BlockingDeque<Boolean> resultList) {
        this.mainLatch = mainLatch;
        this.threadLatch = threadLatch;
        this.rollBack = rollBack;
        this.joinPoint = joinPoint;
        this.args = args;
        this.resultList = resultList;
    }

    @Override
    public Boolean call() throws Exception {
        return batchWithService.childThreadRun(joinPoint,args,threadLatch,mainLatch,rollBack,resultList);
    }
}

}

第二步:子任务方法实现(这里事务做了全局配置,所以没加事务注解!)

public interface BatchWithService {

    /**
     * @Description:
     * @Author: ChenGang
     * @Date: 2021/8/31 17:49
     * @param joinPoint 需要批量处理的方法切入对象
     * @param args 执行参数
     * @param threadLatch 子线程计数器
     * @param mainLatch 主线程计数器
     * @param resultList 执行结果
     * @param rollBack 回滚对象
     **/
    Boolean childThreadRun(ProceedingJoinPoint joinPoint, Object[] args, CountDownLatch threadLatch, CountDownLatch mainLatch, BatchWithRollBack rollBack, BlockingDeque<Boolean> resultList);
}

@Service("batchWithService")
@Slf4j
public class BatchWithServiceImpl implements BatchWithService{

    @Override
    public Boolean childThreadRun(ProceedingJoinPoint joinPoint, Object[] args, CountDownLatch threadLatch, CountDownLatch mainLatch, BatchWithRollBack rollBack, BlockingDeque<Boolean> resultList) {
        log.info("=====线程:"+Thread.currentThread().getName()+",开始执行====");
        long startTime = System.currentTimeMillis();
        Boolean result = false;
        try {
            joinPoint.proceed(args);
            result = true;
        } catch (Throwable throwable) {
            log.error("=====线程{}执行任务出现异常,等待主线程通知是否回滚,异常信息:{}",Thread.currentThread().getName(),throwable.getMessage());
            throwable.printStackTrace();
        }
        resultList.add(result);
        threadLatch.countDown();
        log.info("=====线程{},计算过程已经结束,等待主线程通知是否需要回滚====",Thread.currentThread().getName());
        long endTime = System.currentTimeMillis();
        long sec = (endTime - startTime)/1000;
        log.info("=====线程{},计算时间秒:{} ====",Thread.currentThread().getName(),sec);
        try {
            mainLatch.await();
            log.info("=====子线程{}再次启动",Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new MsgException("批量处理,线程InterruptedException异常");
        }

        if (rollBack.getRollBack()) {
            log.info("=====线程{},线程回滚====",Thread.currentThread().getName());
            throw new MsgException("批量处理,线程回滚");
        }
        log.info("=====线程{},线程退出====",Thread.currentThread().getName());
        return result;
    }
}

第三步:业务层批量处理方法使用BatchDealWithAnt注解即可,根据实际情况传入参数,楼主经验有限,如有不对,欢迎指正!

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-09-03 11:46:23  更:2021-09-03 11:47:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:07:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码