1.ReentrantLock详解
相对于synchronized 它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
基本语法
private ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
}finally {
lock.unlock();
}
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
1.1 可重入
- 可重入锁是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此 有权利再次获取这把锁
- 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
@Slf4j(topic = "z.ReentrantTest")
public class ReentrantTest {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
log.debug("entry main...");
m1();
} finally {
lock.unlock();
}
}
private static void m1() {
lock.lock();
try {
log.debug("entry m1...");
m2();
} finally {
lock.unlock();
}
}
private static void m2() {
log.debug("entry m2....");
}
}
运行结果
2022-03-11 21:15:34 [main] - entry main...
2022-03-11 21:15:34 [main] - entry m1...
2022-03-11 21:15:34 [main] - entry m2....
Process finished with exit code 0
synchronized的可重入
static final Object obj = new Object();
public static void method1() {
synchronized( obj ) {
method2();
}
}
public static void method2() {
synchronized( obj ) {
}
}
1.2 可中断 lockInterruptibly()
synchronized 和 reentrantlock.lock() 的锁, 是不可被打断的; 也就是说别的线程已经获得了锁, 线程就需要一直等待下去,不能中断,直到获得到锁才运行。
通过reentrantlock.lockInterruptibly(); 可以通过调用阻塞线程的t1.interrupt();方法 打断。
@Slf4j
public class ReentrantTest1 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
log.debug("尝试获得锁");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("t1线程没有获得锁,被打断...return");
return;
}
try {
log.debug("t1线程获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
thread.start();
Thread.sleep(1000);
log.debug("interrupt...打断t1");
thread.interrupt();
}
}
2022-03-11 21:42:43 [t1] - 尝试获得锁
2022-03-11 21:42:44 [main] - interrupt...打断t1
2022-03-11 21:42:44 [t1] - t1线程没有获得锁,被打断...return
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:900)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1225)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at com.concurrent.reentrantlocktest.ReentrantTest1.lambda$main$0(ReentrantTest1.java:25)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 0
测试使用lock.lock()不可以从阻塞队列中打断, 一直等待别的线程释放锁
@Slf4j
public class ReentrantTest1 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
log.debug("尝试获得锁");
lock.lock();
try {
log.debug("t1线程获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
thread.start();
Thread.sleep(1000);
log.debug("interrupt...打断t1");
thread.interrupt();
}
}
调用thread.interrupt();后 thread线程并没被打断。
1.3 设置超时时间 tryLock()
使用 lock.tryLock() 方法会返回获取锁是否成功。如果成功则返回true,反之则返回false。
并且tryLock方法可以设置指定等待时间,参数为:tryLock(long timeout, TimeUnit unit) , 其中timeout为最长等待时间,TimeUnit为时间单位
获取锁的过程中, 如果超过等待时间, 或者被打断, 就直接从阻塞队列移除, 此时获取锁就失败了, 不会一直阻塞着 ! (可以用来实现死锁问题)
不设置等待时间, 立即失败
@Slf4j
public class ReentrantTest2 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("尝试获得锁");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得到锁");
t1.start();
Thread.sleep(2000);
log.debug("释放了锁");
lock.unlock();
}
}
2022-03-11 22:20:09 [main] - 获得到锁
2022-03-11 22:20:09 [t1] - 尝试获得锁
2022-03-11 22:20:09 [t1] - 获取立刻失败,返回
2022-03-11 22:20:11 [main] - 释放了锁
Process finished with exit code 0
设置等待时间
@Slf4j
public class ReentrantTest2 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("尝试获得锁");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("等待1s获取不到锁");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("获取不到锁");
return;
}
try {
log.debug("获得到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得到锁");
t1.start();
Thread.sleep(2000);
log.debug("释放了锁");
lock.unlock();
}
}
2022-03-11 22:32:32 [main] - 获得到锁
2022-03-11 22:32:32 [t1] - 尝试获得锁
2022-03-11 22:32:33 [t1] - 等待1s获取不到锁
2022-03-11 22:32:34 [main] - 释放了锁
Process finished with exit code 0
1.4 通过lock.tryLock()来解决, 哲学家就餐问题
lock.tryLock(时间) : 尝试获取锁对象, 如果超过了设置的时间, 还没有获取到锁, 此时就退出阻塞队列, 并释放掉自己拥有的锁
@Slf4j(topic = "z.PhilosopherEat")
public class PhilosopherEat {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}
@Slf4j(topic = "z.Philosopher")
class Philosopher extends Thread {
final Chopstick left;
final Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
if (left.tryLock()) {
try {
if (right.tryLock()) {
try {
eat();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}
}
}
private void eat() throws InterruptedException {
log.debug("eating...");
Thread.sleep(500);
}
}
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
1.5 公平锁 new ReentrantLock(true)
- ReentrantLock默认是非公平锁, 可以指定为公平锁new ReentrantLock(true)。
- 在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。一般不设置ReentrantLock为公平的, 会降低并发度
- Synchronized底层的Monitor锁就是不公平的, 和谁先进入阻塞队列是没有关系的。
公平锁 多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。 优点:所有的线程都能得到资源,不会饿死在队列中。 缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。 非公平锁 多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。 优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。 缺点:你们可能也发现了,这样可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致饿死。
1.6 条件变量 Condition
传统对象等待集合只有一个 waitSet, Lock可以通过newCondition()方法 生成多个等待集合Condition对象。 Lock和Condition 是一对多的关系
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet等待 ReentrantLock的条件变量比 synchronized强大之处在于,它是支持多个条件变量的,这就好比 synchronized 是那些不满足条件的线程都在一间休息室等消息 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来 唤醒
使用流程 1.await 前需要 获得锁 2.await 执行后,会释放锁,进入 conditionObject (条件变量)中等待 3.await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁 4.竞争 lock 锁成功后,从 await 后继续执行 5.signal 方法用来唤醒条件变量(等待室)汇总的某一个等待的线程 6.signalAll方法, 唤醒条件变量(休息室)中的所有线程
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
代码举例:
@Slf4j(topic = "z.ConditionVariable")
public class ConditionVariable {
private static boolean hasCigarette = false;
private static boolean hasTakeout = false;
private static final ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteSet = lock.newCondition();
static Condition waitTakeoutSet = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
lock.lock();
try {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
waitCigaretteSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("烟来咯, 可以开始干活了");
} finally {
lock.unlock();
}
}, "小南").start();
new Thread(() -> {
lock.lock();
try {
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
waitTakeoutSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖来咯, 可以开始干活了");
} finally {
lock.unlock();
}
}, "小女").start();
Thread.sleep(1000);
new Thread(() -> {
lock.lock();
try {
log.debug("送外卖的来咯~");
hasTakeout = true;
waitTakeoutSet.signal();
} finally {
lock.unlock();
}
}, "送外卖的").start();
Thread.sleep(1000);
new Thread(() -> {
lock.lock();
try {
log.debug("送烟的来咯~");
hasCigarette = true;
waitCigaretteSet.signal();
} finally {
lock.unlock();
}
}, "送烟的").start();
}
}
运行结果
2022-03-11 23:31:22 [小南] - 有烟没?[false]
2022-03-11 23:31:22 [小南] - 没烟,先歇会!
2022-03-11 23:31:22 [小女] - 外卖送到没?[false]
2022-03-11 23:31:22 [小女] - 没外卖,先歇会!
2022-03-11 23:31:23 [送外卖的] - 送外卖的来咯~
2022-03-11 23:31:23 [小女] - 外卖来咯, 可以开始干活了
2022-03-11 23:31:24 [送烟的] - 送烟的来咯~
2022-03-11 23:31:24 [小南] - 烟来咯, 可以开始干活了
Process finished with exit code 0
2.同步模式之顺序控制
2.1 固定运行顺序
比如必须先打印2再打印1
使用wait/notify来实现顺序打印 2, 1
@Slf4j(topic = "z.SyncPrintWaitTest")
public class SyncPrintWaitTest {
public static final Object lock = new Object();
public static boolean t2Runned = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!t2Runned) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("1");
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
log.debug("2");
t2Runned = true;
lock.notify();
}
}, "t2");
t1.start();
t2.start();
}
}
使用LockSupport中的park/unpark
@Slf4j(topic = "z.SyncPrintWaitTest")
public class SyncPrintWaitTest {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LockSupport.park();
log.debug("1");
}, "t1");
t1.start();
new Thread(() -> {
log.debug("2");
LockSupport.unpark(t1);
}, "t2").start();
}
}
使用ReentrantLock的await/signal
@Slf4j
public class SyncPrintWaitTest2 {
private static final ReentrantLock LOCK = new ReentrantLock();
static Condition condition = LOCK.newCondition();
public static boolean t2Runned = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LOCK.lock();
try {
while (!t2Runned) {
condition.await();
}
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
},"t1");
Thread t2 = new Thread(() -> {
LOCK.lock();
try {
log.debug("2");
t2Runned = true;
condition.signal();
} finally {
LOCK.unlock();
}
},"t2");
t1.start();
t2.start();
}
}
2.2 交替输出
线程1输出a5次,线程2输出b5次,线程3输出c5次。现在要求输出 abcabcabcabcabc怎么实现
wait/notify版本
@Slf4j
public class AlternatePrint {
public static void main(String[] args) {
WaitNotify waitNotify = new WaitNotify(1, 5);
new Thread(() -> {
waitNotify.print("a", 1, 2);
}, "a线程").start();
new Thread(() -> {
waitNotify.print("b", 2, 3);
}, "b线程").start();
new Thread(() -> {
waitNotify.print("c", 3, 1);
}, "c线程").start();
}
}
class WaitNotify {
private int flag;
private int loopNumber;
public WaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
public void print(String str, int waitFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (waitFlag != flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
}
abcabcabcabcabc
Process finished with exit code 0
await/signal版本
@Slf4j
public class AlternatePrintTest2 {
public static void main(String[] args) throws InterruptedException {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition a_condition = awaitSignal.newCondition();
Condition b_condition = awaitSignal.newCondition();
Condition c_condition = awaitSignal.newCondition();
new Thread(() -> {
awaitSignal.print("a", a_condition, b_condition);
}, "a").start();
new Thread(() -> {
awaitSignal.print("b", b_condition, c_condition);
}, "b").start();
new Thread(() -> {
awaitSignal.print("c", c_condition, a_condition);
}, "c").start();
Thread.sleep(1000);
System.out.println("开始...");
awaitSignal.lock();
try {
a_condition.signal();
} finally {
awaitSignal.unlock();
}
}
}
class AwaitSignal extends ReentrantLock {
private final int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
try {
current.await();
System.out.print(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
unlock();
}
}
}
}
LockSupport的park/unpark实现
@Slf4j
public class TestParkUnpark {
static Thread a;
static Thread b;
static Thread c;
public static void main(String[] args) {
ParkUnpark parkUnpark = new ParkUnpark(5);
a = new Thread(() -> {
parkUnpark.print("a", b);
}, "a");
b = new Thread(() -> {
parkUnpark.print("b", c);
}, "b");
c = new Thread(() -> {
parkUnpark.print("c", a);
}, "c");
a.start();
b.start();
c.start();
LockSupport.unpark(a);
}
}
class ParkUnpark {
private final int loopNumber;
public ParkUnpark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void print(String str, Thread nextThread) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread);
}
}
}
|