IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 数据结构与算法 -> 【源码篇】LinkedBlockingQueue源码超详细解读 -> 正文阅读

[数据结构与算法]【源码篇】LinkedBlockingQueue源码超详细解读

??导读

LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。

🚩基本构造

image-20220421143744032

阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue是提供Queue的基本实现,我们重点关注QueueBlockingQueue提供的api。

Queue类提供最基本的API

// 插入,失败返回异常
boolean add(E e);
// 插入,失败返回false
boolean offer(E e);
// 移除队列头部元素,队列为空异常
E remove();
// 移除队列头部元素,队列为空返回null
E poll();
// 查看头部元素,队列为空异常
E element();
// 查看头部元素,队列为空返回null
E peek();

BlockingQueue提供阻塞操作相关API

// 将元素插入队列,如果队列没有可用空间则等待
void put(E e);
// 将元素插入队列,如果队列没用可用空间则等待设定的时间
boolean offer(E e, long timeout, TimeUnit unit)// 移除头部元素,如果没有可用则等待
E take();
// 移除头部元素,如果没有可用则等待设定的时间
E poll(long timeout, TimeUnit unit);
// 返回剩余可插入的元素数量
int remainingCapacity();
// 从队列中取出全部的元素并插入到指定集合中
int drainTo(Collection<? super E> c);
// 从队列中取出指定数量的元素并插入到指定集合中
int drainTo(Collection<? super E> c, int maxElements);

📖核心源码解读

LinkedBlockingQueue分别使用了一个读锁和一个写锁来控制并发,并使用Condition来控制他们的执行过程

// 读锁
private final ReentrantLock takeLock = new ReentrantLock();
// 队列不为空的Condition
private final Condition notEmpty = takeLock.newCondition();
// 写锁
private final ReentrantLock putLock = new ReentrantLock();
// 队列没有满的Condition
private final Condition notFull = putLock.newCondition();

put方法

将元素插入队列,如果队列没有可用空间则等待

public void put(E e) throws InterruptedException {
    // 如果元素是null抛出异常
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    // 使用写锁
    final ReentrantLock putLock = this.putLock;
    // 元素数量计数器
    final AtomicInteger count = this.count;
    // 获得锁
    putLock.lockInterruptibly();
    try {
        // 判断队列是否已满,如果已满写线程等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 将尾部元素指向当前元素
        enqueue(node);
        // 元素数量+1并返回操作前的数量
        c = count.getAndIncrement();
        // 如果元素数量没有满,则唤醒notFull.wait(),表示当前队列未满
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 解锁
        putLock.unlock();
    }
    if (c == 0)
        // 如果操作前元素数量为0,则通知写线程
        signalNotEmpty();
}

此处signalNotEmpty();就是通知被阻塞的读线程(如take/poll方法),队列里有数据了,赶紧消费

poll/take方法

poll 查看头部元素,队列为空异常

take 移除并返回头部元素,如果没有可用则等待

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    // 等待纳秒数
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    // 使用读锁
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lockInterruptibly();
    try {
        // 如果当前队列中没有元素则等待指定时长
        while (count.get() == 0) {
            if (nanos <= 0)
                // 等待超时,直接返回null
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 移除队列头中的节点并返回
        x = dequeue();
        // 元素-1
        c = count.getAndDecrement();
        // 如果队列中有数据,则通知其他线程该队列不为空
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 通知其他线程表示该队列未满
    if (c == capacity)
        signalNotFull();
    return x;
}

此处signalNotFull();是通知阻塞的写入线程(如put/offer),表示队列没满,可以写入

take逻辑与poll类似,只是等待策略不相同,take方法如下

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 等待逻辑与poll不一样,此处表示如果没有数据则一直等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

drainTo方法

从队列中取出全部的元素并插入到指定集合中

public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    // 使用读锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 读取数量不能大于剩余元素数量
        int n = Math.min(maxElements, count.get());
        // 从头部开始读取,h表示当前头节点
        Node<E> h = head;
        int i = 0;
        try {
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                p.item = null;
                h.next = h;
                h = p;
                ++i;
            }
            return n;
        } finally {
            if (i > 0) {
                // 有读取出来元素,则更新最新头节点
                head = h;
                // 队列是否还有空间
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            signalNotFull();
    }
}

