IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> BlockingQueue底层原理以及AQS源代码解析 -> 正文阅读

[网络协议]BlockingQueue底层原理以及AQS源代码解析

BlockingQueue

特征

队列是一种存储数据的数据结构,符合先进先出(FIFO)的原则。阻塞队列BlockingQueue是Java.concurrent.util包下的并发容器,除了符合队列的特点之外,还是线程安全的,保证在一个JVM中同一时刻只会有一个线程进行入队和出队操作。适用于解决并发生产者 - 消费者问题,在源代码的注释中有生产-消费示例:

	class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
      try {
        while (true) { queue.put(produce()); }
      } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
  }
 
  class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
      try {
        while (true) { consume(queue.take()); }
      } catch (InterruptedException ex) { ... handle ...}
    }
    void consume(Object x) { ... }
  }
 
  class Setup {
    void main() {
      BlockingQueue q = new SomeQueueImplementation();
      Producer p = new Producer(q);
      Consumer c1 = new Consumer(q);
      Consumer c2 = new Consumer(q);
      new Thread(p).start();
      new Thread(c1).start();
      new Thread(c2).start();
    }
  }}

BlockingQueue中put / take方法支持阻塞的入队和出队操作:

  • 当阻塞队列满时,如果生产者put元素,队列则会一直阻塞生产者,直到队列可用或者响应中断退出;
  • 当阻塞队列为空,如果消费者take元素,队列则会一直阻塞消费者,直到队列不为空。

具体实现

Java中提供了很多实现BlockingQueue的阻塞队列,常见的比如:

  • ArrayBlockingQueue,由数组实现的有界阻塞队列;
  • LinkedBlockingQueue,用链表实现的有界阻塞队列(理论上有界,容量为Integer.MAX_VALUE);
  • PriorityBlockingQueue,支持优先级的无界阻塞队列;
  • DelayQueue,支持延时获取元素的无界阻塞队列;

BlockingQueue结构关系图:

image-20211208174451457

ArrayBlockingQueue

以ArrayBlockingQueue为例,分析put / take方法如何支持阻塞操作。

BlockingQueue queue = new ArrayBlockingQueue(10);//创建实例
queue.put(produce());//入队
queue.take();//出队

实例化

ArrayBlockingQueue构造方法以及核心属性:

...
final Object[] items;//底层数组
...
final ReentrantLock lock;//锁
private final Condition notEmpty;//出队需要等待的条件
private final Condition notFull;//入队需要等待的条件
...
public ArrayBlockingQueue(int capacity) {this(capacity, false);}

//capacity,队列容量
//fair,是否支持公平
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
  	//初始化数组,指定容量
    this.items = new Object[capacity];
  	//初始化ReentrantLock,支持公平锁和非公平锁
    lock = new ReentrantLock(fair);
  	//锁的条件对象
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

入队

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
          	//容器已满,入队需阻塞等待
            notFull.await();
      	//入队操作
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
      	putIndex = 0;
    count++;
  	//容器中已经有元素,通知消费者取
  	//take方法中如果容器已空,会notEmpty.await()
    notEmpty.signal();
}

出队

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
          	//容器已空,出队需阻塞等待
            notEmpty.await();
      	//出队操作
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      	takeIndex = 0;
    count--;
    if (itrs != null)
      	itrs.elementDequeued();
  	//容器已经有空间,通知生产者放
  	//put方法中如果容器已满,会notFull.await()
    notFull.signal();
    return x;
}

Condition

Condition是Java.concurrent.util下提供的一个接口,使线程等待某个条件,当该条件满足时,唤醒等待的线程。主要提供了两类方法:线程等待和线程唤醒。

public interface Condition {
  
    void await() throws InterruptedException;

    void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

AbstractQueuedSynchronizer有一个内部类ConditionObject,实现Condition接口,并重写await和signal。

await

把获取到锁的线程添加到条件等待队列中阻塞,并释放锁。

    public class ConditionObject implements Condition, java.io.Serializable {
        ...
        private transient Node firstWaiter;
        private transient Node lastWaiter;
      	...
        public final void await() throws InterruptedException {
          	//线程中断则抛异常
            if (Thread.interrupted())
                throw new InterruptedException();
          	//添加线程到条件等待队列中
          	//条件等待队列中的线程不可以获取锁
          	//获取锁必须是在同步等待队列(CLH)中且前驱节点状态为-1
            Node node = addConditionWaiter();
          	//通过release释放锁,同时该线程所在CLH中的节点被移除;并唤醒CLH中头节点后面的线程
          	//如果释放失败,则将节点标记为CANCELLED
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断节点是否在CLH中
            while (!isOnSyncQueue(node)) {
              	//线程阻塞
                LockSupport.park(this);
              	//如果线程被中断唤醒,则跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
          	//通过acquireQueued在CLH中尝试获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
          	//线程中断的处理
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
      
      
      	...
        private Node addConditionWaiter() {
            Node t = lastWaiter;
          	//判断清除无效队列
          	//就是找出条件队列中最后一个没有被取消的节点,更新为lastWaiter
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
          	//用当前线程创建新的节点Node,状态waitStatus = CONDITION = -2;
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
          	//进入条件等待队列,firstWaiter / lastWaiter分别指向队首 / 队尾
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

signal

把条件等待队列中的节点移到同步等待队列(CLH)的后面,让其重新等待锁的获取。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
  	//获取条件队列的第一个节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}


private void doSignal(Node first) {
  	//通过do...while循环,唤醒条件等待队列中节点
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
          	lastWaiter = null;
        first.nextWaiter = null;
      //如果唤醒第一个节点失败,而且条件队列中还有其他节点,就从前往后继续尝试其他节点
      //直到某一个节点唤醒成功或者等待队列中没有节点需要唤醒
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}


final boolean transferForSignal(Node node) {
  	//CAS更新节点的waitStatus为0,更新成功往下执行,失败则返回
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
      	return false;
		//CLH的入队操作,添加到队尾
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      	LockSupport.unpark(node.thread);
    return true;
}
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-12-10 11:24:18  更:2021-12-10 11:26:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 6:18:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码