前言
前面介绍了CyclicBarrier和CountDownLatch,本文继续介绍另一种常用的并发工具类Semaphore。
Semaphore是一种计数信号量,利用它可以控制一定数量的请求,从而实现资源访问限制的目的,实际应用中,可以用来限制访问某种资源的数量,比如在Hystrix中就有基于Semaphore的资源隔离策略。
最简单的理解信号量就是,一个计数器、一个等待队列、两个方法(在Java实现的Semaphore中就是acquire和release)。 调用一次acquire方法,计数器就减1,如果此时计数器小于0,则阻塞当前线程,否则当前线程可继续执行。 调用一次release方法,计数器就加1,如果此时计数器小于等于0,则唤醒一个等待队列中的线程,并将其中等待队列中移除。
适用场景
1.限流器使用场景
利用Semaphore实现一 个限流器的功能
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
Thread t = new Thread(new SemaphoreWorker(semaphore));
t.start();
}
}
class SemaphoreWorker implements Runnable {
private Semaphore semaphore;
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 8, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("my_thread_pool_%d").build(), new ThreadPoolExecutor.DiscardOldestPolicy());
public SemaphoreWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@SneakyThrows
@Override
public void run() {
while (true) {
threadPool.execute(new Runnable() {
@SneakyThrows
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "等待资源");
applyResource();
System.out.println(Thread.currentThread().getName() + "申请到资源,开始执行任务");
Thread.sleep(1000);
} finally {
releaseResource();
System.out.println(Thread.currentThread().getName() + "释放资源");
}
}
});
}
}
private void releaseResource() {
semaphore.release();
}
private void applyResource() throws InterruptedException {
semaphore.acquire();
}
}
最后申请到5个资源后,就必须等待有资源释放才能再申请到资源。
当然acquire?和release方法也支持一次获取或者释放多个资源。
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "等待资源");
semaphore.acquire(5);
System.out.println(Thread.currentThread().getName() + "申请到资源,开始执行任务");
System.out.println("预估等待获取资源的线程数:" + semaphore.getQueueLength());
Thread.sleep(1000);
} finally {
semaphore.release(5);
System.out.println(Thread.currentThread().getName() + "释放资源");
}
}
2.用信号量实现互斥锁
public class SemaphoreMutex {
private static final Semaphore SEMAPHORE = new Semaphore(1);
public static void main(String[] args) {
SemaphoreMutex semaphoreMutex = new SemaphoreMutex();
for (int i = 0; i < 10; i++) {
new Thread(semaphoreMutex::method).start();
}
}
@SneakyThrows
public void method() {
SEMAPHORE.acquire();
try {
System.out.println(Thread.currentThread().getName() + "线程正在执行!");
Thread.sleep(1000);
} finally {
SEMAPHORE.release();
System.out.println(Thread.currentThread().getName() + "线程执行结束!");
}
}
}
从执行结果来看,每次都是一个线程执行结束后,才会有另一个开始执行,实现了互斥锁的功能!
|