Java多任务多线程,总线程数countDownLatch限制模板(附源码)
问题背景
最近开发的项目需要多任务并行运行,然后每个任务需要多线程运行,要求如下:
- 多任务并行,一个任务可设置线程数
- 限制整个项目开启任务的线程数500,大于500则等待线程执行完毕,再进行创建线程
- 大于500是等待线程结束,最多等待10分钟
注意事项: - 可以通过复制文章的代码自己创建工程,也可以下载源码进行参考
项目创建
1 引入pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.7</version>
<relativePath/>
</parent>
<groupId>com.yg</groupId>
<artifactId>taskFrame</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>taskFrame</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2 启动类
package com.yg.taskframe;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TaskFrameApplication {
public static void main(String[] args) {
SpringApplication.run(TaskFrameApplication.class, args);
}
}
3 Runnable线程类
package com.yg.taskframe.core;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class CreateRunnable implements Runnable {
private CountDownLatch countDownLatch;
public CreateRunnable(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("CreateRunnable error: ", e);
}
countDownLatch.countDown();
}
}
4 线程池管理服务类
package com.yg.taskframe.core;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class AsynThreadService {
public ThreadPoolTaskExecutor asyncServiceExecutor;
public CountDownLatch countDownLatch;
public AsynThreadService(ThreadPoolTaskExecutor asyncServiceExecutor, CountDownLatch countDownLatch) {
this.asyncServiceExecutor = asyncServiceExecutor;
this.countDownLatch = countDownLatch;
}
public void waitComplete() {
try {
log.info("countDownLatch remain {}", this.countDownLatch.getCount());
this.countDownLatch.await();
log.info("Close asynThreadService");
asyncServiceExecutor.shutdown();
} catch (Exception e) {
log.error("AsyncServiceExecutor shutdown error: ", e);
}
}
}
5 线程池和 countDownLatch 创建类
package com.yg.taskframe.core;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class AsynThreadPool {
private static AtomicInteger curCount = new AtomicInteger(0);
private static Integer maxCount = 500;
public static AsynThreadService createCountDownLatch(Long taskId, int threads) throws InterruptedException {
CountDownLatch = new CountDownLatch(threads);
ThreadPoolTaskExecutor asyncServiceExecutor = creatThreadPool(taskId, threads);
if (curCount.get() >= maxCount) {
int n = 10;
while (n-- > 0) {
if (curCount.get() >= maxCount) {
log.info("CountDownLatch count more than 500, loading over for other threads");
TimeUnit.SECONDS.sleep(60);
} else {
n = 0;
}
if (n == 0) {
log.warn("Part of asynThreadService aren't finish");
}
}
}
curCount.addAndGet(threads);
AsynThreadService asynThreadService = new AsynThreadService(asyncServiceExecutor, countDownLatch);
return asynThreadService;
}
public static void free(int count) {
log.info("CountDownLatch : {}, count: {}", curCount.get(), count);
curCount.addAndGet(-count);
log.info("CountDownLatch remain: {}", curCount.get());
}
public static ThreadPoolTaskExecutor creatThreadPool(Long taskId, int threads) {
ThreadPoolTaskExecutor asyncServiceExecutor = null;
try {
asyncServiceExecutor = new ThreadPoolTaskExecutor();
asyncServiceExecutor.setCorePoolSize(threads);
asyncServiceExecutor.setMaxPoolSize(threads + 1);
asyncServiceExecutor.setQueueCapacity(2000);
asyncServiceExecutor.setThreadNamePrefix("TaskId" + taskId.toString() + "-Thread-");
asyncServiceExecutor.setBeanName("TaskId" + taskId);
asyncServiceExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
asyncServiceExecutor.setWaitForTasksToCompleteOnShutdown(true);
asyncServiceExecutor.setAwaitTerminationSeconds(30);
asyncServiceExecutor.initialize();
} catch (Exception e) {
log.error("Create ThreadPoolTaskExecutor failed", e);
}
return asyncServiceExecutor;
}
}
6 任务创建类
package com.yg.taskframe.core;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class ParallelTask {
private int threads;
private String url;
public ParallelTask nextTask;
public ParallelTask prevTask;
public ParallelTask(int threads) {
this.threads = threads;
}
public AsynThreadService startTask(Long taskId) throws InterruptedException {
AsynThreadService asynThreadService = AsynThreadPool.createCountDownLatch(taskId, threads);
return asynThreadService;
}
}
7 启动一个任务入口类
package com.yg.taskframe.service;
import com.yg.taskframe.core.AsynThreadPool;
import com.yg.taskframe.core.AsynThreadService;
import com.yg.taskframe.core.ParallelTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@Slf4j
@Service
public class TaskService {
public void startTask(Long taskId, int threads) throws InterruptedException {
log.info("Begin task");
if (threads == 0) {
threads = 1;
}
log.info("threads: {}", threads);
ParallelTask parallelTask = new ParallelTask(threads);
AsynThreadService asynThreadService = parallelTask.startTask(taskId);
List<String> params = new ArrayList<>();
params.add("1");
submit(params, asynThreadService);
int countDownLatchCount = (int) asynThreadService.countDownLatch.getCount();
while (countDownLatchCount-- > 0) {
asynThreadService.countDownLatch.countDown();
}
asynThreadService.waitComplete();
AsynThreadPool.free(threads);
}
public void submit(List<String> params, AsynThreadService asynThreadService) {
log.info("Begin 0004 thread");
Future<String> futureResult = asynThreadService.asyncServiceExecutor.submit(() ->
submitSingle(params, asynThreadService.countDownLatch)
);
}
public String submitSingle(List<String> params, CountDownLatch countDownLatch) {
log.info("Submit single thread");
try {
log.info("params size: {}", params.size());
long start = System.currentTimeMillis();
String resultVOlist = doQuery(params);
log.info("resultVOlist:{}", resultVOlist);
return resultVOlist;
} catch (Exception e) {
log.error("dataxQueryController not found error", e);
return "dataxQueryController not found error";
} finally {
log.info("Close one countDownLatch");
countDownLatch.countDown();
}
}
public String doQuery(List<String> params) {
if (params.size() < 10) {
return "success";
}
return "fail";
}
}
8 使用接口添加任务,通过需要设置每个线程池的线程数和任务ID
package com.yg.taskframe.controller;
import com.yg.taskframe.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class TaskController {
@Autowired
TaskService taskService;
@PostMapping("/task")
public String addTask(@RequestParam Long taskId, @RequestParam int threads) {
try {
taskService.startTask(taskId, threads);
return "success";
} catch (Exception e) {
log.error("Exception", e);
return "fail";
}
}
}
9 整个项目目录
总结
- 通过模板可以进行多任务并行运行,并且可以控制总线程数
作为程序员第 122 篇文章,每次写一句歌词记录一下,看看人生有几首歌的时间,wahahaha …
Lyric: 在月光下一直找寻
|