??导读
LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。
🚩基本构造
阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue 是提供Queue 的基本实现,我们重点关注Queue 、BlockingQueue 提供的api。
Queue 类提供最基本的API
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
BlockingQueue 提供阻塞操作相关API
void put(E e);
boolean offer(E e, long timeout, TimeUnit unit);
E take();
E poll(long timeout, TimeUnit unit);
int remainingCapacity();
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
📖核心源码解读
LinkedBlockingQueue 分别使用了一个读锁和一个写锁来控制并发,并使用Condition 来控制他们的执行过程
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
put 方法
将元素插入队列,如果队列没有可用空间则等待
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
此处signalNotEmpty(); 就是通知被阻塞的读线程(如take/poll方法),队列里有数据了,赶紧消费
poll/take 方法
poll 查看头部元素,队列为空异常
take 移除并返回头部元素,如果没有可用则等待
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
此处signalNotFull(); 是通知阻塞的写入线程(如put/offer),表示队列没满,可以写入
take逻辑与poll类似,只是等待策略不相同,take方法如下
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
drainTo 方法
从队列中取出全部的元素并插入到指定集合中
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
if (i > 0) {
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
remove 方法
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
注意一下此处的加锁逻辑
void fullyLock() {
putLock.lock();
takeLock.lock();
}
可以看到,remove方法会将读写锁都上锁,并且会扫描整个链表,时间复杂度为O(n)+悲观锁 。
一般情况下不建议使用remove方法,该方法性能较差,会阻塞所有核心逻辑。
??注意事项
使用LinkedBlockingQueue 时要额外注意影响性能的方法
如:remove /contains /toArray /toString /clear
以上方法的时间复杂度均为O(n)+悲观锁 ,如非必要最好不要使用
🚗应用场景
LinkedBlockingQueue本质上就是个内存级队列,它同样可以达到削峰填谷的目的,使用得当可以给系统减轻不小的压力。
- 调度外部服务,防止调用过于频繁,可以放入队列中,等待消费,并用
drainTo 归集然后统一请求。 - 令牌桶,可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
- 使用对象池化技术来减轻jvm回收的压力,将池化对象放入队列中。
下面使用LinkedBlockingQueue实现一个对象池,使用对象池可以防止频繁创建/回收对象,减少gc次数,池化对象长期存储在老年代中,对象数量可控
ResourcePool 对象池抽象类,实现该类就能初始化一个对象池
public abstract class ResourcePool<T extends ResourceModel> {
private final LinkedBlockingQueue<T> queue;
public ResourcePool(int poolMax) {
queue = new LinkedBlockingQueue<>(poolMax);
for (int i = 0; i < poolMax; i++) {
T model = createResource();
model.pool = this;
model.invalid = true;
queue.add(model);
}
}
public T getResource() {
try {
do {
T t = queue.take();
if (t.invalid) {
t.invalid = false;
return open(t);
}
} while (true);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
protected T open(T t) {
return t;
}
protected abstract T createResource();
public void free(T t) {
if (!t.invalid) {
t.invalid = true;
queue.offer(close(t));
}
}
protected T close(T t) {
return t;
}
}
ResourceModel 抽象对象
public abstract class ResourceModel implements Closeable {
ResourcePool pool;
boolean invalid;
@Override
public void close() throws IOException {
pool.free(this);
}
}
TestModel 对象实例
@Setter
public class TestModel extends ResourceModel {
public TestModel(String name, int age) {
this.name = name;
this.age = age;
}
private String name;
private int age;
}
TestPool 对象池实例
public class TestPool extends ResourcePool<TestModel> {
public TestPool(int poolMax) {
super(poolMax);
}
@Override
protected TestModel createResource() {
return new TestModel("", 0);
}
@Override
protected TestModel open(TestModel testModel) {
return super.open(testModel);
}
@Override
protected TestModel close(TestModel testModel) {
testModel.setAge(0);
testModel.setName("");
return super.close(testModel);
}
}
使用方式1
public static void main(String[] args) throws IOException {
TestPool testPool = new TestPool(30);
TestModel model = testPool.getResource();
model.close();
}
使用方式2
public static void main(String[] args) throws IOException {
TestPool testPool = new TestPool(30);
try(TestModel model = testPool.getResource()) {
}
}
|