IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> PHP知识库 -> Java集合(十二)LinkedBlockingQueue使用及源码分析 -> 正文阅读

[PHP知识库]Java集合(十二)LinkedBlockingQueue使用及源码分析

??本系列文章:
????Java集合(一)集合框架使用综述
????Java集合(二)ArrayList使用及源码分析
????JDK集合(三)LinkedList使用及源码分析
????JDK集合(四)Vector使用及源码分析
????JDK集合(五)Stack使用及源码分析
????JDK集合(六)HashMap使用及源码分析
????Java集合(七)Hashtable使用及源码分析
????Java集合(八)LinkedHashMap使用及源码分析
????Java集合(九)HashSet使用及源码分析
????Java集合(十)LinkedHashSet使用及源码分析
????Java集合(十一)ArrayBlockingQueue使用及源码分析
????Java集合(十二)LinkedBlockingQueue使用及源码分析

一、LinkedBlockingQueue概述

??LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。LinkedBlockingQueue的继承关系:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

??从继承关系看,LinkedBlockingQueue的特点:

  1. 继承AbstractQueue,实现BlockingQueue接口,具备队列的相关操作。
  2. 实现Serializable接口,可以序列化。

二、LinkedBlockingQueue使用

2.1 方法介绍

  1. 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE
public LinkedBlockingQueue()
  1. 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE ,最初包含给定集合的元素,以集合的迭代器的遍历顺序添加
public LinkedBlockingQueue(Collection<? extends E> c)
  1. 创建一个具有给定(固定)容量的 LinkedBlockingQueue
public LinkedBlockingQueue(int capacity)
  1. 从这个队列中原子地删除所有的元素
public void clear()
  1. 如果此队列包含指定的元素,则返回 true
public boolean contains(Object o)
  1. 从该队列中删除所有可用的元素,并将它们添加到给定的集合中
public int drainTo(Collection<? super E> c)
  1. 最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中
public int drainTo(Collection<? super E> c, int maxElements)
  1. 以适当的顺序返回该队列中的元素的迭代器
public Iterator<E> iterator()
  1. 如果可以在不超过队列的容量的情况下立即将其指定的元素插入到队列的尾部,如果队列已满,则返回 false
public boolean offer(E e)
  1. 在该队列的尾部插入指定的元素,必要时等待指定的等待时间才能使空间变得可用
public boolean offer(E e, long timeout, TimeUnit unit)
  1. 检索但不删除此队列的头元素,如果此队列为空,则返回 null
public E peek()
  1. 检索并删除此队列的头元素,如果此队列为空,则返回 null
public E poll()
  1. 检索并删除此队列的头元素,等待指定的等待时间(如有必要)使元素变为可用
public E poll(long timeout, TimeUnit unit)
  1. 在该队列的尾部插入指定的元素,如果需要,等待队列变为可用
public void put(E e)
  1. 返回此队列可以理想地(在没有内存或资源限制)的情况下接受而不阻止的附加元素数
public int remainingCapacity()
  1. 从该队列中删除指定元素的单个实例(如果存在)
public boolean remove(Object o)
  1. 返回此队列中的元素数
public int size()
  1. 检索并删除此队列的头元素,如有必要,等待元素可用
public E take()
  1. 以适当的顺序返回一个包含此队列中所有元素的数组
public Object[] toArray()
  1. 以适当的顺序返回包含此队列中所有元素的数组; 返回的数组的运行时类型是指定数组的运行时类型
public <T> T[] toArray(T[] a)
  1. 返回此集合的字符串表示形式
public String toString()

