现在我有一个任务,希望它能异步执行,首先想到的就是创建一个线程来执行它。
第一版
package com.su.demo.test;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
 * Driver for the first version: hands ten tasks to a {@link FlashExecutor}.
 * Each task logs the name of the thread running it and its own number, then
 * sleeps for one second.
 */
@Slf4j
public class Demo {

    /** How many tasks the driver submits. */
    private static final int TASK_COUNT = 10;

    public static void main(String[] args) {
        FlashExecutor executor = new FlashExecutor();
        int index = 0;
        while (index < TASK_COUNT) {
            executor.execute(newTask(index));
            index++;
        }
    }

    /**
     * Builds the task for one loop iteration.
     *
     * @param value the number the task logs
     * @return a task that logs its thread name and {@code value}, then sleeps 1000 ms
     */
    private static Runnable newTask(int value) {
        return () -> {
            log.debug("当前线程名称:{}", Thread.currentThread().getName());
            log.debug("打印当前数值:{}", value);
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}
/**
 * Simplest possible {@link Executor}: every submitted task is run on its own,
 * newly created thread.
 */
class FlashExecutor implements Executor {

    /**
     * Starts a new thread that runs {@code command}.
     *
     * @param command the task to run; must not be {@code null}
     * @throws NullPointerException if {@code command} is {@code null}
     */
    @Override
    public void execute(Runnable command) {
        // A Thread built around a null Runnable starts and silently does nothing;
        // fail fast instead, as the Executor contract specifies.
        if (command == null) {
            throw new NullPointerException("command");
        }
        new Thread(command).start();
    }
}
也就是每提交一个任务,就新建一个线程去执行它。缺点十分明显:线程数量不受控制,而且频繁创建、销毁线程的开销很大。
第二版
package com.su.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Driver for the second version: pushes 100 tasks at a {@link FlashExecutor}
 * created with a queue capacity of 10, logging the loop counter on the main
 * thread after every submission.
 */
@Slf4j
public class Main {
    public static void main(String[] args) {
        final FlashExecutor executor = new FlashExecutor(10);
        for (int round = 0; round < 100; round++) {
            final int value = round;
            // Each job reports which thread runs it and its number, then sleeps one second.
            Runnable job = () -> {
                log.debug("当前线程名称:{}", Thread.currentThread().getName());
                log.debug("打印当前数值:{}", value);
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executor.execute(job);
            log.info("主线程循环次数:{}", value);
        }
    }
}
/**
 * Executor backed by a bounded task queue and a single worker thread.
 *
 * <p>Tasks submitted while the queue is full are dropped (and the drop is logged).
 */
@Slf4j
class FlashExecutor implements Executor {
    Worker worker;
    ArrayBlockingQueue<Runnable> taskQueue;
    int queueCapacity;

    /**
     * Creates the queue and starts the single worker thread.
     *
     * @param queueCapacity maximum number of tasks that may wait in the queue
     */
    public FlashExecutor(int queueCapacity) {
        this.queueCapacity = queueCapacity;
        taskQueue = new ArrayBlockingQueue<>(queueCapacity);
        worker = new Worker(null);
        worker.start();
    }

    /**
     * Queues {@code command} for the worker, or drops it when the queue is full.
     *
     * @param command the task to queue
     * @throws NullPointerException if {@code command} is {@code null}
     */
    @Override
    public void execute(Runnable command) {
        // offer() performs the capacity check and the insertion as one atomic step,
        // so there is no window between "is there room?" and "insert", and the
        // success message is only logged when the task really was queued.
        if (taskQueue.offer(command)) {
            log.debug("向任务队列中添加一个任务:{}", command);
        } else {
            log.debug("任务队列已满,无法继续添加任务");
        }
    }

    /** The single thread that runs queued tasks one after another. */
    class Worker extends Thread {
        private Runnable task;

        /**
         * @param task a first task to run before reading from the queue, or {@code null}
         */
        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (true) {
                if (task == null) {
                    try {
                        // Block until a task is available instead of spinning on
                        // isEmpty()/remove(), which kept a CPU core busy while idle.
                        task = taskQueue.take();
                    } catch (InterruptedException e) {
                        // Treat interruption as a request to stop this worker.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    // A failing task must not kill the worker.
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
        }
    }
}
把任务丢到一个任务队列中,然后只启动一个 worker 线程,不断地从任务队列中获取任务并执行。缺点:任务队列满了之后,新提交的任务会被丢弃。
小步前进:实现不丢弃任务
改为调用阻塞队列的 put()、take() 方法:队列满时 put() 会阻塞等待,队列空时 take() 会阻塞等待。
package com.su.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Driver for the put()/take() step: submits 100 tasks to a {@link FlashExecutor}
 * whose queue holds 10 entries, logging the loop counter after each submission.
 */
@Slf4j
public class Main {

    public static void main(String[] args) {
        FlashExecutor pool = new FlashExecutor(10);
        int counter = 0;
        do {
            final int current = counter;
            pool.execute(() -> reportAndSleep(current));
            log.info("主线程循环次数:{}", current);
            counter++;
        } while (counter < 100);
    }

    /**
     * Body of every submitted task: logs the executing thread's name and
     * {@code number}, then sleeps for 1000 ms.
     */
    private static void reportAndSleep(int number) {
        log.debug("当前线程名称:{}", Thread.currentThread().getName());
        log.debug("打印当前数值:{}", number);
        try {
            TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Executor backed by a bounded blocking queue and a single worker thread.
 *
 * <p>{@link #execute(Runnable)} blocks the caller while the queue is full, so
 * tasks are not dropped because of a full queue.
 */
@Slf4j
class FlashExecutor implements Executor {
    Worker worker;
    ArrayBlockingQueue<Runnable> taskQueue;
    int queueCapacity;

    /**
     * Creates the queue and starts the single worker thread.
     *
     * @param queueCapacity maximum number of tasks that may wait in the queue
     */
    public FlashExecutor(int queueCapacity) {
        this.queueCapacity = queueCapacity;
        taskQueue = new ArrayBlockingQueue<>(queueCapacity);
        worker = new Worker(null);
        worker.start();
    }

    /**
     * Queues {@code command}, waiting for free space if the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the task is not
     * queued and the thread's interrupt status is restored.
     *
     * @param command the task to queue
     */
    @Override
    public void execute(Runnable command) {
        try {
            taskQueue.put(command);
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see the interruption, and do not
            // claim below that the task was added when it was not.
            Thread.currentThread().interrupt();
            log.debug("线程被中断,任务未加入任务队列:{}", command);
            return;
        }
        log.debug("向任务队列中添加一个任务:{}", command);
    }

    /** The single thread that runs queued tasks one after another. */
    class Worker extends Thread {
        private Runnable task;

        /**
         * @param task a first task to run before reading from the queue, or {@code null}
         */
        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (true) {
                if (task == null) {
                    try {
                        task = taskQueue.take();
                    } catch (InterruptedException e) {
                        // Treat interruption as a request to stop this worker
                        // rather than swallowing it and waiting again.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    // A failing task must not kill the worker.
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
        }
    }
}
缺点:只有一个线程执行任务,太慢,如果能有多个线程同时执行任务就好了。另外,工作线程一直在死循环中从任务队列获取任务。
小步前进:自己实现一个阻塞队列
package com.su.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Driver for the hand-written blocking queue step: submits five tasks to a
 * {@link FlashExecutor} created with a queue capacity of 10.
 */
@Slf4j
public class Main {

    public static void main(String[] args) {
        FlashExecutor executor = new FlashExecutor(10);
        for (int n = 0; n < 5; n++) {
            executor.execute(taskFor(n));
        }
    }

    /**
     * @param number the value the task logs
     * @return a task that logs its thread name and {@code number}, then sleeps 1000 ms
     */
    private static Runnable taskFor(final int number) {
        return () -> {
            log.debug("当前线程名称:{}", Thread.currentThread().getName());
            log.debug("打印当前数值:{}", number);
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}
/**
 * Executor with one lazily created worker thread and a {@link BlockQueue} of
 * pending tasks. The first submitted command is handed straight to the new
 * worker; every later command goes into the queue.
 */
@Slf4j
class FlashExecutor implements Executor {
    Worker worker;
    BlockQueue<Runnable> taskQueue;

    /**
     * @param queueCapacity capacity passed to the underlying {@link BlockQueue}
     */
    public FlashExecutor(int queueCapacity) {
        taskQueue = new BlockQueue<>(queueCapacity);
    }

    /**
     * Starts the worker with {@code command} if no worker exists yet; otherwise
     * puts {@code command} into the task queue.
     */
    @Override
    public void execute(Runnable command) {
        if (worker != null) {
            taskQueue.put(command);
            return;
        }
        worker = new Worker(command);
        worker.start();
    }

    /** Thread that runs its initial task and then tasks taken from the queue, forever. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            for (;;) {
                if (task == null) {
                    task = taskQueue.take();
                }
                if (task == null) {
                    continue;
                }
                runCurrentTask();
            }
        }

        /** Runs the pending task, printing any exception it throws, then clears it. */
        private void runCurrentTask() {
            try {
                log.debug("正在执行...{}", task);
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                task = null;
            }
        }
    }
}
/**
 * A bounded, blocking FIFO queue guarded by a single {@link ReentrantLock}.
 *
 * <p>{@link #put(Object)} waits while the queue is full and {@link #take()}
 * waits while it is empty. Neither wait can be cut short by an interrupt, but
 * the calling thread's interrupt status is preserved so callers can still
 * observe that an interruption happened.
 *
 * @param <T> element type
 */
class BlockQueue<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    /** Producers wait here while the queue is full. */
    private final Condition fullWaitSet = lock.newCondition();
    /** Consumers wait here while the queue is empty. */
    private final Condition emptyWaitSet = lock.newCondition();
    private final int capacity;

    /**
     * @param capacity maximum number of elements held at once; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public BlockQueue(int capacity) {
        // A capacity of 0 would make every put() wait forever, and a negative
        // one would never match size(), silently making the queue unbounded.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     *
     * @return the oldest element in the queue
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                // Keeps waiting across interrupts but, unlike catching and
                // discarding InterruptedException, leaves the interrupt status set.
                emptyWaitSet.awaitUninterruptibly();
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends {@code element} to the tail of the queue, waiting while the queue is full.
     *
     * @param element the element to add; must not be {@code null}
     * @throws NullPointerException if {@code element} is {@code null}
     */
    public void put(T element) {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                fullWaitSet.awaitUninterruptibly();
            }
            queue.addLast(element);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the number of elements currently in the queue
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}
这个时候仍然只有一个线程在执行任务,它说它太累了:短时间内处理不完太多任务,万一任务队列满了,提交任务的线程就会被阻塞。例如把任务数增加到 100:
/**
 * Driver that submits 100 tasks to a {@link FlashExecutor} created with a queue
 * capacity of 10. Each task logs its thread name and number and then sleeps
 * for one second.
 */
@Slf4j
public class Main {

    /** Number of tasks submitted by {@link #main(String[])}. */
    private static final int TOTAL_TASKS = 100;

    public static void main(String[] args) {
        FlashExecutor executor = new FlashExecutor(10);
        int submitted = 0;
        while (submitted < TOTAL_TASKS) {
            final int number = submitted++;
            executor.execute(() -> {
                log.debug("当前线程名称:{}", Thread.currentThread().getName());
                log.debug("打印当前数值:{}", number);
                pauseOneSecond();
            });
        }
    }

    /** Sleeps 1000 ms; an interruption is only printed. */
    private static void pauseOneSecond() {
        try {
            TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
第三版
希望有多个线程同时执行任务队列中的任务:初始化时直接启动 corePoolSize 个工作线程,每个工作线程在死循环中不断地从任务队列中获取任务并执行。
package com.su.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Driver for the third version: submits 100 tasks to a {@link FlashExecutor}
 * built with 2 worker threads and a queue capacity of 10, logging the loop
 * counter on the main thread after each submission.
 */
@Slf4j
public class Main {

    public static void main(String[] args) {
        FlashExecutor executor = new FlashExecutor(2, 10);
        for (int step = 0; step < 100; step++) {
            submit(executor, step);
            log.info("主线程循环次数:{}", step);
        }
    }

    /**
     * Submits one task that logs its thread name together with {@code number}
     * and then sleeps for 1000 ms.
     */
    private static void submit(FlashExecutor executor, int number) {
        executor.execute(() -> {
            String threadName = Thread.currentThread().getName();
            log.debug("当前线程名称:{},打印当前数值:{}", threadName, number);
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
/**
 * Executor backed by a bounded blocking queue and a fixed set of worker threads
 * that are all started in the constructor.
 *
 * <p>{@link #execute(Runnable)} blocks the caller while the queue is full.
 */
@Slf4j
class FlashExecutor implements Executor {
    Worker[] workers;
    ArrayBlockingQueue<Runnable> taskQueue;
    int queueCapacity;
    int corePoolSize;

    /**
     * Creates the queue and starts {@code corePoolSize} worker threads.
     *
     * @param corePoolSize  number of worker threads to start
     * @param queueCapacity maximum number of tasks that may wait in the queue
     */
    public FlashExecutor(int corePoolSize, int queueCapacity) {
        this.queueCapacity = queueCapacity;
        taskQueue = new ArrayBlockingQueue<>(queueCapacity);
        this.corePoolSize = corePoolSize;
        workers = new Worker[corePoolSize];
        for (int i = 0; i < this.corePoolSize; i++) {
            workers[i] = new Worker(null);
            workers[i].start();
        }
    }

    /**
     * Queues {@code command}, waiting for free space if the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the task is not
     * queued and the thread's interrupt status is restored.
     *
     * @param command the task to queue
     */
    @Override
    public void execute(Runnable command) {
        try {
            taskQueue.put(command);
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see the interruption, and do not
            // claim below that the task was added when it was not.
            Thread.currentThread().interrupt();
            log.debug("线程被中断,任务未加入任务队列:{}", command);
            return;
        }
        log.debug("向任务队列中添加一个任务:{}", command);
    }

    /** A pool thread that repeatedly takes a task from the queue and runs it. */
    class Worker extends Thread {
        private Runnable task;

        /**
         * @param task a first task to run before reading from the queue, or {@code null}
         */
        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (true) {
                if (task == null) {
                    try {
                        task = taskQueue.take();
                    } catch (InterruptedException e) {
                        // Treat interruption as a request to stop this worker
                        // rather than swallowing it and waiting again.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    // A failing task must not kill the worker.
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
        }
    }
}
第四版
package com.su.demo.test;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Driver for the fourth version: creates a {@link ThreadPool} with a core size
 * of 2 and a queue capacity of 10, then submits five tasks that each log their
 * own number.
 */
@Slf4j(topic = "hang.Main")
public class Main {

    public static void main(String[] args) {
        // NOTE(review): the timeout is 1000 MICROSECONDS, i.e. one millisecond;
        // confirm that MILLISECONDS was not intended.
        ThreadPool pool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 10);
        int next = 0;
        while (next < 5) {
            final int value = next;
            pool.execute(() -> log.debug("打印当前值:{}", value));
            next++;
        }
    }
}
/**
 * Thread pool that creates workers on demand up to {@code coreSize} and queues
 * further tasks in a {@link BlockQueue}.
 *
 * <p>NOTE(review): {@code timeout} and {@code timeUnit} are stored but never
 * read by this class. The worker loop calls {@code taskQueue.take()}, which
 * waits until an element is available, so the loop is not expected to end and
 * the "worker removed" clean-up after it appears unreachable. If idle workers
 * are meant to retire, the loop presumably should call
 * {@code taskQueue.poll(timeout, timeUnit)} instead; confirm the intent first.
 */
@Slf4j(topic = "hang.ThreadPool")
class ThreadPool {
    private final BlockQueue<Runnable> taskQueue;
    /** Live workers; every access is guarded by {@code synchronized (workers)}. */
    private final HashSet<Worker> workers = new HashSet<>();
    private final int coreSize;
    private final long timeout;
    private final TimeUnit timeUnit;

    /**
     * @param coreSize      maximum number of worker threads
     * @param timeout       stored, currently unused (see class note)
     * @param timeUnit      unit of {@code timeout}; stored, currently unused
     * @param queueCapacity capacity of the task queue
     */
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueCapacity);
    }

    /**
     * Runs {@code task} on a new worker while fewer than {@code coreSize}
     * workers exist; otherwise puts it into the task queue.
     *
     * @param task the task to run
     */
    public void execute(Runnable task) {
        Worker created = null;
        // The worker set is also modified from worker threads under this same
        // monitor, so the size check and the add must hold it too. Otherwise
        // concurrent callers could both pass the check and exceed coreSize,
        // or corrupt the HashSet.
        synchronized (workers) {
            if (workers.size() < coreSize) {
                created = new Worker(task);
                log.debug("新增 worker:{},任务:{}", created, task);
                workers.add(created);
            }
        }
        if (created != null) {
            created.start();
        } else {
            // Done outside the monitor: put() may wait for free space in the queue.
            log.debug("加入任务队列{}", task);
            taskQueue.put(task);
        }
    }

    /** Pool thread: runs its initial task, then tasks taken from the queue. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.take()) != null) {
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    // A failing task must not kill the worker.
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker被移除...{}", this);
                workers.remove(this);
            }
        }
    }
}
/**
 * A bounded, blocking FIFO queue guarded by a single {@link ReentrantLock}.
 *
 * <p>{@link #put(Object)} waits while the queue is full; {@link #take()} and
 * {@link #poll(long, TimeUnit)} wait while it is empty. None of the waits is
 * cut short by an interrupt, but the calling thread's interrupt status is
 * preserved so callers can still observe that an interruption happened.
 *
 * @param <T> element type
 */
class BlockQueue<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    /** Producers wait here while the queue is full. */
    private final Condition fullWaitSet = lock.newCondition();
    /** Consumers wait here while the queue is empty. */
    private final Condition emptyWaitSet = lock.newCondition();
    private final int capacity;

    /**
     * @param capacity maximum number of elements held at once; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public BlockQueue(int capacity) {
        // A capacity of 0 would make every put() wait forever, and a negative
        // one would never match size(), silently making the queue unbounded.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given time
     * for an element to become available.
     *
     * @param timeout how long to wait before giving up
     * @param unit    unit of {@code timeout}
     * @return the oldest element, or {@code null} if the wait timed out
     */
    public T poll(long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + nanos;
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Remember the interrupt and recompute the remaining time from
                    // the deadline; otherwise the full budget would be waited again.
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     *
     * @return the oldest element in the queue
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                // Keeps waiting across interrupts but, unlike catching and
                // discarding InterruptedException, leaves the interrupt status set.
                emptyWaitSet.awaitUninterruptibly();
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends {@code element} to the tail of the queue, waiting while the queue is full.
     *
     * @param element the element to add; must not be {@code null}
     * @throws NullPointerException if {@code element} is {@code null}
     */
    public void put(T element) {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                fullWaitSet.awaitUninterruptibly();
            }
            queue.addLast(element);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the number of elements currently in the queue
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}
|