IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 《JUC》CyclicBarrier原理/源码解析 -> 正文阅读

[Java知识库]《JUC》CyclicBarrier原理/源码解析

一、概述

在这里插入图片描述

1、作用?

允许一组线程互相等待,直到到达某个公共屏障(barrier)点。

  • 因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

2、使用场景?

用于多线程计算数据,最后合并计算结果的场景。

3、常用类方法?

  • await():告诉CyclicBarrier,线程已经到达了屏障,计数减一;然后阻塞线程,直到count为0。
  • reset():重置CyclicBarrier的未加入到party的数量count和当前代Generation。

4、案例

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() -> {
            System.out.println("work thread start + 1");
            try {
                TimeUnit.SECONDS.sleep(1);
                cyclicBarrier.await();
                // 睡一会,让主线程先干活,work thread再继续
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println("main thread OK, work thread go on!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "work-thread").start();

        // 主线程在这等着,等其他线程调用await()方法之后,大家再一起执行
        cyclicBarrier.await();
        System.out.println("main end");
    }
}

二、原理

1)CyclicBarrier中一个generation代表了每一代,通过这个实现CyclicBarrier的复用。

  • parties变量用来表示参与party的线程数;
  • count变量代表了还没到party的线程数;
  • 外加个ReentrantLock锁和一个Condition条件变量实现线程的并发和阻塞。

2)在CyclicBarrier类的内部有一个计数器count,每个线程在到达屏障点的时候都会调用await()方法将自己阻塞排队,并将计数器count减1;
3)当计数器count减为0的时候,所有因调用await()方法而被阻塞的线程将被唤醒
4)线程的排队进入party通过ReentrantLock实现;进入party后睡眠等待所有参会者通过锁的条件等待Condition实现

三、源码解析

1、成员变量和构造器

  1. CyclicBarrier内部是通过条件队列trip对线程进行阻塞;
  2. 两个int型的变量partiescount
  • parties表示每次拦截的线程数,即party的所有参会者
  • count表示还未拦截的线程数,即还有多少参会者没到party;它的初始值和parties相同,调用await()方法减1,减为0时将所有阻塞在条件变量上线程唤醒。
  1. 静态内部类Generation,代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待
  2. barrierCommand,表示换代前执行的任务。当前代结束会执行该任务,然后自动开启下一代
public class CyclicBarrier {
    // 该类用于CyclicBarrier的一次协同是否完成(正常完成、异常完成)
    // reset后会复用该结构,每一次的party都会生成一个该类的新实例
    private static class Generation {
        // 当前party有没有被强制中断,false表示没有
        boolean broken = false;
    }

    // 同步锁,线程进入 条件等待 时需要获取锁
    private final ReentrantLock lock = new ReentrantLock();
    // 用于阻塞线程的条件变量(有未到party的线程,已到party的线程则等待在该条件变量上)
    private final Condition trip = lock.newCondition();
    // 参与party的人数
    private final int parties;
    // 当所有的线程都参与到了party中, 回调的方法
    private final Runnable barrierCommand;
    // 当前party
    private Generation generation = new Generation();

    // 还未到party的线程数
    private int count;
    
    // 构造器
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
}

2、await()方法

CyclicBarrier的核心方法是await(),该方法是线程相互等待的关键,它有两种实现:一种是带等待超时的,一种是不带等待超时,本质上都是调用了同一个方法dowait(),只是带等待超时的多传了一个时间。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 直接调用自己的dowait()方法
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

