目录
1.Semaphore介绍
1.1 限流操作实战
1.2 Semaphore源码解析
?2. countDownLatch介绍
2.1 多个线程等待实战
?2.2 主线程等待
2.3 源码分析
1.Semaphore介绍
semaphorer(信号量),是一个基于AQS框架实现的工具类,也是操作系统PV操作在java中的实现。通过发放许可来控制线程,只有拿到许可的线程才能执行代码,常用于限流操作。
PV操作是一种操作系统实现进程互斥与同步的有效方法:P表示通过,V表示释放。
P操作:S-1=X,如果X>=0线程执行,如果小于0放入等待队列中。
V操作:S+1=X,如果X>=0线程执行,如果小于0从等待队列释放一个等待线程。
构造器(默认非公平锁,也有公平锁实现):
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
常用的方法:
acquire()获取许可拿到共享锁
tryacquire()尝试获取锁,获取到锁返回true,没有返回false
release()释放许可
1.1 限流操作实战
模拟限流操作
1. 5个线程发送3个许可。
2. 10个核心线程发送5个许可。
import java.util.concurrent.Semaphore;
/**
* 限流操作
*/
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(3);//发放三个许可
public static void main(String[] args) throws Exception {
//执行5个线程
for(int i=0;i<5;i++){
new Thread(()->{
try{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"开始购票");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"购票成功");
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}).start();
}
}
}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 使用线程池-限流操作
*/
public class SemaphoreTest2 {
//定义一个5个许可的信号量
private static Semaphore semaphore = new Semaphore(5);
//定义一个10个线程的线程池
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,20,60,
TimeUnit.SECONDS,new LinkedBlockingQueue<>(200));
public static void main(String[] args) throws InterruptedException {
//每100ms发送一次请求
for(;;){
Thread.sleep(100);
threadPoolExecutor.execute(()->exec());
System.out.println("-------------------------------------");
}
}
private static void exec(){
//模拟限流场景
try{
semaphore.acquire(1);
//暂停2秒
Thread.sleep(2000);
System.out.println("执行业务逻辑");
}catch (Exception e){
e.printStackTrace();
}finally {
semaphore.release(1);
}
}
}
1.2 Semaphore源码解析
关注点:
1.加解锁逻辑(共享锁)实现
2.竞争锁失败入队操作阻塞和唤醒同步队列中等待线程逻辑
服了这代码了,写的狗屁不通
?
?
?2. countDownLatch介绍
?countDownLatch(闭锁)是一个同步协助类,用于一个或多个线程等待,相当于一个倒计时计数器,需要当计数器为0时才释放等待线程执行,等待线程会一次性全部返回,计数器不会重置,需要重置需要使用CyclicBarrier
?常用方法:
public void await() 让线程等待
public boolean await(long timeout, TimeUnit unit) 让线程等待一段时间,没有置为0执行业务
countDown() count减1
countDownLatch比join方法更加灵活,join方法底层在不停检测线程状态是否存活。
countDownLatch与CyclicBarrier区别:
1.countDownLatch只能使用一次,CyclicBarrier可以通过reset方法重复使用
2.countDownLatch是通过AQS的共享锁实现的,CyclicBarrier是通过reentrantlock的独占锁和condition实现线程唤醒。
2.1 多个线程等待实战
定义一个计数器为3的闭锁,线程挂起,等待2秒计数器减1,直到最后一个线程减到0跑线程
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
//定义一个计数器为3的闭锁
private static CountDownLatch countDownLatch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
for (int i =0 ;i<5;i++){
//阻塞线程
new Thread(()->{
try{
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"执行业务");
}catch(Exception e){
e.printStackTrace();
}
}).start();
//等待2秒计数器减1
Thread.sleep(2000);
countDownLatch.countDown();
}
}
}
?2.2 主线程等待
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest2 {
public static void main(String[] args) throws InterruptedException {
//1. 定义一个计数器为3的线程
CountDownLatch countDownLatch = new CountDownLatch(3);
//2.让线程计数器减1
for(int i=0;i<3;i++){
new Thread(()->{
try{
Thread.sleep(1000);
countDownLatch.countDown();
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
//3.主线程阻塞,当计数器为0时执行业务
countDownLatch.await();
System.out.println("执行业务");
}
}
2.3 源码分析
关注点:
加解锁逻辑
1. countDownLatch.await()加锁:默认写死1永远waitStatus为-1走阻塞线程逻辑
2. countDownLatch.countDown()解锁:cas操作减1,直到为0执行唤醒线程逻辑?
解决synchronized两个线程导致的死锁问题:使用wait方法释放锁的形式解决,线上只能kill了
?
|