原文地址:https://blog.xaoxu.cn/archives/thread-pool-tool-package
为什么需要一个线程池工具类?
整个项目,用到线程执行任务的地方很多,不可能哪里用到就在那里直接 new 一个线程执行,这样资源得不到重复利用,一旦线程过多就会导致内存不足。
线程池的好处是什么?
使用线程池执行线程任务,当一个线程执行完成一个任务之后,线程资源回到线程池,资源得到重复利用。
线程池为什么使用自定义方式?
因为 java 自带线程池都会有可能造成内存不足的问题。自定义线程池,根据服务器配置定制线程池核心线程、最大线程等,是最好的方式。
我封装的线程池工具类有什么好处?
- 扩展性高
- 可注解形式实现执行
- 可根据业务需要注册不同的线程池,区分业务模块使用
- 可以执行无返回值线程任务,可以执行有返回值的线程任务。
代码实现
创建一个线程任务类
该类主要用来承接 Runnable 方法,和其他业务相关需要的参数。
package com.scaffolding.example.threads;
import com.scaffolding.example.threads.aop.Pooled;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* 需要执行的线程任务
*
* @author XiaoXuxuy
* @date 2022/2/20 14:32
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Worker<T> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
// 默认超时时间
private static final long DEFAULT_TIMEOUT = 500;
// 执行指令
private Runnable command;
// 返回结果
private Result<T> result = new Result<>();
// 超时
private long timeout;
// 策略
private Pooled.PoolOverAct poolOverAct = Pooled.PoolOverAct.REJECT;
// 预备执行时间
private volatile long prepareExecutionTime;
// 开始执行时间
private volatile long startExecutionTime;
// 结束执行时间
private volatile long endExecutionTime;
// 执行的线程池名称
private String executorName;
public Worker(Runnable command) {
this.command = command;
this.timeout = DEFAULT_TIMEOUT;
}
public Worker(Runnable command, Pooled.PoolOverAct poolOverAct) {
this.command = command;
this.timeout = DEFAULT_TIMEOUT;
this.poolOverAct = poolOverAct;
}
public Worker(Runnable command, T result) {
this.command = command;
this.result = new Result<>(result);
this.timeout = DEFAULT_TIMEOUT;
}
public Worker(Runnable command, T result, long timeout) {
this.command = command;
this.result = new Result<>(result);
this.timeout = timeout;
}
@Override
public void run() {
startExecution();
try {
command.run();
} finally {
endExecution();
}
}
/**
* 开始执行(预备执行耗时)
*/
private void startExecution() {
this.startExecutionTime = System.currentTimeMillis();
LOGGER.info("POOL_DISPATCH_TIME, EXECUTOR: {}, TIME: {} ms", this.executorName, this.getPrepareTime());
}
/**
* 结束执行(执行耗时)
*/
private void endExecution() {
this.endExecutionTime = System.currentTimeMillis();
LOGGER.info("POOL_EXECUTE_TIME, EXECUTOR: {}, TIME: {} ms", this.executorName, this.getExecutionTime());
}
/**
* 预备耗时
*
* @return
*/
public long getPrepareTime() {
return this.startExecutionTime - this.prepareExecutionTime;
}
/**
* 执行耗时
*
* @return
*/
public long getExecutionTime() {
return this.endExecutionTime - this.startExecutionTime;
}
/**
* callable执行线程
*
* @return
*/
public Callable<Result<T>> callable() {
return Executors.callable(command, result);
}
public void setResult(T result) {
if (null != this.result) {
this.result.value = result;
}
}
}
定义一个泛型结果类
承接具有执行结果的线程任务
package com.scaffolding.example.threads;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 线程结果
*
* @author XiaoXuxuy
* @date 2022/2/20 14:33
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Result<T> {
public T value;
}
定义线程池
package com.scaffolding.example.threads;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Data
public class TaskToolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskToolExecutor.class);
private static final int DEFAULT_CORE_SIZE = 20;
private static final int DEFAULT_MAX_SIZE = 50;
private static final long DEFAULT_ALIVE_TIME = 60;
private static final int DEFAULT_QUEUE_SIZE = 1024;
private ExecutorService pool;
private ThreadFactory threadFactory;
private BlockingQueue<Runnable> workQueue;
private int coreSize;
private int maxSize;
private long aliveTime;
private int queueSize;
private String name;
public void init() {
if (null == pool) {
if (null == workQueue) {
queueSize = queueSize > 0 ? queueSize : DEFAULT_QUEUE_SIZE;
workQueue = new LinkedBlockingQueue<>(queueSize);
}
if (null == threadFactory) {
threadFactory = TaskToolExecutor.defaultThreadFactory();
}
coreSize = coreSize > 0 ? coreSize : DEFAULT_CORE_SIZE;
maxSize = maxSize > 0 ? maxSize : DEFAULT_MAX_SIZE;
aliveTime = aliveTime > 0 ? aliveTime : DEFAULT_ALIVE_TIME;
pool = new ThreadPoolExecutor(coreSize, maxSize, aliveTime, TimeUnit.SECONDS, workQueue, threadFactory);
}
}
public void destroy() {
this.pool.shutdown();
}
public void execute(Worker<?> worker) {
try {
worker.setExecutorName(this.name);
worker.setPrepareExecutionTime(System.currentTimeMillis());
pool.execute(worker);
} catch (RejectedExecutionException e) {
dealWhenPoolFull(worker, e);
}
}
public <T> T submit(Worker<T> worker) {
try {
Future<Result<T>> future = pool.submit(worker.callable());
Result<T> result = future.get(worker.getTimeout(), TimeUnit.MILLISECONDS);
return result.value;
} catch (RejectedExecutionException e) {
LOGGER.error("Rejected worker: Perhaps thread pool is full!", e);
} catch (InterruptedException e) {
LOGGER.error("Interrupted worker:", e);
} catch (ExecutionException e) {
LOGGER.error("Attempting to retrieve the result of a task that aborted!", e);
} catch (TimeoutException e) {
LOGGER.error("Timeout worker: get result timeout", e);
}
return worker.getResult().value;
}
private void dealWhenPoolFull(Worker<?> worker, RejectedExecutionException e) {
switch (worker.getPoolOverAct()) {
case REJECT:
LOGGER.error("Rejected worker: Perhaps thread pool is full!", e);
break;
case RUN:
worker.run();
break;
case BLOCK:
try {
workQueue.put(worker);
} catch (InterruptedException interruptedException) {
LOGGER.error("queue put worker: Perhaps block queue is full!", e);
}
break;
case NEW_THREAD:
Thread newThreadOutOfPool = threadFactory.newThread(worker);
newThreadOutOfPool.setName("outOfPool-" + newThreadOutOfPool.getName());
newThreadOutOfPool.start();
break;
default:
LOGGER.error("Rejected worker: Perhaps thread pool is full!", e);
break;
}
}
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "taskTool-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
if (thread.isDaemon())
thread.setDaemon(false);
if (thread.getPriority() != Thread.NORM_PRIORITY)
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
public void setQueueSize(int queueSize) {
if (queueSize <= 0) {
this.queueSize = DEFAULT_QUEUE_SIZE;
} else {
this.queueSize = queueSize;
}
}
}
项目启动初始化不同业务线程池
package com.scaffolding.example.threads;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExecutorPoolConfig {
@Bean(initMethod = "init", destroyMethod = "destroy")
public TaskToolExecutor ciToolExecutor() {
TaskToolExecutor ciToolExecutor = new TaskToolExecutor();
ciToolExecutor.setName("ciToolExecutor");
ciToolExecutor.setCoreSize(15);
ciToolExecutor.setMaxSize(32);
ciToolExecutor.setQueueSize(1024);
return ciToolExecutor;
}
@Bean(initMethod = "init", destroyMethod = "destroy")
public TaskToolExecutor msgExecutor() {
TaskToolExecutor msgExecutor = new TaskToolExecutor();
msgExecutor.setName("msgExecutor");
msgExecutor.setCoreSize(15);
msgExecutor.setMaxSize(32);
msgExecutor.setQueueSize(1024);
return msgExecutor;
}
}
测试
测试类
我们建一个测试类,测试无结果线程执行和有结果线程执行。
package com.scaffolding.example.threads;
import com.scaffolding.example.threads.aop.Pooled;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
/**
* @author Admin
* @date 2022/2/20 15:16
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableAspectJAutoProxy(exposeProxy = true)
public class TaskToolExecutorTest extends TestCase {
@Autowired
private Map<String, TaskToolExecutor> executorMap;
/**
* 测试无返回结果线程
*/
@Test
public void testDispatchTask() {
TaskToolExecutor executor = executorMap.get("ciToolExecutor");
final CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
Worker<Object> worker = createTestWorker(i, countDownLatch);
executor.execute(worker);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有线程执行完毕!");
}
private Worker<Object> createTestWorker(int i, CountDownLatch countDownLatch) {
return new Worker<>(new Thread(() -> {
System.out.println("我是任务" + i);
countDownLatch.countDown();
}), Pooled.PoolOverAct.NEW_THREAD);
}
/**
* 测试有返回结果线程
*/
@Test
public void testHasResultTask() {
TaskToolExecutor executor = executorMap.get("ciToolExecutor");
final CountDownLatch countDownLatch = new CountDownLatch(1);
Worker<Object> worker = createHasResultTestWorker(countDownLatch);
Object result = executor.submit(worker);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程执行返回结果为: " + result);
}
private Worker<Object> createHasResultTestWorker(CountDownLatch countDownLatch) {
Worker<Object> worker = new Worker<>();
Runnable runnable = () -> {
int count = 1;
// 设置返回值
worker.setResult(count);
countDownLatch.countDown();
};
worker.setTimeout(500);
worker.setCommand(runnable);
return worker;
}
}
测试结果
无返回值情况:
2022-02-20 17:52:43 INFO POOL_DISPATCH_TIME, EXECUTOR: ciToolExecutor, TIME: 1 ms
2022-02-20 17:52:43 INFO POOL_DISPATCH_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms
2022-02-20 17:52:43 INFO POOL_DISPATCH_TIME, EXECUTOR: ciToolExecutor, TIME: 1 ms
我是任务1
我是任务0
我是任务2
2022-02-20 17:52:43 INFO POOL_EXECUTE_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms
2022-02-20 17:52:43 INFO POOL_EXECUTE_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms
所有线程执行完毕!
2022-02-20 17:52:43 INFO POOL_EXECUTE_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms
有返回值情况:
线程执行返回结果为: 1
注解方式实现
定义一个注解
package com.scaffolding.example.threads.aop;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Target({METHOD})
@Retention(RUNTIME)
public @interface Pooled {
boolean async() default true;
long timeout() default 500;
String executor() default "ciToolExecutor";
PoolOverAct poolOverAct() default PoolOverAct.REJECT;
enum PoolOverAct {
REJECT, BLOCK, RUN, NEW_THREAD;
}
}
定义一个Aop切面
package com.scaffolding.example.threads.aop;
import com.scaffolding.example.threads.TaskToolExecutor;
import com.scaffolding.example.threads.Worker;
import com.scaffolding.example.utils.PrimitiveUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author XiaoXuxuy
* @date 2022/2/20 15:35
*/
@Aspect
@Component
@Order(AopOrder.POOLED_INVOKER_ORDER)
public class PooledInvoker {
private static final Logger LOGGER = LoggerFactory.getLogger(PooledInvoker.class);
@Autowired
private Map<String, TaskToolExecutor> executorMap;
@Around("@annotation(com.scaffolding.example.threads.aop.Pooled) && @annotation(pooled)")
public Object around(final ProceedingJoinPoint pjp, Pooled pooled) {
TaskToolExecutor executor = getExecutor(pooled.executor());
Object result = null;
Worker<Object> worker = toWorker(pjp);
worker.setPoolOverAct(pooled.poolOverAct());
if (pooled.async()) {
executor.execute(worker);
} else {
worker.setTimeout(pooled.timeout());
result = executor.submit(worker);
}
if (null == result) {
Class<?> returnType = ((MethodSignature) pjp.getSignature()).getMethod().getReturnType();
if (returnType.isPrimitive()) {
return PrimitiveUtils.getPrimitiveDefaultValue(returnType);
}
}
return result;
}
private Worker<Object> toWorker(ProceedingJoinPoint pjp) {
final Worker<Object> worker = new Worker<>();
Runnable command = () -> {
try {
worker.setResult(pjp.proceed());
} catch (Throwable e) {
LOGGER.error("Error pooled execute:", e);
}
};
worker.setCommand(command);
return worker;
}
private TaskToolExecutor getExecutor(String poolType) {
return executorMap.get(poolType);
}
}
使用方式
只要在方法加入 @Pooled 注解即可利用线程池执行该方法。
@Pooled(executor = "msgExecutor", poolOverAct = Pooled.PoolOverAct.NEW_THREAD)
public void notifyEmail() {
System.out.println("我是通知线程");
}
附:Aop的拦截顺序定义
package com.scaffolding.example.threads.aop;
import org.springframework.core.Ordered;
/**
* Aop的拦截顺序定义
* POOLED_INVOKER_ORDER > DISTRIBUTED_LOCK_ORDER > 日志 > 事务
*
* @author XiaoXuxuy
* @date 2022/2/20 15:38
*/
public class AopOrder {
public static final int DISTRIBUTED_LOCK_ORDER = Ordered.HIGHEST_PRECEDENCE + 4;
public static final int POOLED_INVOKER_ORDER = Ordered.HIGHEST_PRECEDENCE + 2;
}
附:Class原始类型工具类
package com.scaffolding.example.utils;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class PrimitiveUtils {
private static final ConcurrentMap<String, Class<?>> NAME_CLASS_CACHE = new ConcurrentHashMap<String, Class<?>>();
public static final char JVM_VOID = 'V';
public static final char JVM_BOOLEAN = 'Z';
public static final char JVM_BYTE = 'B';
public static final char JVM_CHAR = 'C';
public static final char JVM_DOUBLE = 'D';
public static final char JVM_FLOAT = 'F';
public static final char JVM_INT = 'I';
public static final char JVM_LONG = 'J';
public static final char JVM_SHORT = 'S';
public static boolean isPrimitives(Class<?> cls) {
if (cls.isArray()) {
return isPrimitive(cls.getComponentType());
}
return isPrimitive(cls);
}
public static boolean isPrimitive(Class<?> cls) {
return cls.isPrimitive() || cls == String.class || cls == Boolean.class || cls == Character.class
|| Number.class.isAssignableFrom(cls) || Date.class.isAssignableFrom(cls);
}
public static boolean isPojo(Class<?> cls) {
return ! isPrimitives(cls)
&& ! Collection.class.isAssignableFrom(cls)
&& ! Map.class.isAssignableFrom(cls);
}
public static Class<?> name2class(String name) throws ClassNotFoundException
{
return name2class(getClassLoader(), name);
}
private static Class<?> name2class(ClassLoader cl, String name) throws ClassNotFoundException
{
int c = 0, index = name.indexOf('[');
if( index > 0 )
{
c = ( name.length() - index ) / 2;
name = name.substring(0, index);
}
if( c > 0 )
{
StringBuilder sb = new StringBuilder();
while( c-- > 0 )
sb.append("[");
if( "void".equals(name) ) sb.append(JVM_VOID);
else if( "boolean".equals(name) ) sb.append(JVM_BOOLEAN);
else if( "byte".equals(name) ) sb.append(JVM_BYTE);
else if( "char".equals(name) ) sb.append(JVM_CHAR);
else if( "double".equals(name) ) sb.append(JVM_DOUBLE);
else if( "float".equals(name) ) sb.append(JVM_FLOAT);
else if( "int".equals(name) ) sb.append(JVM_INT);
else if( "long".equals(name) ) sb.append(JVM_LONG);
else if( "short".equals(name) ) sb.append(JVM_SHORT);
else sb.append('L').append(name).append(';');
name = sb.toString();
} else {
if( "void".equals(name) ) return void.class;
else if( "boolean".equals(name) ) return boolean.class;
else if( "byte".equals(name) ) return byte.class;
else if( "char".equals(name) ) return char.class;
else if( "double".equals(name) ) return double.class;
else if( "float".equals(name) ) return float.class;
else if( "int".equals(name) ) return int.class;
else if( "long".equals(name) ) return long.class;
else if( "short".equals(name) ) return short.class;
}
if( cl == null ){
cl = getClassLoader();
}
Class<?> clazz = NAME_CLASS_CACHE.get(name);
if(clazz == null){
clazz = Class.forName(name, true, cl);
NAME_CLASS_CACHE.put(name, clazz);
}
return clazz;
}
public static ClassLoader getClassLoader(){
return getClassLoader(PrimitiveUtils.class);
}
public static ClassLoader getClassLoader(Class<?> cls) {
ClassLoader cl = null;
try {
cl = Thread.currentThread().getContextClassLoader();
} catch (Throwable ex) {
}
if (cl == null) {
cl = cls.getClassLoader();
}
return cl;
}
private static final Map<String, PrimitiveInfo<?>> PRIMITIVES = new HashMap<String, PrimitiveInfo<?>>();
static {
addPrimitive(boolean.class, "Z", Boolean.class, "booleanValue", false);
addPrimitive(short.class, "S", Short.class, "shortValue", (short) 0);
addPrimitive(int.class, "I", Integer.class, "intValue", 0);
addPrimitive(long.class, "J", Long.class, "longValue", 0L);
addPrimitive(float.class, "F", Float.class, "floatValue", 0F);
addPrimitive(double.class, "D", Double.class, "doubleValue", 0D);
addPrimitive(char.class, "C", Character.class, "charValue", '\0');
addPrimitive(byte.class, "B", Byte.class, "byteValue", (byte) 0);
addPrimitive(void.class, "V", Void.class, null, null);
}
private static <T> void addPrimitive(Class<T> type, String typeCode, Class<T> wrapperType, String unwrapMethod,
T defaultValue) {
PrimitiveInfo<T> info = new PrimitiveInfo<T>(type, typeCode, wrapperType, unwrapMethod, defaultValue);
PRIMITIVES.put(type.getName(), info);
PRIMITIVES.put(wrapperType.getName(), info);
}
@SuppressWarnings("unused")
private static class PrimitiveInfo<T> {
final Class<T> type;
final String typeCode;
final Class<T> wrapperType;
final String unwrapMethod;
final T defaultValue;
public PrimitiveInfo(Class<T> type, String typeCode, Class<T> wrapperType, String unwrapMethod, T defaultValue) {
this.type = type;
this.typeCode = typeCode;
this.wrapperType = wrapperType;
this.unwrapMethod = unwrapMethod;
this.defaultValue = defaultValue;
}
}
@SuppressWarnings("unchecked")
public static <T> Class<T> getWrapperTypeIfPrimitive(Class<T> type) {
if (type.isPrimitive()) {
return ((PrimitiveInfo<T>) PRIMITIVES.get(type.getName())).wrapperType;
}
return type;
}
@SuppressWarnings("unchecked")
public static <T> T getPrimitiveDefaultValue(Class<T> type) {
PrimitiveInfo<T> info = (PrimitiveInfo<T>) PRIMITIVES.get(type.getName());
if (info != null) {
return info.defaultValue;
}
return null;
}
}
|