CyclicBarrier#dowait()方法

  • 首先因为await()是同步的,需要先加互斥锁ReentrantLock;
  1. 每次进来都将count减1,减完立马进行判断看看count是否等于0:
  • 如果等于0,则执行换代前要执行的任务barrierCommand,然后唤醒所有阻塞等待的线程,接着自动进入CyclicBarrier的下一代;将计数器count的值重新设为parties。如果barrierCommand运行异常,则打破栅栏的当前代,唤醒所有阻塞等待的线程。
  • count不等于0,这进入for循环:
  • 不是超时等待,直接调用Condition.await()阻塞当前线程。
  • 是超时等待,就在nanos时间内循环竞争锁;
  • 如果当前线程在await()获取锁过程中被中断了:
  1. 在当前代还没结束之前打破栅栏,即游戏在中途被打断,则设置generation的broken状态为true并唤醒所有线程。
  2. 当前代已经结束,则直接中断当前线程。
  1. 线程被唤醒后进行下面三个判断:
  1. 如果线程因为broken generation操作(即调用breakBarrier()方法)而被唤醒则抛出异常
    2.如果线程因为CyclicBarrier正常换代被唤醒,则返回计数器count的值
  2. 如果线程因为超时而被唤醒打破栅栏并抛出TimeOut异常
    注意:如果其中有一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒
  1. 最后解锁。
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    // 上锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 保存当前party时的generation快照,generation更新后不会影响这里
        final Generation g = generation;

        // 当party被强制中断时,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();

        // 判断当前线程是否被中断
        if (Thread.interrupted()) {
            // 有中断的线程混入其中:
            // 1)broken当前generation;
            // 2)唤醒CyclicBarrier阻塞的所有线程;重新开始,注意:此时没有改变party的Generation
            // 3)抛出线程中断异常
            breakBarrier();
            throw new InterruptedException();
        }

        // 每次都将计数器的值减1,即未到场的参会人个数减一。
        int index = --count;
        // 当前线程是最后一个到达party的线程时,回调barrierCommand,然后唤醒所有阻塞在条件变量上的线程。
        // 若回调barrierCommand正常完成,则不需要手动调用reset()就可进入新的一代,因为这里调用了nextGeneration()
        if (index == 0) {
            boolean ranAction = false;
            try {
                // 唤醒所有线程前先执行指定的任务
                final Runnable command = barrierCommand;
                if (command != null)
                    // 若回调方法运行出现异常,异常直接上抛
                    command.run();
                ranAction = true;
                // 唤醒所有阻塞的线程,并进入下一代party,修改generation。
                nextGeneration();
                return 0;
            } finally {
                // barrierCommand回调方法发生了异常,那么设置broker标志位,并唤醒所有阻塞的线程
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 循环等待最后一个参与party的线程 唤醒自己 或者 被中断 或者 等待超时
        for (;;) {
            try {
                // 根据传入的参数决定是定时等待还是非定时等待
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) { // 发生中断异常
                // 如果当前线程在CyclicBarrier的等待唤醒期间(即:g没有被改变)被中断,则中断generation,唤醒所有线程
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else { // 如果当前线程已经完成在CyclicBarrier上的等待(即:g被改变),则直接标志当前线程被中断
                    Thread.currentThread().interrupt();
                }
            }

			// 如果线程因为broken generation操作而被唤醒则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果g != generation ,说明CyclicBarrier换代了(即:generation改变了),线程因此被唤醒的话,则返回还有多个参会者没进来,即计数器的值count。
            if (g != generation)
                return index;

            // 如果线程因为时间到了而被唤醒,这broken generation 并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

3、breakBarrier()方法:

意味着有人搞破坏,游戏中途结束,将所有的等待线程全部唤醒。

  • await()方法通过抛出BrokenBarrierException 异常返回;
private void breakBarrier() {
    // 中断当前generation,即打破栅栏
    generation.broken = true;
    // 设置 未参会者数量count 等于 所有需要参会者数量
    count = parties;
    // 唤醒所有阻塞的线程
    trip.signalAll();
}

4、nextGeneration()方法:

开启栅栏新的一代。

private void nextGeneration() {
    // 唤醒所有阻塞的线程
    trip.signalAll();
    // 设置 未参会者数量count 等于 所有需要参会者数量
    count = parties;
    // 生成栅栏的下一代Generation,这也是和breakBarrier()方法的区别
    generation = new Generation();
}

5、reset()方法:

重置一个栅栏

  • 打破栅栏 中断当前代,await()方法通过抛出BrokenBarrierException 异常返回;
  • 开始新的下一代,重置count和generation。
public void reset() {
    final ReentrantLock lock = this.lock;
    // 上锁
    lock.lock();
    try {
        breakBarrier();   // 将所有参与party的线程唤醒
        nextGeneration(); // 生成下一代
    } finally {
        lock.unlock();
    }
}

barrierCommand正常完成,则不需要手动调用reset()就可自动进入新的一代,因为运行barrierCommand之后调用了nextGeneration()。

四、总结

简单说就是一个ReentrantLock加上一个Condition条件变量实现并发控制和多个线程的阻塞等待。
并且采用多线程协作机制,在多个线程协作过程中,只要有一个线程被中断或者发生异常,则整个协作过程取消

CyclicBarrier和CountDownLatch相同的是,它们都能让多个线程协调在某一个节点上等待;下面我们看一下他们的区别?

2、CyclicBarrier和CountDownLatch的区别?

  • CountDownLatch的计数器只能使用一次;CyclicBarrier的计数器可以使用reset()方法重置进而循环使用
  • CyclicBarrier是多线程协作,当线程达到等待数量或者一个线程出现异常或被中断时自动放行;而CountDownLatch是多线程阻塞后,需要等待外界条件达到某种状态才会被统一唤醒。所以CyclicBarrier中只需要await()CountDownLatch还需要额外的countDown()唤醒操作。
  • CountDownLatch的性能大于CyclicBarrier,因为CountDownLatch是自己采用CAS,利用共享锁的原理实现;CyclicBarrier是采用ReentrantLock独占锁 +Condition条件变量实现;
  • 应用场景不同,CyclicBarrier能处理更为复杂的业务场景
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-14 21:29:54  更:2021-11-14 21:30:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:20:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码