业务流程长,通过异步操作提升返回时间 首先确定流程是可以异步的,在进行一些业务检查和补偿确定异步是成功的 或者记录好日志 也方便排查
首先创建一个线城池的管理service
package com.reformer.invoice.service;
public interface ThreadPoolManagers<T> {
void execute(T commonService, String message);
int getMaxThreadQueueSize();
int getCorePoolSize();
int getMaxPoolSize();
int getKeepAliveTime();
int getMaxCacheQueueSize();
boolean getOffline();
}
package com.reformer.invoice.service.impl;
import com.reformer.invoice.service.CommonService;
import com.reformer.invoice.service.ThreadPoolManagers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class ThreadPoolManagersImpl<T> implements ThreadPoolManagers<T> {
private volatile LinkedBlockingQueue<Runnable> cacheLinkedQueue;
private final int MAX_THREAD_QUEUE_SIZE = 1000;
private final int CORE_POOL_SIZE = 20;
private final int MAX_POOL_SIZE = 40;
private final int KEEP_ALIVE_TIME = 0;
private final int MAX_CACHE_QUEUE_SIZE = 10000;
private volatile ThreadPoolExecutor threadPoolExecutor;
private volatile static boolean offline = false;
public ThreadPoolManagersImpl() {
getCacheQueue();
getExecutorService();
}
@Override
public void execute(T commonService, String message) {
int size = getExecutorService().getQueue().size();
log.info("线程池queue,队列size :" + size);
if (offline) {
addQueue((CommonService) commonService, message);
log.info("缓存,队列开始缓存,size:{} ", cacheLinkedQueue.size());
if (size == 0) {
log.info("开始恢复");
new OfflineResumeThread().start();
}
return;
}
if (size >= MAX_THREAD_QUEUE_SIZE) {
setOffline(true);
return;
}
getExecutorService().submit(addTask((CommonService) commonService, message));
}
private void addQueue(CommonService commonService, String message) {
getCacheQueue().add(addTask(commonService, message));
}
private Runnable addTask(CommonService commonService, String message) {
return new Runnable() {
@Override
public void run() {
commonService.process(message);
}
};
}
private ThreadPoolExecutor getExecutorService() {
if (threadPoolExecutor == null) {
synchronized (ThreadPoolManagersImpl.class) {
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE
, MAX_POOL_SIZE
, KEEP_ALIVE_TIME
, TimeUnit.MILLISECONDS
, new LinkedBlockingQueue<>(MAX_THREAD_QUEUE_SIZE)
, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("线程被拒绝掉,核心线程队列容量{}", executor.getQueue().size());
}
}
);
}
}
}
return threadPoolExecutor;
}
private LinkedBlockingQueue getCacheQueue() {
if (cacheLinkedQueue == null) {
synchronized (ThreadPoolManagersImpl.class) {
if (cacheLinkedQueue == null) {
cacheLinkedQueue = new LinkedBlockingQueue<>(MAX_CACHE_QUEUE_SIZE);
}
}
}
return cacheLinkedQueue;
}
class OfflineResumeThread extends Thread {
@Override
public void run() {
while (true) {
if (getExecutorService().getQueue().size() >= MAX_THREAD_QUEUE_SIZE) {
continue;
}
Runnable runnable = null;
try {
runnable = cacheLinkedQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("移除 queue,size :{}", cacheLinkedQueue.size());
if (runnable != null) {
getExecutorService().submit(runnable);
}
if (cacheLinkedQueue.size() == 0) {
setOffline(false);
log.info("恢复完毕");
return;
}
}
}
}
@Override
public int getMaxThreadQueueSize() {
return MAX_THREAD_QUEUE_SIZE;
}
@Override
public int getCorePoolSize() {
return CORE_POOL_SIZE;
}
@Override
public int getMaxPoolSize() {
return MAX_POOL_SIZE;
}
@Override
public int getKeepAliveTime() {
return KEEP_ALIVE_TIME;
}
@Override
public int getMaxCacheQueueSize() {
return MAX_CACHE_QUEUE_SIZE;
}
@Override
public boolean getOffline() {
return offline;
}
private void setOffline(boolean offline) {
ThreadPoolManagersImpl.offline = offline;
}
}
CommonService 是我自己的业务实现类 ,以上就是操作就可以使程序性能提升
|