IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 通过队列和线程池提升任务的执行效率 -> 正文阅读

[Java知识库]通过队列和线程池提升任务的执行效率

业务流程长,通过异步操作提升返回时间
首先确定流程是可以异步的,在进行一些业务检查和补偿确定异步是成功的
或者记录好日志 也方便排查

首先创建一个线城池的管理service

package com.reformer.invoice.service;

public interface ThreadPoolManagers<T> {

    /**
     * 任务提交执行
     * @param commonService
     * @param message
     */
    void execute(T commonService, String message);
    /**
     * 最大队列容量
     */
    int getMaxThreadQueueSize();
    /**
     * 核心容量
     */
    int getCorePoolSize();
    /**
     * 最大容量
     */
    int getMaxPoolSize();
    /**
     * 存活时间
     */
    int getKeepAliveTime();
    /**
     * 最大缓存队列容量
     */
    int getMaxCacheQueueSize();
    /**
     * 标识
     */
    boolean getOffline();

}

package com.reformer.invoice.service.impl;

import com.reformer.invoice.service.CommonService;
import com.reformer.invoice.service.ThreadPoolManagers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;



@Slf4j
@Service
public class ThreadPoolManagersImpl<T> implements ThreadPoolManagers<T> {

    /**
     * 缓存队列
     */
    private volatile LinkedBlockingQueue<Runnable> cacheLinkedQueue;
    /**
     * 最大队列容量
     */
    private final int MAX_THREAD_QUEUE_SIZE = 1000;
    /**
     * 核心容量
     */
    private final int CORE_POOL_SIZE = 20;
    /**
     * 最大容量
     */
    private final int MAX_POOL_SIZE = 40;
    /**
     * 存活时间
     */
    private final int KEEP_ALIVE_TIME = 0;
    /**
     * 最大缓存队列容量
     */
    private final int MAX_CACHE_QUEUE_SIZE = 10000;
    /**
     * 线程池
     */
    private volatile ThreadPoolExecutor threadPoolExecutor;
    /**
     * 标识
     */
    private volatile static boolean offline = false;


    public ThreadPoolManagersImpl() {
        getCacheQueue();
        getExecutorService();
    }

    @Override
    public void execute(T commonService, String message) {
        int size = getExecutorService().getQueue().size();
        log.info("线程池queue,队列size :" + size);
        if (offline) {
            addQueue((CommonService) commonService, message);
            log.info("缓存,队列开始缓存,size:{} ", cacheLinkedQueue.size());
            if (size == 0) {
                log.info("开始恢复");
                new OfflineResumeThread().start();
            }
            return;
        }
        if (size >= MAX_THREAD_QUEUE_SIZE) {
            setOffline(true);
            return;
        }
        getExecutorService().submit(addTask((CommonService) commonService, message));
    }

    private void addQueue(CommonService commonService, String message) {

        getCacheQueue().add(addTask(commonService, message));

    }

    /**
     * 生成任务
     *
     * @param commonService
     * @param message
     * @return
     */
    private Runnable addTask(CommonService commonService, String message) {
        return new Runnable() {
            @Override
            public void run() {
                commonService.process(message);
            }
        };
    }


    private ThreadPoolExecutor getExecutorService() {
        if (threadPoolExecutor == null) {
            synchronized (ThreadPoolManagersImpl.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = new ThreadPoolExecutor(
                            CORE_POOL_SIZE
                            , MAX_POOL_SIZE
                            , KEEP_ALIVE_TIME
                            , TimeUnit.MILLISECONDS
                            , new LinkedBlockingQueue<>(MAX_THREAD_QUEUE_SIZE)
                            , new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            log.info("线程被拒绝掉,核心线程队列容量{}", executor.getQueue().size());
                            // 可以做一些补偿机制
                        }
                    }
                    );
                }
            }
        }
        return threadPoolExecutor;
    }

    private LinkedBlockingQueue getCacheQueue() {
        if (cacheLinkedQueue == null) {
            synchronized (ThreadPoolManagersImpl.class) {
                if (cacheLinkedQueue == null) {
                    cacheLinkedQueue = new LinkedBlockingQueue<>(MAX_CACHE_QUEUE_SIZE);
                }
            }
        }
        return cacheLinkedQueue;
    }


    class OfflineResumeThread extends Thread {
        @Override
        public void run() {
            while (true) {
                if (getExecutorService().getQueue().size() >= MAX_THREAD_QUEUE_SIZE) {
                    continue;
                }
                Runnable runnable = null;
                try {
                    runnable = cacheLinkedQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("移除 queue,size :{}", cacheLinkedQueue.size());
                if (runnable != null) {
                    getExecutorService().submit(runnable);
                }
                if (cacheLinkedQueue.size() == 0) {
                    setOffline(false);
                    log.info("恢复完毕");
                    return;
                }
            }
        }
    }


    @Override
    public int getMaxThreadQueueSize() {
        return MAX_THREAD_QUEUE_SIZE;
    }

    @Override
    public int getCorePoolSize() {
        return CORE_POOL_SIZE;
    }

    @Override
    public int getMaxPoolSize() {
        return MAX_POOL_SIZE;
    }

    @Override
    public int getKeepAliveTime() {
        return KEEP_ALIVE_TIME;
    }

    @Override
    public int getMaxCacheQueueSize() {
        return MAX_CACHE_QUEUE_SIZE;
    }

    @Override
    public boolean getOffline() {
        return offline;
    }

    private void setOffline(boolean offline) {
        ThreadPoolManagersImpl.offline = offline;
    }
}

CommonService 是我自己的业务实现类 ,以上就是操作就可以使程序性能提升

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-12 17:18:07  更:2022-03-12 17:21:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 8:34:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码