Semaphore
Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,通过协调各个线程,以保证合理的使用资源。底层依赖 AQS 的状态 State,是在生产当中比较常用的一个工具类。
使用场景
常用于资源有明确访问数量限制的场景,常用于限流,可实现熔断、降级等处理 。
例如:数据库连接池,连接不能超过限制数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。
例如:某些排队场景,能够容纳同时在线人数有限,超过后只能排队等待。
例如:资源访问,服务限流(Hystrix里限流就有基于信号量方式),可以使用 tryAcquire(long timeout, TimeUnit unit) 方法,在超时做其他处理。
Semaphore常用方法说明
方法 | 说明 |
---|
acquire(int permits) | 获取 n 个许可,在获取到许可、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态,被中断时抛出异常。有无参方法,默认获取一个信号量 | acquireUninterruptibly() | 获取 n 个许可,在获取到许可之前线程一直处于阻塞状态(忽略中断) | tryAcquire() | 尝试获得许可,返回获取许可成功或失败,不阻塞线程 | tryAcquire(long timeout, TimeUnit unit) | 尝试获得许可,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程 | release(int permits) | 释放 n 个许可,并将它们归还给信号量(即增加可用许可)不传参默认释放一个许可 | availablePermits() | 返回可用的许可数量 | getQueueLength() | 获取等待队列里阻塞的线程数 | hasQueuedThreads() | 等待队列里是否还存在等待线程 | drainPermits() | 清空令牌把可用令牌数置为0,返回清空令牌的数量 |
使用实例
@Slf4j
public class SemaphoreRunner {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i=0;i<5;i++){
new Thread(new Task(semaphore,"SemaphoreRunner+"+i)).start();
}
}
static class Task extends Thread {
Semaphore semaphore;
public Task(Semaphore semaphore,String name){
this.semaphore = semaphore;
this.setName(name);
}
public void run() {
try {
semaphore.acquire();
log.info(Thread.currentThread().getName()+"完成acquire()");
Thread.sleep(5000);
log.info(Thread.currentThread().getName()+"开始释放许可release()",System.currentTimeMillis());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果
--------------------------------------------------------------------------------
14:16:09.436 [Thread-3] INFO my.SemaphoreRunner - Thread-3完成acquire()
14:16:09.436 [Thread-1] INFO my.SemaphoreRunner - Thread-1完成acquire()
14:16:14.451 [Thread-3] INFO my.SemaphoreRunner - Thread-3开始释放许可release()
14:16:14.451 [Thread-1] INFO my.SemaphoreRunner - Thread-1开始释放许可release()
14:16:14.457 [Thread-5] INFO my.SemaphoreRunner - Thread-5完成acquire()
14:16:14.457 [Thread-7] INFO my.SemaphoreRunner - Thread-7完成acquire()
14:16:19.461 [Thread-7] INFO my.SemaphoreRunner - Thread-7开始释放许可release()
14:16:19.461 [Thread-5] INFO my.SemaphoreRunner - Thread-5开始释放许可release()
14:16:19.462 [Thread-9] INFO my.SemaphoreRunner - Thread-9完成acquire()
14:16:24.476 [Thread-9] INFO my.SemaphoreRunner - Thread-9开始释放许可release()
部分源码解析
构造方法:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
获取许可:
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放许可:
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
CountDownLatch
使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。构
使用场景
Zookeeper分布式锁、Jmeter模拟高并发等
CountDownLatch 原理
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当 一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的 线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
造器中的计数值(count)实际上就是闭锁需要等待的线程数量这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
CountDownLatch方法说明(所有方法)
常用方法 | 说明 |
---|
await() | 使当前线程等待,直到闩锁计数为零,除非线程被中断 | countDown() | 递减计数,如果计数为零,则释放所有等待的线程。否则继续递减 | getCount() | 返回当前计数。此方法通常用于调试和测试目的 | await(long timeout, TimeUnit unit) | 使当前线程等待,直到锁存器倒数为零,除非线程被中断,或超时 |
- 开始执行前等待n个线程完成各自任务
- 实现最大的并行性:创建一个初始计数为1 的CountDownLatch,并让所有线程都在这个锁上等待,最后调用一次countDown() 方法就可以让所有的等待线程同时恢复执行。
- **死锁检测:**使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁
使用实例
@Slf4j
public class CountDownLatchRunner {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new WashTheTeapot(countDownLatch)).start();
new Thread(new HeatUpWater(countDownLatch)).start();
try {
countDownLatch.await();
new Thread(new MakeTea()).start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class WashTheTeapot implements Runnable {
private CountDownLatch countDownLatch;
public WashTheTeapot(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
log.info("开始洗茶壶...");
try {
TimeUnit.SECONDS.sleep(5);
log.info("茶壶清洗结束...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (countDownLatch != null)
countDownLatch.countDown();
}
}
}
static class HeatUpWater implements Runnable {
private CountDownLatch countDownLatch;
public HeatUpWater(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
log.info("开始烧水...");
try {
TimeUnit.SECONDS.sleep(8);
log.info("水烧开了...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (countDownLatch != null)
countDownLatch.countDown();
}
}
}
static class MakeTea implements Runnable {
@Override
public void run() {
log.info("开始泡茶...");
try {
TimeUnit.SECONDS.sleep(5);
log.info("茶泡好了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果
-----------------------------------------------------------
17:20:37.921 [Thread-1] my.CountDownLatchRunner - 开始烧水...
17:20:37.921 [Thread-0] my.CountDownLatchRunner - 开始洗茶壶...
17:20:42.946 [Thread-0] my.CountDownLatchRunner - 茶壶清洗结束...
17:20:45.942 [Thread-1] my.CountDownLatchRunner - 水烧开了...
17:20:45.946 [Thread-2] my.CountDownLatchRunner - 开始泡茶...
17:20:50.956 [Thread-2] my.CountDownLatchRunner - 茶泡好了
CyclicBarrier
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行。
常用方法
方法 | 说明 |
---|
await() | 使当前线程等待,直到最后一个线程(达到限制数的)到达屏障点 |
应用场景
-
模拟并发 -
多线程计算数据,最后合并计算结果。例如用一个 Excel 保存了用户 所有银行流水,每个 Sheet 保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 Sheet 里的银行流水,都执行完之后,得到每个 Sheet 的日均银行流水,最后,再用这些线程的计算结果,计算出整个 Excel 的日 均银行流水。
使用实例
@Slf4j
public class CyclicBarrierRunner {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
log.info("所有线程都到达了屏蔽点,如果线程执行完毕,没有线程再到达屏障,可结束!");
}
});
for (int i = 0; i < 6; i++) {
int j = i;
new Thread(new Runnable() {
@Override
public void run() {
log.info("Thread-{}被创建...",String.valueOf(j));
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
TimeUnit.SECONDS.sleep(2);
}
}
}
运行结果
-----------------------------------------------------------
17:28:47.972 [Thread-0] INFO my.CyclicBarrierRunner - Thread-0被创建...
17:28:49.970 [Thread-1] INFO my.CyclicBarrierRunner - Thread-1被创建...
17:28:51.986 [Thread-2] INFO my.CyclicBarrierRunner - Thread-2被创建...
17:28:51.986 [Thread-2] INFO my.CyclicBarrierRunner - 所有线程都到达了屏蔽点,如果线程执行完毕,没有线程再到达屏障,可结束!
17:28:53.994 [Thread-3] INFO my.CyclicBarrierRunner - Thread-3被创建...
17:28:56.003 [Thread-4] INFO my.CyclicBarrierRunner - Thread-4被创建...
17:28:58.011 [Thread-5] INFO my.CyclicBarrierRunner - Thread-5被创建...
17:28:58.011 [Thread-5] INFO my.CyclicBarrierRunner - 所有线程都到达了屏蔽点,如果线程执行完毕,没有线程再到达屏障,可结束!
Executors
主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单 。十分的鸡肋,一帮不使用。线程的资源时十分宝贵的,不能随便乱用,一般需要使用时都是直接自己手动创建线程池。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
四种线程池
线程池种类 | 说明 |
---|
newCachedThreadPool | 创建一个可缓存线程池,如果线程池长度超过处理需 要,可灵活回收空闲线程,若无可回收,则新建线程 | newFixedThreadPool | 创建一个定长线程池,可控制线程最大并发数,超出的 线程会在队列中等待 | newScheduledThreadPool | 创建一个定长线程池,支持定时及周期性任务执行 | newSingleThreadExecutor | 创建一个线程池(这个线程池只有一个线程),这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去 |
|