2.2 方法使用

        //1.构造方法
		LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>();
		ArrayList<String> arrayList = new ArrayList<>();
		LinkedBlockingQueue<String> linkedBlockingQueue2 = new LinkedBlockingQueue<>(arrayList);
		LinkedBlockingQueue<String> linkedBlockingQueue3 = new LinkedBlockingQueue<>(30);
		
		//2.删除所有的元素
		linkedBlockingQueue.add("aaa");
		linkedBlockingQueue.clear();
		System.out.println(linkedBlockingQueue); //[]
		
		//3.判断是否包含指定的元素
		linkedBlockingQueue.add("aaa");
		linkedBlockingQueue.add("bbb");
		linkedBlockingQueue.add("ccc");
		System.out.println(linkedBlockingQueue.contains("aaa")); //true
		
		//4.从该队列中删除所有可用的元素,并将它们添加到给定的集合中
		linkedBlockingQueue.drainTo(arrayList);
		System.out.println(arrayList); //[aaa, bbb, ccc]
		System.out.println(linkedBlockingQueue); //[]
		
		//5.最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中
		linkedBlockingQueue.add("aaa");
		linkedBlockingQueue.add("bbb");
		linkedBlockingQueue.add("ccc");
		linkedBlockingQueue.drainTo(arrayList,2);
		System.out.println(arrayList); //[aaa, bbb, ccc, aaa, bbb]
		System.out.println(linkedBlockingQueue); //[ccc]
		
		//6.获取迭代器
		Iterator<String> iterator = linkedBlockingQueue.iterator();
		while(iterator.hasNext()) {
			System.out.print(iterator.next()+" "); //ccc
		}
		System.out.println();
		
		//7.添加元素到队列尾部
		linkedBlockingQueue.offer("eee");
		System.out.println(linkedBlockingQueue); //[ccc, eee]
		
		//8.该队列的尾部插入指定的元素,等待指定的等待时间,以使空间在队列已满时变为可用
		try {
			linkedBlockingQueue.offer("fff",2000,TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(linkedBlockingQueue);  //[ccc, eee, fff]
		
		//9.检索但不删除此队列的头元素
		System.out.println(linkedBlockingQueue.peek()); //ccc
		System.out.println(linkedBlockingQueue); //[ccc, eee, fff]
		
		//10.检索并删除此队列的头元素
		System.out.println(linkedBlockingQueue.poll()); //ccc
		System.out.println(linkedBlockingQueue); //[eee, fff]
		
		//11.检索并删除此队列的头元素,等待指定的等待时间(如有必要)使元素变为可用
		try {
			System.out.println(linkedBlockingQueue.poll(2000,TimeUnit.MILLISECONDS)); //eee
		} catch (InterruptedException e) {
			e.printStackTrace();
		} 
		System.out.println(linkedBlockingQueue); //[fff]
		
		//13.在该队列的尾部插入指定的元素
		try {
			linkedBlockingQueue.put("ggg");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(linkedBlockingQueue); //[fff, ggg]
		
		//14.返回此队列可以理想的情况下接受而不阻止的附加元素数
		System.out.println(linkedBlockingQueue.remainingCapacity()); //2147483645	
		
		//15.删除元素
		System.out.println(linkedBlockingQueue.remove("ddd")); //false
		System.out.println(linkedBlockingQueue); //[fff, ggg]
		
		//16.返回此队列中的元素数
		System.out.println(linkedBlockingQueue.size()); //2
		
		//17.检索并删除此队列的头元素
		try {
			System.out.println(linkedBlockingQueue.take()); //fff
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(linkedBlockingQueue); //[ggg]

三、LinkedBlockingQueue源码

3.1 节点

??先看LinkedBlockingQueue中所存储的节点的定义:

    static class Node<E> {
    	//当前节点value
        E item;
        //下一个节点
        Node<E> next;
        Node(E x) { item = x; }
    }

??从这个节点定义可以看出,LinkedBlockingQueue是一个单向链表构成的队列。
??接着看一些LinkedBlockingQueue中的变量:

    //队列容量
    private final int capacity;
    //当前队列元素数量
    private final AtomicInteger count = new AtomicInteger();
	//头结点,不存数据
    transient Node<E> head;
	//尾节点
    private transient Node<E> last;
    //出队锁,只有take,poll方法会持有
    private final ReentrantLock takeLock = new ReentrantLock();
    //出队等待条件
    private final Condition notEmpty = takeLock.newCondition();
    //入队锁,只有put,offer会持有
    private final ReentrantLock putLock = new ReentrantLock();
    //入队等待条件
    private final Condition notFull = putLock.newCondition();

3.2 构造方法

  • 1、LinkedBlockingQueue()
    public LinkedBlockingQueue() {
    	//未指定嘟列容量时,容量为Integer.MAX_VALUE
        this(Integer.MAX_VALUE);
    }
  • 2、LinkedBlockingQueue(int capacity)
	//指定初始容量的队列
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        //初始化首尾节点
        last = head = new Node<E>(null);
    }
  • 3、LinkedBlockingQueue(Collection<? extends E> c)
    public LinkedBlockingQueue(Collection<? extends E> c) {
    	//初始化容量
        this(Integer.MAX_VALUE);
        //初始化入队锁
        final ReentrantLock putLock = this.putLock;
        //加锁
        putLock.lock(); 
        try {
            int n = 0;
            //遍历集合
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                //入队
                enqueue(new Node<E>(e));
                ++n;
            }
            //更新队列中元素数量
            count.set(n);
        } finally {
        	//释放锁
            putLock.unlock();
        }
    }

3.3 添加元素

  • 1、offer(E e)
	//元素入队
    public boolean offer(E e) {
    	//不能添加非null元素
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //如果队列已满,不能再往队列中添加元素
        if (count.get() == capacity)
            return false;
        int c = -1;
        //构建新节点
        Node<E> node = new Node<E>(e);
        //获取独占锁putLock
        final ReentrantLock putLock = this.putLock;
        //加锁
        putLock.lock();
        try {
        	//如果队列未满,元素入队
            if (count.get() < capacity) {
                enqueue(node);
                //更新队列中元素数量
                c = count.getAndIncrement();
                //如果队列未满,唤醒一个入队条件队列中阻塞的线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
        	//释放锁
            putLock.unlock();
        }
        //节点数量为0,说明队列是空的
        if (c == 0)
        	//唤醒一个出队条件队列阻塞的线程
            signalNotEmpty();
        return c >= 0;
    }
  • 2、offer(E e, long timeout, TimeUnit unit)
	//在队尾插入一个元素,超时则不尝试,支持中断。当前节点入队后,如果
	//队列没满,就唤醒一个入队的线程让其入队
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        //转换为纳秒
        long nanos = unit.toNanos(timeout);
        int c = -1;
        //获取入队锁,支持中断
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
        	//队列已满
            while (count.get() == capacity) {
            	//如果超时,返回false
                if (nanos <= 0)
                    return false;
                //更新时间
                nanos = notFull.awaitNanos(nanos);
            }
            //元素入队
            enqueue(new Node<E>(e));
            //更新队列中元素数量
            c = count.getAndIncrement();
            //说明当前元素后面还能再插入一个
            //就唤醒一个入队条件队列中阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
        	//释放锁
            putLock.unlock();
        }
        //节点数量为0,说明队列是空的
        if (c == 0)
        	//唤醒一个出队条件队列阻塞的线程
            signalNotEmpty();
        return true;
    }
  • 3、put(E e)
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        //创建节点
        Node<E> node = new Node<E>(e);
        //获取入队锁,支持中断
        final ReentrantLock putLock = this.putLock;
        //获取队列中元素数量
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
        	//队列满了,进入阻塞状态
            while (count.get() == capacity) {
                notFull.await();
            }
            //元素入队
            enqueue(node);
            c = count.getAndIncrement();
            //如果还可以插入元素,那么释放等待的入队线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //通知出队线程队列非空
        if (c == 0)
            signalNotEmpty();
    }

