1.线程池配置
@Configuration
public class ThreadPoolConfig {
/**
* cpu内核 暂时默认8核
*/
private static final int CORE_SIZE = 8;
/**
* 核心线程数 暂定为I/O密集型
*/
private static final int CORE_POOL_SIZE = 2 * CORE_SIZE + 1;
/**
* 最大线程数
*/
private static final int MAX_POOL_SIZE = 4 * CORE_SIZE + 1;
/**
* 线程队列容量
*/
private static final int QUEUE_CAPACITY = 1000;
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
//看情况决定是否自定义线程池
CustomThreadPoolExecutor threadPoolTaskExecutor = new CustomThreadPoolExecutor();
threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
threadPoolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
threadPoolTaskExecutor.setThreadNamePrefix("shiny-thread-");
//交由调用方线程运行
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setDaemon(true);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
2.使用线程池
@Service
@Slf4j
public class IndexServiceImpl implements IndexService {
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
/**
* @return void
* @desc 循环执行任务
* @author dengkongze
* @date 2022/3/30 16:09
*/
@Override
public void cycle() {
log.info("开始执行任务!");
for (int i = 0; i < 20; i++) {
threadPoolTaskExecutor.execute(() -> {
try {
Thread.sleep(5000);
//执行任务
log.info("执行完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
3.调用方法,打印日志(tips:已配置打印REQUEST_ID) 通过观察日志,不难看出: a.线程池已配置成功(最后3个线程在16:54:11前还在队列中); b.新开线程的数据未透传(REQUEST_ID打印为空),接下来处理这部分问题。
4.定义数据透传处理器父接口
public interface ThreadLocalFlowHandler {
/**
* 放入threadLocal
* @param obj
*/
void put(Object obj);
/**
* 取出数据
* @return
*/
Object get();
/**
* 清除数据
*/
void clear();
}
5.实现该处理器 - MDC日志数据透传
public class MdcInfoFlowHandler implements ThreadLocalFlowHandler {
@Override
public void put(Object obj) {
// MDC 透传
if (!ObjectUtils.isEmpty(obj)) {
MDC.setContextMap((Map<String, String>) obj);
}
}
@Override
public Object get() {
return MDC.getCopyOfContextMap();
}
@Override
public void clear() {
MDC.clear();
}
}
6.自定义线程池
@Slf4j
public class CustomThreadPoolExecutor extends ThreadPoolTaskExecutor {
private static final List<ThreadLocalFlowHandler> threadLocalFlowHandlers = new ArrayList<>();
static {
threadLocalFlowHandlers.add(new MdcInfoFlowHandler());
}
@Override
public void execute(Runnable task) {
Map<ThreadLocalFlowHandler, Object> parentThreadLocal = getParentThreadLocal();
super.execute(() -> agent(task, parentThreadLocal));
}
@Override
public Future<?> submit(Runnable task) {
Map<ThreadLocalFlowHandler, Object> parentThreadLocal = getParentThreadLocal();
return super.submit(() -> agent(task, parentThreadLocal));
}
private Map<ThreadLocalFlowHandler, Object> getParentThreadLocal() {
Map<ThreadLocalFlowHandler, Object> parentThreadLocal = new HashMap<>();
threadLocalFlowHandlers.forEach(handler -> parentThreadLocal.put(handler, handler.get()));
return parentThreadLocal;
}
/**
* 透传ThreadLocal即可
*
* @param task
* @param parentThreadLocal
*/
private void agent(Runnable task, Map<ThreadLocalFlowHandler, Object> parentThreadLocal) {
try {
threadLocalFlowHandlers.forEach(handler -> {
if (parentThreadLocal.containsKey(handler)) {
Object obj = parentThreadLocal.get(handler);
handler.put(obj);
}
});
task.run();
} catch (Throwable e) {
log.error("thread error", e);
throw e;
} finally {
//清空
threadLocalFlowHandlers.forEach(ThreadLocalFlowHandler::clear);
}
}
}
7.再次调用方法,打印日志
8.github源码:https://github.com/shiny1day/thread-pool-demo.git
|