(1)首先,创建一个队列任务
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
public class QueueTask {
private static final Logger log = LoggerFactory.getLogger(QueueTask.class);
private final LinkedBlockingQueue<TaskHandler> tasks = new LinkedBlockingQueue<TaskHandler>(10);
private ExecutorService service = Executors.newSingleThreadExecutor();
private volatile boolean running = true;
private Future<?> serviceThreadStatus = null;
@PostConstruct
public void init() {
serviceThreadStatus = service.submit(new Thread(() -> {
while (running) {
try {
TaskHandler task = tasks.take();
try {
task.processTask();
} catch (Exception e) {
log.error("任务处理发生错误", e);
}
} catch (InterruptedException e) {
log.error("服务停止,退出", e);
running = false;
}
}
}));
}
public boolean addData(TaskHandler dataHandler) {
if (!running) {
log.warn("service is stop");
return false;
}
boolean success = tasks.offer(dataHandler);
if (!success) {
log.warn("添加任务到队列失败");
}
return success;
}
public boolean isEmpty() {
return tasks.isEmpty();
}
public boolean checkServiceRun() {
return running && !service.isShutdown() && !serviceThreadStatus.isDone();
}
public void activeService() {
running = true;
if (service.isShutdown()) {
service = Executors.newSingleThreadExecutor();
init();
log.info("线程池关闭,重新初始化线程池及任务");
}
if (serviceThreadStatus.isDone()) {
init();
log.info("线程池任务结束,重新初始化任务");
}
}
@PreDestroy
public void destroy() {
running = false;
service.shutdownNow();
}
}
(2)其次,抽象出一个任务处理器的接口
public interface TaskHandler {
void processTask();
}
(3)接着,补充接口的实现类——定时任务
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class TimingTaskService implements TaskHandler {
@Autowired
QueueTask queueTask;
private String taskInfo;
public TimingTaskService(String taskInfo) {
this.taskInfo = taskInfo;
}
@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void processTask() {
boolean isSuccess = queueTask.addData(new TimingTaskService("newTaskInfo"));
if (isSuccess) {
System.out.println("the queueService has done!");
}
}
}
(4)最后,补充单元测试,测试以上方法的正确性
import org.junit.Assert;
import org.junit.Test;
public class TimingTaskServiceTest {
@Test
public void timingTaskServiceTest() {
QueueTask queueTask = new QueueTask();
Assert.assertTrue(queueTask.addData(new TimingTaskService("newTaskInfo")));
Assert.assertFalse(queueTask.addData(new TimingTaskService("taskInfo")));
}
}
|