3.4 删除元素

  • 1、offer(E e)
	//检索并删除此队列的头元素
    public E poll() {
        final AtomicInteger count = this.count;
        //队列未空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        //获取独占锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
        	//队列不空则出队
            if (count.get() > 0) {
                x = dequeue();
                //队列中数量-1
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //如果出队前,队列是满的,则唤醒一个被take()阻塞的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
  • 2、poll(long timeout, TimeUnit unit)
	//检索并删除此队列的头元素,可以超时等待
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        //转化为纳秒
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        //获取独占锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
            	//超时时间到,返回null
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            //元素出队
            x = dequeue();
            c = count.getAndDecrement();
            //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果出队前,队列是满的,则唤醒一个被take()阻塞的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
  • 3、take()
	//检索并删除此队列的头元素
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //获取独占锁,可中断
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
        	//队列为空,进入阻塞状态
            while (count.get() == 0) {
                notEmpty.await();
            }
            //队列不为空,元素出队
            x = dequeue();
            //队列中元素个数-1
            c = count.getAndDecrement();
            //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果出队前,队列是满的,则唤醒一个被take()阻塞的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
  • 4、remove(Object o)
 public boolean remove(Object o) {
        //因为队列不包含null元素,返回false
        if (o == null) return false;
        //获取两把锁
        fullyLock();
        try {
            //从头的下一个节点开始遍历
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                 //如果匹配,那么将节点从队列中移除,trail表示前驱节点
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //释放两把锁
            fullyUnlock();
        }
    }

3.5 查找元素

  • 1、peek()
	//检索但不删除此队列的头元素
    public E peek() {
    	//队列未空,返回null
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
        	//因为head实际上是个空节点,所以要返回的是head的下一个节点
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
  • 2、contains(Object o)
    public boolean contains(Object o) {
        if (o == null) return false;
        //加两把锁
        fullyLock();
        try {
        	//从头结点从前向后遍历
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
        	//释放两把锁
            fullyUnlock();
        }
    }

3.6 清空队列

  • clear()
    public void clear() {
    	//加两把锁
        fullyLock();
        try {
        	//从头结点开始,全部元素置为null
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            //头尾节点置为同一个节点
            head = last;
 
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
        	//释放两把锁
            fullyUnlock();
        }
    }

3.7 获取队列元素个数

  • size()
    public int size() {
        return count.get();
    }

四、LinkedBlockingQueue特点

  1. 基于链表实现的队列,从头部获取元素,在尾部插入元素,比基于数组的队列吞吐量更高。
  2. 双锁队列的变种实现,一把写锁,一把读锁。
  3. 默认队列的大小是Integer的最大值,如果添加速度大于读取速度的话,有可能造成内存溢出。
  4. 因为是两把锁,所以元素的个数使用了一个原子类型的变量来维护(AtomicInteger)。
  PHP知识库 最新文章
Laravel 下实现 Google 2fa 验证
UUCTF WP
DASCTF10月 web
XAMPP任意命令执行提升权限漏洞(CVE-2020-
[GYCTF2020]Easyphp
iwebsec靶场 代码执行关卡通关笔记
多个线程同步执行,多个线程依次执行,多个
php 没事记录下常用方法 (TP5.1)
php之jwt
2021-09-18
上一篇文章      下一篇文章      查看所有文章
加:2021-09-18 09:51:10  更:2021-09-18 09:52:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/13 6:10:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码