阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是BlockingQueue提供了可阻塞的插入和可阻塞的移除方法。当队列容器满了,插入线程会被阻塞,直到队列容器空出位置。当队列容器空了,移除线程会被阻塞,直到队列容器不为空. BlockingQueue 方法以四种形式出现:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:
成员变量
//用来存放数据的数组
final Object[] items;
//如果下次要获取数据,获取数据的位置
int takeIndex;
//如果下次要插入数据,插入数据的位置
int putIndex;
//数据的总数
int count;
//保证线程安全的锁
final ReentrantLock lock;
//Condition等待队列,主要用于获取数据的线程
private final Condition notEmpty;
//Condition等待队列,主要用于插入数据的线程
private final Condition notFull;
Put方法:
public void put(E e) throws InterruptedException {
//判断数组是否为null
checkNotNull(e);
final ReentrantLock lock = this.lock;
//尝试获取锁(可响应中断)
lock.lockInterruptibly();
try {
//判断总数count是否等于数组的长度,如果等于,表示数组满了
while (count == items.length)
//当前线程进入到notFull等待队列中
notFull.await();
//当阻塞队列不满的时候,进行入队操作
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
put()方法调用了enqueue()方法进行入队操作:
private void enqueue(E x) {
//获得数组
final Object[] items = this.items;
//将元素x存放在数组的putIndex位置上
items[putIndex] = x;
//如果插入的位置已经是数组中的最后一个位置了,那么就将putIndex置为0
//因为已经到达最后一个了,那就只能从第一个位置插入元素
if (++putIndex == items.length)
putIndex = 0;
//总数+1
count++;
//唤醒notEmpty队列中等待获取数据的线程
notEmpty.signal();
}
Take方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//尝试获得锁(可响应中断)
lock.lockInterruptibly();
try {
//如果队列为空
while (count == 0)
//进入到notEmpty队列进行等待
notEmpty.await();
//返回获取的元素
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
take()方法调用dequeue()方法来从队列获取元素:
private E dequeue() {
//获取数组
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获得takeIndex位置上的数据
E x = (E) items[takeIndex];
//将takeIndex位置置为null
items[takeIndex] = null;
//判断是否是获取最后一个位置的数据,如果是,下一次就需要从0开始获取
if (++takeIndex == items.length)
takeIndex = 0;
//总数减一
count--;
//这个是itrs类实现的迭代器,是ArrayBlcokingQueue的一个内部类
if (itrs != null)
itrs.elementDequeued();
//因为取出了一个元素,所以队列中肯定要空位置,于是唤醒需要插入数据的线程。
notFull.signal();
return x;
}
Offer方法:
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
Poll()方法:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
|