BlockingQueue
特征
队列是一种存储数据的数据结构,符合先进先出(FIFO)的原则。阻塞队列BlockingQueue是Java.concurrent.util包下的并发容器,除了符合队列的特点之外,还是线程安全的,保证在一个JVM中同一时刻只会有一个线程进行入队和出队操作。适用于解决并发生产者 - 消费者问题,在源代码的注释中有生产-消费示例:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}}
BlockingQueue中put / take方法支持阻塞的入队和出队操作:
- 当阻塞队列满时,如果生产者put元素,队列则会一直阻塞生产者,直到队列可用或者响应中断退出;
- 当阻塞队列为空,如果消费者take元素,队列则会一直阻塞消费者,直到队列不为空。
具体实现
Java中提供了很多实现BlockingQueue的阻塞队列,常见的比如:
- ArrayBlockingQueue,由数组实现的有界阻塞队列;
- LinkedBlockingQueue,用链表实现的有界阻塞队列(理论上有界,容量为Integer.MAX_VALUE);
- PriorityBlockingQueue,支持优先级的无界阻塞队列;
- DelayQueue,支持延时获取元素的无界阻塞队列;
- …
BlockingQueue结构关系图:
ArrayBlockingQueue
以ArrayBlockingQueue为例,分析put / take方法如何支持阻塞操作。
BlockingQueue queue = new ArrayBlockingQueue(10);
queue.put(produce());
queue.take();
实例化
ArrayBlockingQueue构造方法以及核心属性:
...
final Object[] items;
...
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
...
public ArrayBlockingQueue(int capacity) {this(capacity, false);}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
入队
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
出队
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
Condition
Condition是Java.concurrent.util下提供的一个接口,使线程等待某个条件,当该条件满足时,唤醒等待的线程。主要提供了两类方法:线程等待和线程唤醒。
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
AbstractQueuedSynchronizer有一个内部类ConditionObject,实现Condition接口,并重写await和signal。
await
把获取到锁的线程添加到条件等待队列中阻塞,并释放锁。
public class ConditionObject implements Condition, java.io.Serializable {
...
private transient Node firstWaiter;
private transient Node lastWaiter;
...
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
...
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal
把条件等待队列中的节点移到同步等待队列(CLH)的后面,让其重新等待锁的获取。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
|