写在前面
本文一起来看下Java中阻塞队列相关内容。
1:队列
队列是一种先进先出FIFO的数据结构,是一种线性数据结构,一般有两种实现方式,一种是基于数组实现,另外一种是基于链表实现。接下来分别来看下。
1.1:基于数组实现
基于数组实现的队列叫做顺序队列,比较重要的点如下:
1:使用一个指针head指向下一个可以出队的位置。
2:使用一个指针tail作为下一个可以入队的位置。
3:队列空条件,head和tail重合。
4:队列满条件,tail的下一个位置是head。
如下图是从一个队列空到满的过程示意图:
如下图是出队列到队列空的过程示意图:
程序实现如下:
public class ArrayQueue {
private String[] items;
private int n;
private int head;
private int tail;
public ArrayQueue(int capacity) {
items = new String[capacity];
this.n = capacity;
head = 0;
tail = 0;
}
public boolean enqueue(String item) {
if (isFull()) {
System.out.println("队列已满!");
return false;
}
items[tail] = item;
tail = (tail + 1) % n;
System.out.println(item + "入队列!");
return true;
}
public String dequeue() {
if(isEmpty()) {
System.out.println("队列已空!");
return null;
}
String dequeueItem = items[head];
head = (head + 1) % n;
System.out.println(dequeueItem + "出队列!");
return dequeueItem;
}
public boolean isEmpty() {
return head == tail;
}
public boolean isFull() {
return head == (tail + 1) % n;
}
}
测试代码如下:
class FakeCls {
public static void arrayQueueTest() {
ArrayQueue arrayQueue = new ArrayQueue(4);
arrayQueue.enqueue("1");
arrayQueue.enqueue("2");
arrayQueue.enqueue("3");
arrayQueue.enqueue("4");
arrayQueue.enqueue("5");
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.enqueue("6");
arrayQueue.enqueue("7");
arrayQueue.dequeue();
arrayQueue.enqueue("8");
arrayQueue.enqueue("9");
arrayQueue.enqueue("10");
arrayQueue.enqueue("11");
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
}
}
运行如下:
1入队列!
2入队列!
3入队列!
队列已满!
队列已满!
1出队列!
2出队列!
3出队列!
队列已空!
队列已空!
6入队列!
7入队列!
6出队列!
8入队列!
9入队列!
队列已满!
队列已满!
7出队列!
8出队列!
9出队列!
队列已空!
队列已空!
1.2:基于链表实现
基于链表实现的队列叫做链式队列,具体实现类似于数组,但是不同之处在于数组是通过下标来定位元素,而链表需要通过指针来定位元素,因此每个元素需要维护其上一个元素和下一个元素的指针,我们可以定义一个如下的节点类:
public class LinkQueueNode {
public LinkQueueNode next;
public LinkQueueNode prev;
public String data;
public LinkQueueNode() {
}
public LinkQueueNode(LinkQueueNode prev, LinkQueueNode next, String data) {
this.prev = prev;
this.next = next;
this.data = data;
}
}
然后定义如下的基于链表的队列实现类:
public class LinkQueue {
public static final String HEAD_DATA = "head";
private LinkQueueNode firstNode;
private LinkQueueNode head;
private LinkQueueNode tail;
private int n;
public LinkQueue(int capacity) {
this.n = capacity;
LinkQueueNode curNode = null;
LinkQueueNode newNode = null;
for (int i = 0; i < capacity; i++) {
newNode = new LinkQueueNode(null, null, null);
if (i == 0) {
head = newNode;
tail = newNode;
firstNode = newNode;
} else {
curNode.setNext(newNode);
newNode.setPrev(curNode);
}
curNode = newNode;
}
}
public boolean enqueue(String item) {
if (isFull()) {
System.out.println("队列已满!");
return false;
}
tail.data = item;
tail = (tail.next == null) ? firstNode : tail.next;
System.out.println(item + "入队!");
return true;
}
public boolean dequeue() {
if (isEmpty()) {
System.out.println("队列已空!");
return false;
}
String dequeueItem = head.data;
head.data = null;
head = (head.next == null) ? firstNode : head.next;
System.out.println(dequeueItem + "出队!");
return true;
}
public boolean isFull() {
return head == tail && head.getData() != null;
}
public boolean isEmpty() {
return head == tail && head.getData() == null;
}
}
测试代码如下:
class FakeCls {
private static void linkQueueTest() {
LinkQueue arrayQueue = new LinkQueue(4);
arrayQueue.enqueue("1");
arrayQueue.enqueue("2");
arrayQueue.enqueue("3");
arrayQueue.enqueue("4");
arrayQueue.enqueue("5");
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.enqueue("6");
arrayQueue.enqueue("7");
arrayQueue.dequeue();
arrayQueue.enqueue("8");
arrayQueue.enqueue("9");
arrayQueue.enqueue("10");
arrayQueue.enqueue("11");
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
arrayQueue.dequeue();
}
}
2:什么是阻塞队列?
以上不管是基于数组实现的队列还是基于链表实现的队列,当队列满时数据会无法插入,当队列空时会直接取不到数据,与此对应的如果是,当队列空时取队列数据线程等待直到队列有数据,当队列满时插入数据线程等待队列有空闲位置,具有这种行为的队列我们叫做是阻塞队列 。接下来我们通过jdk提供的相关阻塞队列实现 来一起看下。
2.1:BlockingQueue
这是在jdk的java.util.concurrent 包中提供的一个接口,定义了阻塞队列相关的操作,其UML图如下:
可以看到其是java.util.Collection 集合类的子接口,因此阻塞队列也是集合 。接口源码如下:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
2.2:PriorityBlockingQueue
带有优先级的队列,基于堆实现,这里是小顶堆,如下测试代码:
class FakeCls {
public static void main(String[] args) throws Exception {
PriorityBlockingQueue<Integer> priorityBlockingQueue = new PriorityBlockingQueue<>();
priorityBlockingQueue.put(23);
priorityBlockingQueue.put(12);
priorityBlockingQueue.put(54);
priorityBlockingQueue.put(42);
Integer curEle = null;
while ((curEle = priorityBlockingQueue.poll()) != null) {
System.out.println(curEle);
}
}
}
运行输出如下:
12
23
42
54
可以看到是按照从小到大的顺序输出的。
堆是这样的一种数据结构,i>=0 当ele(i)>ele(2i+1)并且ele(i)>ele(2i+2)时是大顶堆。当ele(i)<ele(2i+1)并且ele(i)<ele(2i+2)时是大顶堆。
2.3:DelayQueue
带有优先级和延迟时长的队列,只有超过了延迟时间数据才会被返回,加入到其中的元素必须实现java.util.concurrent.Delayed 接口。比如购物场景,当用户将某商品加入购物车30分钟后还没有进行支付,可以短信通知用户提醒付款。如下测试代码:
class MyDelayedTask implements Delayed {
private String name ;
private long start = System.currentTimeMillis();
private long time ;
public MyDelayedTask(String name,long time) {
this.name = name;
this.time = time;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
MyDelayedTask o1 = (MyDelayedTask) o;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "MyDelayedTask{" +
"name='" + name + '\'' +
", time=" + time +
'}';
}
}
public class TT {
private static DelayQueue delayQueue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
delayQueue.offer(new MyDelayedTask("task1",10000));
delayQueue.offer(new MyDelayedTask("task2",3900));
delayQueue.offer(new MyDelayedTask("task3",1900));
delayQueue.offer(new MyDelayedTask("task4",5900));
delayQueue.offer(new MyDelayedTask("task5",6900));
delayQueue.offer(new MyDelayedTask("task6",7900));
delayQueue.offer(new MyDelayedTask("task7",4900));
}
}).start();
while (true) {
Delayed take = delayQueue.take();
System.out.println(take);
}
}
}
运行:
MyDelayedTask{name='task3', time=1900}
MyDelayedTask{name='task2', time=3900}
MyDelayedTask{name='task7', time=4900}
MyDelayedTask{name='task4', time=5900}
MyDelayedTask{name='task5', time=6900}
MyDelayedTask{name='task6', time=7900}
2.3.1:Delayed
接口``java.util.concurrent.Delayed`,源码如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
2.3.2:DelayQueue
class FakeCls {
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
}
class FakeCls {
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null;
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
}
class FakeCls {
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
}
2.4:ArrayBlockingQueue
基于数组实现的阻塞队列,构造函数源码如下:
class FakeCls {
final Object[] items;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
int takeIndex;
int putIndex;
int count;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
}
2.4.1:添加元素put
该方法在无可用空间时会阻塞等待,源码如下:
class FakeCls {
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
}
2022年3月15日15:25:20 处是等待队列有可用空间,并且使用了DCL ,防止线程唤醒的一瞬间,其他线程已经抢先一步入队元素。2022年3月15日15:27:34 处是入队方法,具体参考2.4.2:入队enqueue 。
2.4.2:入队enqueue
源码如下:
class FakeCls {
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
}
2.4.3:获取元素poll
该方法在有可用元素时获取元素,无可用元素是返回null(默认值),源码如下:
class FakeCls {
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
}
2022年3月15日15:51:48 处如果是当前无元素,返回null,否则调用dequeue方法获取元素,具体参考2.4.4:出队dequeue 。
2.4.4:出队dequeue
源码如下:
class FakeCls {
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
}
2.5:LinkedBlockingDeque
基于链表实现的阻塞队列,主要源码如下:
class FakeCls {
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
}
该类在dubbo的集群容错策略中ForkingCluster 中使用到了,是一个非常典型的使用场景。
写在后面
参考文章列表:
Java 阻塞队列–BlockingQueue 。
DelayQueue详解 。
|