remove方法

public boolean remove(Object o) {
    if (o == null) return false;
    // 加锁
    fullyLock();
    try {
        // 遍历全部元素
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                // 移除链接
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

注意一下此处的加锁逻辑

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

可以看到,remove方法会将读写锁都上锁,并且会扫描整个链表,时间复杂度为O(n)+悲观锁

一般情况下不建议使用remove方法,该方法性能较差,会阻塞所有核心逻辑。

??注意事项

使用LinkedBlockingQueue时要额外注意影响性能的方法

如:remove/contains/toArray/toString/clear

以上方法的时间复杂度均为O(n)+悲观锁,如非必要最好不要使用

🚗应用场景

LinkedBlockingQueue本质上就是个内存级队列,它同样可以达到削峰填谷的目的,使用得当可以给系统减轻不小的压力。

  1. 调度外部服务,防止调用过于频繁,可以放入队列中,等待消费,并用drainTo归集然后统一请求。
  2. 令牌桶,可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
  3. 使用对象池化技术来减轻jvm回收的压力,将池化对象放入队列中。

下面使用LinkedBlockingQueue实现一个对象池,使用对象池可以防止频繁创建/回收对象,减少gc次数,池化对象长期存储在老年代中,对象数量可控

ResourcePool 对象池抽象类,实现该类就能初始化一个对象池

public abstract class ResourcePool<T extends ResourceModel> {

    private final LinkedBlockingQueue<T> queue;

    public ResourcePool(int poolMax) {
        queue = new LinkedBlockingQueue<>(poolMax);
        for (int i = 0; i < poolMax; i++) {
            T model = createResource();
            model.pool = this;
            model.invalid = true;
            queue.add(model);
        }
    }

    public T getResource() {
        try {
            do {
                T t = queue.take();
                if (t.invalid) {
                    t.invalid = false;
                    return open(t);
                }
            } while (true);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected T open(T t) {
        return t;
    }

    protected abstract T createResource();

    public void free(T t) {
        if (!t.invalid) {
            t.invalid = true;
            queue.offer(close(t));
        }
    }

    protected T close(T t) {
        return t;
    }

}

ResourceModel抽象对象

public abstract class ResourceModel implements Closeable {

    ResourcePool pool;

    boolean invalid;

    @Override
    public void close() throws IOException {
        pool.free(this);
    }
}

TestModel对象实例

@Setter
public class TestModel extends ResourceModel {

    public TestModel(String name, int age) {
        this.name = name;
        this.age = age;
    }

    private String name;

    private int age;

}

TestPool对象池实例

public class TestPool extends ResourcePool<TestModel> {

    public TestPool(int poolMax) {
        super(poolMax);
    }

    // 创建对象实例
    @Override
    protected TestModel createResource() {
        return new TestModel("", 0);
    }

    // 获得对象的前置操作
    @Override
    protected TestModel open(TestModel testModel) {
        return super.open(testModel);
    }

    // 对象回收后操作
    @Override
    protected TestModel close(TestModel testModel) {
        testModel.setAge(0);
        testModel.setName("");
        return super.close(testModel);
    }
}

使用方式1

public static void main(String[] args) throws IOException {
    TestPool testPool = new TestPool(30);
    // 从池中获得一个对象
    TestModel model = testPool.getResource();
    // 回收对象
    model.close();
}

使用方式2

public static void main(String[] args) throws IOException {
    TestPool testPool = new TestPool(30);
    try(TestModel model = testPool.getResource()) {
        
    }
}
  数据结构与算法 最新文章
【力扣106】 从中序与后续遍历序列构造二叉
leetcode 322 零钱兑换
哈希的应用:海量数据处理
动态规划|最短Hamilton路径
华为机试_HJ41 称砝码【中等】【menset】【
【C与数据结构】——寒假提高每日练习Day1
基础算法——堆排序
2023王道数据结构线性表--单链表课后习题部
LeetCode 之 反转链表的一部分
【题解】lintcode必刷50题<有效的括号序列
上一篇文章      下一篇文章      查看所有文章
加:2022-04-22 19:01:27  更:2022-04-22 19:04:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/6 19:22:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码