一、阻塞队列简介
队列常被用来解决生产——消费者问题,Java中定义了Queue 接口以及通用的一些抽象方法
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
上面所列举出来的只是普通的队列的通用方法,而Java中的阻塞队列BlockingQueue ,继承了Queue 接口,同时又添加了两个具有阻塞功能的抽象方法,同时又提供了offer() 和poll 两个可阻塞的重载方法:
通过下面阻塞方法的定义可以看出,只要是会被阻塞的方法,都会抛出InterruptedException 异常
public interface BlockingQueue<E> extends Queue<E> {
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
}
对BlockingQueue 的常用方法做一个归纳如下:
方法 | 抛出异常 | 返回特定值 | 阻塞 | 指定阻塞时间 |
---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) | 出队 | remove() | poll() | take() | poll(time,unit) | 获取队首元素 | element() | peek() | 不支持 | 不支持 |
阻塞队列除了具有可阻塞的特性之外,还有另外一个重要的特性就是容量大小,分为有界和无界。没有绝对意义的上的无界,只是这个界限很大,可以放很多元素。以LinkedBlockingQueue 为例,它的容量大小为Integer.MAX_VALUE ,这是一个非常大的数字,我们通常认为它就是无界的。也有一些阻塞队列是有界的,比如ArrayBlockingQueue ,如果达到最大容量之后,也不会进行扩容。所以一旦满了就无法再往里面放数据了。
BlockingQueue 同时也是线程安全的,它可以保证多线程的情况下,保证生产者和消费者的线程安全,其内部大多都是采用CAS 和ReentrantLock 来保证线程安全,业务代码无需再关注多线程安全的问题,直接向队列里面放或者取就可以了,如图所示:
同时,阻塞队列还启动了资源隔离的作用,在复杂业务中,业务A完成后,只需要将结果丢到队列中即可,不需要关心后面的步骤,业务B会从队列中获取任务来执行对应的业务,实现了业务之间的解耦,也可以提高安全性。
下面就介绍一些常用的阻塞队列和部分核心源码
二、常用阻塞队列及核心源码分析
2.1 ArrayBlockingQueue
ArrayBlockingQueue 是一个典型的有界的线程安全的阻塞队列,初始化时需要指定其容量大小,其内部元素使用数组进行存储,以put() 方法为例,使用ReentrantLock 来保证线程安全,通过条件队列的两个条件notEmpty 和notFull 来进行阻塞和唤醒
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
}
由于ArrayBlockingQueue 的put() 和take() 方法使用ReentrantLock 进行同步,同时只有一个方法可以执行,所以在高并发的情况下,性能会比较差。
思考:ArrayBlockingQueue为什么采用双指针环形数组的方式?
普通的数组,删除数组元素时需要进行移位操作,导致它的时间复杂度为O(n),而采用双指针环形数组,不需要进行移位,只需要分别移动两个指针即可。
2.2 LinkedBlockingQueue
LinkedBlockingQueue 是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE ,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM 错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
}
LinkedBlockingQueue与ArrayBlockingQueue对比
- ArrayBlockingQueue使用一个独占锁,读写不分离,而LinkedBlockeingQueue使用两个独占锁,读写操作锁分离,性能更好
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
2.3 LinkedBlockingDeque
LinkedBlockingDeque 是对LinkedBlockingQueue 的增强,其顶层接口为Deque ,该接口定义了更加丰富的操作队列的方法,通过方法名就可以看出来,这些方法打破了队列先进先出的固有规则,提供了可以从头部或者尾部操作的API
public interface Deque<E> extends Queue<E> {
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
E removeFirst();
E removeLast();
E pollFirst();
E pollLast();
E getFirst();
E getLast();
E peekFirst();
E peekLast();
}
BlockingDeque 接口继承了Deque ,同时又提供了几个可阻塞的方法
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
void putFirst(E e) throws InterruptedException;
void putLast(E e) throws InterruptedException;
E takeFirst() throws InterruptedException;
E takeLast() throws InterruptedException;
}
而LinkedBlockingDeque 实现了BlockingDeque 接口,其内部通过双向链表来记录元素,通过一把ReentrantLock 来保证线程安全,该类可以看成是ArrayBlockingQueue 和LinkedBlockingQueue 的结合与增强
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}
}
2.4 SynchronousQueue
SynchronousQueue 是一个没有缓冲的BlockingQueue ,生产者线程对元素的插入操作put() 必须等待消费者的移除操作take() ,其模型如下图:
如上图所示,SynchronousQueue 最大的不同在于,它的容量为0,没有地方来缓存元素,这就导致了每次添加元素都会被阻塞,直到有线程来取元素;同理,取元素也是一样,取元素的线程也会被阻塞,直到有线程添加元素。
由于SynchronousQueue 不需要持有元素,它的作用在于直接传递,所以它非常适用于传递性场景做交换工作,生产者线程和消费者线程同步传递某些信息、事务或任务
SynchronousQueue 常见的一个场景就是在Executors.newCachedThreadPool() 中,因为不确定生产者的请求数量(创建任务),而这些请求又需要被及时处理,那么使用SynchronousQueue 为每个生产者线程分配一个消费者线程就是处理效率最高的方式。线程池会根据需要(新任务到来)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60s之后会被回收。
下面结合源码来分析它的实现原理:
SynchronousQueue 内部抽象类Transferer 提供了任务传递的方法transfer() ,而该方法内部包含了线程阻塞与唤醒的逻辑,而Transferer 有两个实现类TransferQueue 和TransferStack ,可以理解为存储阻塞线程的方式有两种:队列和栈。根据这两者的特性,可以分为公平和非公平的实现,队列满足FIFO(先进先出) 的特性,所以是公平的实现;而栈满足LIFO(后进先出) 的特性,所以是非公平的实现。
下面SynchronousQueue 的构造方法,提供了公平和非公平的选项,默认为非公平实现
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
下面以TransferQueue 为例简要分析元素入队和出队的操作,SynchronousQueue 的put() 和take() 都会去调用transfer() 方法添加元素或获取元素
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
下面分析TransferQueue 的transfer() 方法,在分析该方法之前,先看一下它的内部类QNode ,TransferQueue 中使用QNode 来记录元素和被阻塞的线程,其中还利用UNSAFE 来获取元素和下一个节点的偏移量,直接通过CAS修改对应的数值,QNode 中还有很多的CAS 方法这里没有一一列举出来。
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next;
volatile Object item;
volatile Thread waiter;
final boolean isData;
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
transient volatile QNode head;
transient volatile QNode tail;
TransferQueue() {
QNode h = new QNode(null, false);
head = h;
tail = h;
}
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面介绍了TransferQueue 大致的内部构造,下面重点看transfer() 方法实现,
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)
continue;
if (h == t || t.isData == isData) {
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0)
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s))
continue;
advanceTail(t, s);
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {
clean(t, s);
return null;
}
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else {
QNode m = h.next;
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
if (isData == (x != null) ||
x == m ||
!m.casItem(x, e)) {
advanceHead(h, m);
continue;
}
advanceHead(h, m);
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
在transfer() 中有一个重要的方法awaitFulfill ,它会去进行自旋阻塞
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
2.5 PriorityBlockingQueue
PriorityBlockingQueue 是一个无界的基于数组的优先级阻塞队列,虽然它是无界的,但在初始化的时候,它是可以指定数组初始化容量的,它的无界是基于它可以进行动态扩容而言的。
如果没有指定初始化容量,它默认的容量为11,最大容量为Integer.MAX_VALUE - 8
private static final int DEFAULT_INITIAL_CAPACITY = 11;
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
同时,PriorityBlockingQueue 是一个优先级队列,它每次出队都会返回优先级最高或最低的元素,它的构造方法中提供了自定义Comparator 比较器,默认情况下使用自然顺序升序排序。
通过下面的构造方法也可以看出,该队列线程安全是由ReentrantLock 来保证的,同时需要注意的是PriorityBlockingQueue不能保证同等优先级元素的顺序
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
那么PriorityBlockingQueue 如果只是简单的使用数组操作来对插入元素移除进行排序,其性能将是非常低的,而它采用的是最大最小堆的方式来插入或移除数据,大小堆只是逻辑上的一种操作方式而已,其储存结构依然是数组
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。
最大最小堆满足以下特性:
- 最大堆:根结点的键值是所有堆结点键值中最大者
- 最小堆:根结点的键值是所有堆结点键值中最小者
下图展示了最小二叉堆的情况:
最大最小堆按照从上到下,从左到右来一次表示索引位置,上图中右下角的数字表示该元素在数组中的索引下标
在最大最小二叉堆中,插入或移除元素时,都可能涉及到元素位置调整,而在二叉堆中,利用元素的下标索引,可以很简单的计算其父节点以及左右节点的下标(以索引下标为t的元素为例):
父节点:P(t) = (t-1) >>> 1 <=> (t-1)/2
左节点:L(t) = t <<< 1 +1 <=> t*2 +1
右节点:R(t) = t <<< 1 + 2 <=> t*2 +2
下面结合源码分析它是如何添加和移除元素的
由于PriorityBlockingQueue 是无界队列,所以添加元素时线程不需要阻塞,容量不够进行扩容就可以了
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
扩容的代码如下,tryGrow() 方法在offer() 方法的while 循环体内部,就实现了CAS+自旋的方式来实现线程安全的扩容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock();
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) :
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) {
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null)
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
核心的方法在于siftUpComparable() 和siftUpUsingComparator() 这两个方法,这两个方法才是二叉堆入队的核心方法,以siftUpUsingComparator() 为例来分析
这里面是一个while 循环,进行元素的上浮操作,每次都是获取当前节点的父节点,然后与插入的元素进行比对,如果比较的结果满足最大最小堆的结构,就直接退出循环,否则就换位,继续进行比较,知道满足条件
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
获取元素的代码如下,因为获取元素的线程会被阻塞,所以这个方法会抛出中断异常
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
2.6 DelayQueue
DelayQueue 是一个支持延时获取元素的阻塞队列,内部采用PriorityQueue 存储元素,同时元素必须实现Delayed 接口,接口的getDelay() 方法可以返回延时时间延时的时间,方法参数为时间工具类TimeUnit
在获取元素是,只有延迟时间到了才能从队列中提取元素。
延迟队列的特点:并不是先进先出,而是按照延迟时间的长短进行排序,下一个被执行的任务排在队列的最前面。
由于队列元素必须实现Delayed 接口,而该接口又继承自Comparable 接口,所以,元素类还要去实现compareTo() 方法,这样在创建队列时就不需要在额外创建Comparator 对象了,元素本身就具有了排序的能力。
下面定义了一个元素类
class DelayObject implements Delayed {
private String name;
private long time;
public DelayObject(String name, long delayTime) {
this.name = name;
this.time = System.currentTimeMillis() + delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = time - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed obj) {
if (this.time < ((DelayObject) obj).time) {
return -1;
}
if (this.time > ((DelayObject) obj).time) {
return 1;
}
return 0;
}
}
使用Demo:
BlockingQueue<DelayObject> blockingQueue = new DelayQueue<>();
blockingQueue.put(new DelayObject("lizhi", 1000 * 10));
blockingQueue.put(new DelayObject("linan", 1000 * 30));
DelayObject lizhi = blockingQueue.take();
DelayObject linan = blockingQueue.take();
下面看一下DelayQueue 的构造,使用ReentrantLock 来保证线程安全,取元素需要进行阻塞,底层使用PriorityQeue 进行存储,这是一个优先级队列,与上面PriorityBlockingQueue 是一样的,只是没有阻塞功能
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
下面看一下具体的put() 和take()
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
下面是take() 方法,要比put() 方法复杂一些
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null;
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
延迟队列的应用场景**:**
- 订单超时关闭:下单后在规定时间内没有付款就取消订单
- 异步短信通知:外卖下单成功60S之后给用户发送短信
- 关闭空闲连接:连接池中,有一些非核心的连接在空闲一段时间后就关闭
三、选择合适的阻塞队列
我们接触的比较多的就是线程中使用阻塞队列,线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
- FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
- CachedThreadPool 选取的是 SynchronousQueue
- ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列
注:ScheduledThreadPool 中使用的阻塞队列并不是DelayQueue ,而是自定义实现的DelayedWorkQueue
一般从以下几个维度来选择合适的阻塞队列
-
功能 比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。 -
容量 是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。 -
能够扩容 因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。 -
内存结构 我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。 -
性能 从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。
|