1、请问 如何控制多线程顺序执行
? ? ?1、使用join 控制
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PathVariable;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/3 12:22
* @Description
*/
@Slf4j
public class DemoStepThread extends Thread {
/**
* 下一个需要执行
*/
private Thread preThread;
public DemoStepThread(Thread thread) {
this.preThread = thread;
}
public DemoStepThread() {
}
@Override
public void run() {
try {
if(preThread !=null) {
preThread.join();
}
Thread.sleep(300);
log.info("当前线程---{}",Thread.currentThread().getName());
} catch (Exception ex) {
log.error("捕获异常---{}", ex.getMessage(), ex);
}
}
}
main?
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
DemoStepThread firstThread = new DemoStepThread();
DemoStepThread secondThread = new DemoStepThread(firstThread);
DemoStepThread threeThread = new DemoStepThread(secondThread);
threeThread.start();
secondThread.start();
firstThread.start();
}
}
?运行结果:
?2、使用 singleThreadExecutor 单核心线程
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:09
* @Description
*/
@Slf4j
public class DemoRunnable implements Runnable {
/**
* 耗时时间
*/
private int costTime;
public DemoRunnable(int costTime) {
this.costTime =costTime;
}
@Override
public void run() {
try {
Thread.sleep(costTime*100);
log.info("当前耗时时间为---{}",costTime*100);
} catch ( Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
}
}
}
main 函数
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
Executor singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(new DemoRunnable(3));
singleThreadExecutor.execute(new DemoRunnable(2));
singleThreadExecutor.execute(new DemoRunnable(1));
}
}
2、请问 如何实现如下效果?
? 题目描述: 建立三个线程 A、B、C ,A线程打印10次字母A,B线程打印10次字母B,C线程打印10 次字母C, 但是要求三个线程同时运行,并且实现交替打印,即按照ABCABCABC的顺序打印
? ?1、wait() 和 notify()、synchronized
? ? 其中
notify: 通知一个在对象上等待的线程,使其从wait() 返回,而返回的前提是该线程获取到了对象的锁
notifyAll: 通知所有等待在该对象上的线程,同时释放锁
wait: 通过调用该方法的线程进入waiting 状态, 只有等待另外线程的通知或者被中断才会返回,需要注意,调用wait方法后,会释放对象的锁
wait(long) 超时等待一段时间,这里的参数是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回
wait(long ,int) 对于超时时间更细颗粒度的控制,可以达到几毫秒
?示例代码:
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
Object lock = new Object();
new Thread(new Runnable() {
@Override
public void run() {
log.info("线程A等待获取lock锁");
synchronized (lock) {
try {
log.info("线程A获取了lock锁");
Thread.sleep(1000);
log.info("线程A简要运行lock wait() 方法进行等待");
lock.wait();
log.info("线程A等待结束");
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
log.info("线程B等待获取lock锁");
synchronized (lock) {
log.info("线程B获取到了lock锁");
try {
Thread.sleep(1000);
} catch ( Exception ex) {
log.error("捕获到异常-=---{}",ex.getMessage(),ex);
}
log.error("线程B将要运行lock.notify() 方法进行通知");
lock.notify();
}
}
}).start();
}
}
运行结果:
那么就可以使用这个方式来实现题目:
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/3 14:26
* @Description
*/
@Slf4j
public class ThreadPrinter implements Runnable {
private String name;
private Object prev;
private Object self;
public ThreadPrinter(String name,Object prev,Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}
@Override
public void run() {
int count = 10;
while (count > 0) {
synchronized (prev) {
synchronized (self) {
log.info(name);
count--;
self.notifyAll();
// 唤醒其他线程竞争self锁,注意此时self锁并没有立即释放
}
// 此时执行self 同步块 这时 self才释放
try {
// 如果count ==0 表示这个最后一次都打印操作.通过notifyAll
//操作释放对象锁
if (count == 0) {
// 如果count==0 表示这是最后有一次打印操作,通过notifyAll操作释放对象锁。
prev.notifyAll();
} else {
prev.wait(); // 立即释放prev锁,当前线程休眠,等待被唤醒
}
} catch (Exception ex) {
log.error("捕获到异常---{}", ex.getMessage(), ex);
}
}
}
}
}
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
Object a = new Object();
Object objectB = new Object();
Object objectC =new Object();
ThreadPrinter threadPrinterA = new ThreadPrinter("A",objectC,a);
ThreadPrinter threadPrinterB = new ThreadPrinter("B",a,objectB);
ThreadPrinter threadPrinterC = new ThreadPrinter("C",objectB,objectC);
try {
new Thread(threadPrinterA).start();
Thread.sleep(10);
new Thread(threadPrinterB).start();
Thread.sleep(10);
new Thread(threadPrinterC).start();
Thread.sleep(10);
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
}
}
}
运行结果:
代码分析:
ThreadPrinter 存在三个属性 name prev self ,其中 name 表示 线程名称? ?prev? 表示需要等待的前一个线程? self 自身对象锁
?重写的run方法中 首先 synchronized(prev) 表示要先拿到前一个对象的锁,也就是前一个对象需要释放自身,拿到锁之后,synchronized(slef) 表示拿到自身的锁,为了控制执行的顺序,必须要先持有prev锁,然后当前线程再申请自己的对象锁,只有等待同步块代码执行完毕之后才会释放,再调用prev.wait() 立即释放prev对象锁,当前线程进入休眠,等待其他线程的notify操作再次被唤醒。
? ?当synchronized 同步代码块执行完毕之后会自动释放self锁 ,如果是最后一次打印操作,则通过notifyAll释放对象所,如果不是则通过wait释放,当前线程休眠
运行逻辑:
ThreadPrinter pa = new ThreadPrinter("A",c,a)? 自身名称为A,前一个对象为c,自身为a
A线程最先运行,A线程按照顺序申请c,a对象锁,打印顺序后按照顺序释放a,c对象锁,并且通过notify操作唤醒线程B(因为B的prev是A) 。线程B 首先等待获取A锁,再申请B锁,然后打印B,再释放B,A锁,唤醒C。 线程C等待B锁,再申请C锁,然后打印C,再释放C,B锁,唤醒A。注意初始条件是这三个线程必须按照A,B,C的顺序来启动。
?2、lock 锁方法
? ? 通过ReentrantLock我们可以很方便的进行显示的锁操作,即获取锁和释放锁,对于同一个对象锁而言,统一时刻只可能有一个线程拿到了这个锁,此时其他线程通过lock.lock() 来获取对象锁时都会被阻塞,直到这个线程通过lock.unlock() 操作释放这个锁后,其他线程才能拿到这个锁
? ?
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
private static Lock lock = new ReentrantLock();
// 通过JDK5中的Lock锁来保证线程的访问
private static int state =0; // 通过state的值来确定是否打印
static class ThreadA extends Thread {
@Override
public void run() {
for(int i=0;i<10;) {
try {
lock.lock();
while(state %3 ==0) {
log.info("A");
state++;
i++;
}
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
} finally {
lock.unlock();
}
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
for(int i=0;i<10;) {
try {
lock.lock();
while(state %3 ==1) {
log.info("B");
state++;
i++;
}
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
}finally {
lock.unlock();
}
}
}
}
static class ThreadC extends Thread {
@Override
public void run() {
for(int i=0;i<10;) {
try {
lock.lock();
while(state %3 ==2) {
log.info("C");
state++;
i++;
}
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
} finally {
lock.unlock();
}
}
}
}
public static void main(String args[]) {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}
运行结果:
分析:
首先定义了两个静态变量 ReentranLock lock 和state?
ThreadA 表示使用了lock.lock获取到了锁,这样如果其他对象再次使用lock.lock()的时候会被阻塞住,当依次循环打印完毕之后,会调用lock.unlock() 释放锁,这样别线程等待的时候就会获取到锁,从而实现顺序轮询打印
3、ReentrantLock结合Condition
与ReentrantLock搭配的通行方式是Condition ,如下;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
condition.await(); // this.await();
condition.signal(); // this.notify();
condition.signalAll(); // this.notifyAll();
Condition 是被绑定在Lock上的,必须使用lock.newCondition() 才能创建一个Condition,从上面的代码可以看出,Synchronized 能实现的通信方式,Condition都可以实现
?
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
import sun.print.Win32MediaTray;
import sun.util.resources.cldr.kl.LocaleNames_kl;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
private static Lock lock = new ReentrantLock();
private static Condition A = lock.newCondition();
private static Condition B = lock.newCondition();
private static Condition C = lock.newCondition();
private static int count =0;
static class ThreadA extends Thread {
@Override
public void run() {
try {
lock.lock();
for(int i=0;i<10;i++) {
while(count %3!=0) {
A.await();
}
log.info("A");
count++;
B.signal();
}
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
} finally {
lock.unlock();
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
try {
lock.lock();
for(int i=0;i<10;i++) {
while(count%3!=1) {
B.await();
}
log.info("B");
count++;
C.signal();
}
}catch (Exception ex) {
log.error("捕获到异常----{}",ex.getMessage(),ex);
} finally {
lock.unlock();
}
}
}
static class ThreadC extends Thread {
@Override
public void run() {
try {
lock.lock();
for(int i=0;i<10;i++) {
while(count%3!=2) {
C.await();
}
log.info("C");
count++;
A.signal();
}
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
} finally {
lock.unlock();
}
}
}
public static void main(String args[]) {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}
3、如何判断线程池中的线程都已经执行完毕
? ?1、使用isTerminated 方法判断
? ? ?我们可以利用线程池的终止状态(Terminated) 来判断线程池的任务,是否已经全部执行完成,但想要线程池的状态发生改变,我们就需要调用线程池的shutdown方法,不然线程池一直会处于RUNNING运行状态,那就没办法使用终止状态来判断任务是否已经全部执行完了,它的实现代码如下:
?
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10,20,0,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(1024));
addTask(threadPoolExecutor);
isCompleted(threadPoolExecutor);
log.info("线程池任务执行完成");
}
public static void isCompleted(ThreadPoolExecutor threadPool) {
threadPool.shutdown();
while(!threadPool.isTerminated()) {
}
}
private static void addTask(ThreadPoolExecutor threadPoolExecutor) {
final int taskCount =5;
for(int i=0; i<taskCount;i++) {
final int finalI = i;
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
int sleepTime= new Random().nextInt(5);
TimeUnit.SECONDS.sleep(sleepTime);
} catch (Exception ex) {
log.error("捕获到异常");
}
log.info("任务完成---{}",finalI);
}
});
}
}
}
运行结果:
?shutdown方法是启动线程池有序关闭的方法,它在完全关闭之前会执行完之前所有已经提交的任务,并且不会接受任务新的任务,当线程池中所有任务都执行完成之后,线程池就进入了终止状态,调用isTerminated 方法返回的结果就是true了
?但是一旦调用了shutdown 就需要关闭了线程池
2、 getCompletedTaskCount?
? ? 我们可以通过判断线程池中的计划执行的任务数和已完成任务数,来判断线程池是否已经全部执行完,如果计划执行任务数= 已完成数,那么线程池的任务就全部执行完了,否则就未执行完成,
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10,20,0,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(1024));
addTask(threadPoolExecutor);
isCompleted(threadPoolExecutor);
log.info("线程池任务执行完成");
}
public static void isCompleted(ThreadPoolExecutor threadPool) {
while(threadPool.getTaskCount()!= threadPool.getCompletedTaskCount()) {
}
}
private static void addTask(ThreadPoolExecutor threadPoolExecutor) {
final int taskCount =5;
for(int i=0; i<taskCount;i++) {
final int finalI = i;
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
int sleepTime= new Random().nextInt(5);
TimeUnit.SECONDS.sleep(sleepTime);
} catch (Exception ex) {
log.error("捕获到异常");
}
log.info("任务完成---{}",finalI);
}
});
}
}
}
? ?其中 getTaskCount() 返回计划执行的任务总数,由于任务和线程的状态可能在计算过程中动态变化,因此返回的值只是一个近似值
?getCompletedTaskCount(): 返回完成执行任务的总数,因为任务和线程状态可能在计算过程中动态的改变,所以返回的值只是一个近似值,但是在连续调用中并不会减少
?3、CountDownLatch
?countDownLatch 可以理解一个计数器,我们创建一个包含N个任务的计数器,每个任务执行完计数器-1,直到计数器减为0,说明所有的任务都执行完了,就可以执行下一段业务代码
package com.example.demothread.util;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.C;
import org.springframework.web.bind.annotation.PathVariable;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10,20,0,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(1024));
final int taskCount =5;
CountDownLatch countDownLatch = new CountDownLatch(taskCount);
for(int i=0;i<taskCount;i++) {
final int finalI = i;
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
}finally {
log.info("任务执行完成---{}",finalI);
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await();
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
}
log.info("线程池任务执行完成");
}
}
CountDownLatch创建之后不能被重复使用,也就是CountDownLatch可以理解为只能使用一次的计数器
4、CyclicBarrier
??
package com.example.demothread.util;
import com.fasterxml.jackson.databind.ser.impl.UnwrappingBeanPropertyWriter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.C;
import org.springframework.web.bind.annotation.PathVariable;
import java.util.Random;
import java.util.concurrent.*;
/**
* @author lenovo
* @version 1.0
* @Date 2022/5/1 15:13
* @Description
*/
@Slf4j
public class MainDemo {
public static void main(String args[]) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10,20,0,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(1024));
final int taskCount =5;
CyclicBarrier cyclicBarrier = new CyclicBarrier(taskCount, new Runnable() {
@Override
public void run() {
log.info("所有任务都已经执行完成");
}
});
// 添加任务
for(int i=0;i<taskCount;i++) {
final int finalI = i;
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
int sleepTime = new Random().nextInt(5);
TimeUnit.SECONDS.sleep(sleepTime);
log.info("任务执行完成---{}",finalI);
cyclicBarrier.await();
} catch (Exception ex) {
log.error("捕获到异常---{}",ex.getMessage(),ex);
}
}
});
}
}
}
运行结果:?
?CyclicBarrier 有3个重要的方法:?
1、构造方法:构造方法可以传递两个参数,参数1是计数器的数量parties,参数2是计数器0时,也就是任务都执行完成之后可以执行的事件(方法)
?2、await方法,在CycliBarrier 上进行阻塞等待,当调用此方法CyclicBarrier 的内部计数器会 -1,直到发生以下情形之一:
? ? 1、在CycliBarrier 上等待的线程数量达到parties,也就是计数器声明的数量时,则所有的线程被释放,继续执行
?2、当前线程被中断,抛出InterruptedException异常,并停止等待,继续执行。
3、其他等待线程被中断,则当前线程抛出BrokenBarrierException 异常,并停止等待,继续执行
4、其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行
5、其他线程调用CrycliBarrier.reset() 方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行
reset方法: 使得CycliBarrier回到初始状态:
? ? ? 1、如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行
? ?2、将是破损标志位broken置为false
CycliBarrier 可以重复使用
|