多线程问题在日常的开发工作中很是常见,同时也是很重要的。多线程本身比较复杂,调试困难,在提高效率的同时,对线程安全也有更高的要求,因此多线程之间的同步是多线程问题的一个很重要的知识点。在面试过程中,也是判断候选人能力的一个很重要的考察点。
本文针对leetcode上的一些多线程同步问题进行分析和总结,并尝试对同一问题使用常见的多线程同步的不同解决方法,以期能够熟练掌握多线程同步(通信)相关内容。
基础知识
为保证线程安全,多个线程同一时刻只能有一个线程进入临界区,其他线程则等待,直到该线程退出临界区,所有线程重新开始竞争。为实现这种同步机制,Java中常用的方法有:
- 等待/通知机制
- 信号量
- CountDownLatch
- CyclicBarrier
等待/通知机制 wait()/notifyAll()
wait()是对象的一个通用方法,调用该方法的前提是在同步方法或者同步块中(配合synchronized 关键字使用),即调用该方法的线程持有该对象的锁。当线程调用并执行wait()后,会使得当前线程阻塞,同时会释放当前的锁,让出CPU。
那调用wait()的线程什么时候继续往下执行呢?即线程什么时候被唤醒呢?这就得依赖notify/notifyAll()方法了。
持有对象锁的线程调用该对象的notify/notifyAll方法后,会唤醒沉睡的线程,但不会立即释放锁,只有退出当前同步方法或者代码块后才会释放。
notify和notifyAll的区别在于前者只会唤醒一个等待该对象锁的线程,notifyAll 会唤醒所有线程。
wait()和sleep()的区别:
-
类不同 sleep是Thread的静态方法,wait是定义在对象上的方法 -
调用方式 wait依赖于同步,只能在同步方法或者同步代码块内,而sleep可以直接调用 -
释放锁 sleep只是暂时让出CPU的执行权,不释放锁 wait则是会释放锁
信号量 Semaphore
Semaphore 包含一组许可证,当许可证数量大于0时,线程可以继续往下执行,当许可证数为0时,那么线程会阻塞在这里,等待这个许可证被释放。
使用方法:首先需要确定需要使用几个信号量来进行,每个信号量对应初始的许可证是多少,最先开始的执行的线程是哪个,下一个被唤醒的阻塞线程是哪个,由谁来释放。
总结起来就是:确定初始值,哪个信号量最新执行,以及获得某个许可后,释放其他的许可,让其他的线程执行。
CountDownLatch
作用:一个线程等待await()其它线程执行完并调用countDown()方法发出通知后,当前才可以执行
CyclicBarrier
作用:一组线程互相等待,直到所有线程都准备好(全部调用进入await()方法之后),所有线程同时开始执行。
CyclicBarrier 可以配合 Semaphore信号量来控制程序的执行顺序,进行多线程的同步
需要确定有几个线程在执行
实战
多线程之间的同步,首先要明确需要同步的是什么,是一段代码块还是一个数据;然后明确初始线程及线程的执行顺序,以及如何实现前一个线程执行完了,通知下一个线程执行相应的任务。
三个线程按照一定顺序打印字符串,每个线程只打印一次。
要同步的三个线程的先后执行顺序,首先执行的是first线程,而second和third线程不能执行,当first线程执行完毕后,通知下一个线程second执行,因此需要有一个变量来记录first线程执行完毕并改变状态,让一开始阻塞的second线程开始执行,third线程也是如此,依赖于second线程执行完毕。
class Foo {
boolean first = false;
boolean second = false;
Object obj = new Object();
public Foo() {
}
public void first(Runnable printFirst) throws InterruptedException {
synchronized (obj) {
first = true;
printFirst.run();
obj.notifyAll();
}
}
public void second(Runnable printSecond) throws InterruptedException {
synchronized (obj) {
while (!first) {
obj.wait();
}
printSecond.run();
second = true;
obj.notifyAll();
}
}
public void third(Runnable printThird) throws InterruptedException {
synchronized (obj) {
while (!second) {
obj.wait();
}
printThird.run();
obj.notifyAll();
}
}
}
其实也可以用信号量来解决
public class Foo_1114_Semaphore {
public Foo_1114_Semaphore() {
}
Semaphore first = new Semaphore(0);
Semaphore second = new Semaphore(0);
public void first(Runnable printFirst) throws InterruptedException {
printFirst.run();
first.release();
}
public void second(Runnable printSecond) throws InterruptedException {
first.acquire();
printSecond.run();
second.release();
}
public void third(Runnable printThird) throws InterruptedException {
second.acquire();
printThird.run();
}
}
相对于1114,本问题需要循环交替执行,线程1执行完了通知线程2,线程2执行完了通知线程1,因此需要有两个变量来进行线程之间的切换,可以使用布尔变量和同步等待通知机制,也可以使用信号量。
方法1:同步等待/通知
public class FooBar_1115 {
private int n;
public FooBar_1115(int n) {
this.n = n;
}
Object obj = new Object();
boolean first_flag = false;
boolean second_flag = true;
public void foo(Runnable printFoo) throws InterruptedException {
for (int i = 0; i < n; i++) {
synchronized (obj){
while (!second_flag){
obj.wait();
}
printFoo.run();
obj.notifyAll();
first_flag = true;
second_flag = false;
}
}
}
public void bar(Runnable printBar) throws InterruptedException {
for (int i = 0; i < n; i++) {
synchronized (obj){
while (!first_flag){
obj.wait();
}
printBar.run();
obj.notifyAll();
second_flag = true;
first_flag = false;
}
}
}
}
方法2:信号量
初始时线程1执行,因此线程1先要获取一个许可,线程2没有许可阻塞,线程1获取一个许可,许可数为0阻塞,同时释放线程2要的许可证,线程2继续执行。
public class FooBar_1115_Semaphore {
private int n;
public FooBar_1115_Semaphore(int n) {
this.n = n;
}
Semaphore foo = new Semaphore(1);
Semaphore bar = new Semaphore(0);
public void foo(Runnable printFoo) throws InterruptedException {
for (int i = 1; i <= n; i++) {
foo.acquire();
printFoo.run();
bar.release();
}
}
public void bar(Runnable printBar) throws InterruptedException {
for (int i = 0; i < n; i++) {
bar.acquire();
printBar.run();
foo.release();
}
}
三个线程一个打印0,一个打印奇数,一个打印偶数。初始线程打印0,因此打印0的线程有一个初始许可,打印奇数和偶数的线程都被阻塞,打印0之后根据当前的值来判断具体是释放奇数线程的许可,还是偶数线程的许可。
import java.util.concurrent.Semaphore;
import java.util.function.IntConsumer;
class ZeroEvenOdd {
private int n;
private Semaphore zero = new Semaphore(1);
private Semaphore even = new Semaphore(0);
private Semaphore odd = new Semaphore(0);
public ZeroEvenOdd(int n) {
this.n = n;
}
public void zero(IntConsumer printNumber) throws InterruptedException {
for (int i = 1;i <= n; i++){
zero.acquire();
printNumber.accept(0);
if(i % 2 == 1){
odd.release();
}else{
even.release();
}
}
}
public void even(IntConsumer printNumber) throws InterruptedException {
for (int i = 2; i <= n;i += 2){
even.acquire();
printNumber.accept(i);
zero.release();
}
}
public void odd(IntConsumer printNumber) throws InterruptedException {
for (int i = 1; i <= n; i += 2){
odd.acquire();
printNumber.accept(i);
zero.release();
}
}
}
一个产生H原子的线程, 一个产生O原子的线程,当有2个H原子和1个O原子后会结合一个水分子。
当前如果产生了2个H原子,那么该线程会阻塞,等到一个O原子的产生,才会继续下一轮的数据生成,这个就和屏障的概念一致了,因此可以使用CyclicBarrier。
使用信号量和屏障来进行线程之间的同步,所有线程都进行等待,直到所有线程都准备好(全部调用进入await()方法之后),所有线程同时开始执行。
public class H2O_1117_CyclicBarrier {
public H2O_1117_CyclicBarrier() {
}
public final static int CNT = 10;
Semaphore hy = new Semaphore(2);
Semaphore ox = new Semaphore(1);
CyclicBarrier barrier = new CyclicBarrier(3);
public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
hy.acquire();
try {
if(barrier.getNumberWaiting()==2){
System.out.print("|");
}
barrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
releaseHydrogen.run();
hy.release();
}
public void oxygen(Runnable releaseOxygen) throws InterruptedException {
ox.acquire();
try {
if(barrier.getNumberWaiting()==2){
System.out.print("|");
}
barrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
releaseOxygen.run();
ox.release();
}
}
总共有4个线程,初始线程为打印数字,因此需要给该线程初始1个许可,然后根据下一个数字来判断哪一个线程将获取下一个许可
public class FizzBuzz_1195_Update {
private int n;
public FizzBuzz_1195_Update(int n) {
this.n = n;
}
Semaphore fizz = new Semaphore(0);
Semaphore buzz = new Semaphore(0);
Semaphore fizzbuzz = new Semaphore(0);
Semaphore number = new Semaphore(1);
Semaphore main = new Semaphore(0);
public void fizz(Runnable printFizz) throws InterruptedException {
for (int i = 1; i <= n; i++) {
if (i % 5 != 0 && i % 3 == 0) {
fizz.acquire();
printFizz.run();
number.release();
if(i == n){
main.release();
}
}
}
}
public void buzz(Runnable printBuzz) throws InterruptedException {
for (int i = 1; i <= n; i++) {
if (i % 3 != 0 && i % 5 == 0) {
buzz.acquire();
printBuzz.run();
number.release();
if(i == n){
main.release();
}
}
}
}
public void fizzbuzz(Runnable printFizzBuzz) throws InterruptedException {
for (int i = 1; i <= n; i++) {
if (i % 15 == 0) {
fizzbuzz.acquire();
printFizzBuzz.run();
number.release();
if(i == n){
main.release();
}
}
}
}
public void number(IntConsumer printNumber) throws InterruptedException {
for (int i = 1; i <= n; i++) {
number.acquire();
if (i % 3 != 0 && i % 5 != 0) {
printNumber.accept(i);
number.release();
} else if (i % 5 != 0 && i % 3 == 0) {
fizz.release();
} else if (i % 3 != 0 && i % 5 == 0) {
buzz.release();
} else {
fizzbuzz.release();
}
if(i == n){
main.release();
}
}
}
}
补充问题
由5个线程来对一个数从0开始进行累加,累加到1000,加至1000后,通知主线程,打印任务完成
分析:多个线程需要对一个整形变量加1,这个加1操作应该在临界区进行,需要进行同步
- 多个个线程在执行计数操作,每次只有一个线程能够进入临界区,进行加1操作,因此需要一个许可,当线程进入时,获取这个许可,许可证数为0,其他线程就在等待,
直到获取了许可证的线程完成加1操作并释放许可证后,线程重新开始竞争这个许可证 - 主线程一开始就要阻塞,直到所有任务都完成后,才打印任务完成,那如何感知呢?因此需要一个信号量来告诉主线程,任务完成。
- 当前线程获取到一个许可,就进行后续操作,如果当前计数小于指定值,那么做+1操作,否则说明多线程计数全部执行完毕,释放主线程的信号量,主线程不再阻塞
public class CountNumNew {
public volatile int cnt = 0;
final static int TOTAL = 1000;
Semaphore sema = new Semaphore(1);
Semaphore main = new Semaphore(0);
public void addNum() throws InterruptedException {
for(int i=1;i<=TOTAL;i++){
sema.acquire(1);
if (cnt < TOTAL) {
cnt = cnt + 1;
System.out.println(Thread.currentThread().getName() + " cnt=" + cnt);
} else {
main.release();
}
sema.release();
}
}
public static void main(String[] args) throws InterruptedException {
CountNumNew c = new CountNumNew();
new Thread(() -> {
try {
c.addNum();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
c.addNum();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
c.addNum();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
c.addNum();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
c.addNum();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
c.main.acquire();
System.out.println("main thread finished");
}
}
最后
本文主要对多线程同步问题进行了总结,同时运用总结的方法对leetcode一些具体的例子来分析,以期能够掌握同类问题,同时在生产实践中能够进行运用。
好久没有更博客了,需要加强输入和总结,期望以后能够每周更新一篇,与君共勉!
本文针对多线程同步问题进行了分析和总结,掌握一种方法,解决一类问题
更多信息欢迎关注个人公众号:惊鸿只为卿
|