??本系列文章: ????Java集合(一)集合框架使用综述 ????Java集合(二)ArrayList使用及源码分析 ????JDK集合(三)LinkedList使用及源码分析 ????JDK集合(四)Vector使用及源码分析 ????JDK集合(五)Stack使用及源码分析 ????JDK集合(六)HashMap使用及源码分析 ????Java集合(七)Hashtable使用及源码分析 ????Java集合(八)LinkedHashMap使用及源码分析 ????Java集合(九)HashSet使用及源码分析 ????Java集合(十)LinkedHashSet使用及源码分析 ????Java集合(十一)ArrayBlockingQueue使用及源码分析 ????Java集合(十二)LinkedBlockingQueue使用及源码分析
一、LinkedBlockingQueue概述
??LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。LinkedBlockingQueue的继承关系:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
??从继承关系看,LinkedBlockingQueue的特点:
- 继承AbstractQueue,实现BlockingQueue接口,具备队列的相关操作。
- 实现Serializable接口,可以序列化。
二、LinkedBlockingQueue使用
2.1 方法介绍
- 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE
public LinkedBlockingQueue()
- 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE ,最初包含给定集合的元素,以集合的迭代器的遍历顺序添加
public LinkedBlockingQueue(Collection<? extends E> c)
- 创建一个具有给定(固定)容量的 LinkedBlockingQueue
public LinkedBlockingQueue(int capacity)
- 从这个队列中原子地删除所有的元素
public void clear()
- 如果此队列包含指定的元素,则返回 true
public boolean contains(Object o)
- 从该队列中删除所有可用的元素,并将它们添加到给定的集合中
public int drainTo(Collection<? super E> c)
- 最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中
public int drainTo(Collection<? super E> c, int maxElements)
- 以适当的顺序返回该队列中的元素的迭代器
public Iterator<E> iterator()
- 如果可以在不超过队列的容量的情况下立即将其指定的元素插入到队列的尾部,如果队列已满,则返回 false
public boolean offer(E e)
- 在该队列的尾部插入指定的元素,必要时等待指定的等待时间才能使空间变得可用
public boolean offer(E e, long timeout, TimeUnit unit)
- 检索但不删除此队列的头元素,如果此队列为空,则返回 null
public E peek()
- 检索并删除此队列的头元素,如果此队列为空,则返回 null
public E poll()
- 检索并删除此队列的头元素,等待指定的等待时间(如有必要)使元素变为可用
public E poll(long timeout, TimeUnit unit)
- 在该队列的尾部插入指定的元素,如果需要,等待队列变为可用
public void put(E e)
- 返回此队列可以理想地(在没有内存或资源限制)的情况下接受而不阻止的附加元素数
public int remainingCapacity()
- 从该队列中删除指定元素的单个实例(如果存在)
public boolean remove(Object o)
- 返回此队列中的元素数
public int size()
- 检索并删除此队列的头元素,如有必要,等待元素可用
public E take()
- 以适当的顺序返回一个包含此队列中所有元素的数组
public Object[] toArray()
- 以适当的顺序返回包含此队列中所有元素的数组; 返回的数组的运行时类型是指定数组的运行时类型
public <T> T[] toArray(T[] a)
- 返回此集合的字符串表示形式
public String toString()
2.2 方法使用
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>();
ArrayList<String> arrayList = new ArrayList<>();
LinkedBlockingQueue<String> linkedBlockingQueue2 = new LinkedBlockingQueue<>(arrayList);
LinkedBlockingQueue<String> linkedBlockingQueue3 = new LinkedBlockingQueue<>(30);
linkedBlockingQueue.add("aaa");
linkedBlockingQueue.clear();
System.out.println(linkedBlockingQueue);
linkedBlockingQueue.add("aaa");
linkedBlockingQueue.add("bbb");
linkedBlockingQueue.add("ccc");
System.out.println(linkedBlockingQueue.contains("aaa"));
linkedBlockingQueue.drainTo(arrayList);
System.out.println(arrayList);
System.out.println(linkedBlockingQueue);
linkedBlockingQueue.add("aaa");
linkedBlockingQueue.add("bbb");
linkedBlockingQueue.add("ccc");
linkedBlockingQueue.drainTo(arrayList,2);
System.out.println(arrayList);
System.out.println(linkedBlockingQueue);
Iterator<String> iterator = linkedBlockingQueue.iterator();
while(iterator.hasNext()) {
System.out.print(iterator.next()+" ");
}
System.out.println();
linkedBlockingQueue.offer("eee");
System.out.println(linkedBlockingQueue);
try {
linkedBlockingQueue.offer("fff",2000,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.peek());
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.poll());
System.out.println(linkedBlockingQueue);
try {
System.out.println(linkedBlockingQueue.poll(2000,TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(linkedBlockingQueue);
try {
linkedBlockingQueue.put("ggg");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.remainingCapacity());
System.out.println(linkedBlockingQueue.remove("ddd"));
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.size());
try {
System.out.println(linkedBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(linkedBlockingQueue);
三、LinkedBlockingQueue源码
3.1 节点
??先看LinkedBlockingQueue中所存储的节点的定义:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
??从这个节点定义可以看出,LinkedBlockingQueue是一个单向链表构成的队列。 ??接着看一些LinkedBlockingQueue中的变量:
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
3.2 构造方法
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
- 2、LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
- 3、LinkedBlockingQueue(Collection<? extends E> c)
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
3.3 添加元素
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
- 2、offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
3.4 删除元素
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
- 2、poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
3.5 查找元素
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
3.6 清空队列
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
3.7 获取队列元素个数
public int size() {
return count.get();
}
四、LinkedBlockingQueue特点
- 基于链表实现的队列,从头部获取元素,在尾部插入元素,比基于数组的队列吞吐量更高。
- 双锁队列的变种实现,一把写锁,一把读锁。
- 默认队列的大小是Integer的最大值,如果添加速度大于读取速度的话,有可能造成内存溢出。
- 因为是两把锁,所以元素的个数使用了一个原子类型的变量来维护(AtomicInteger)。
|