SynchronousQueue
SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(e),线程会阻塞; 直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然。对于多个线程而言,例如3个线程, 调用3次put(e),3个线程都会阻塞;直到另外的线程调用3次take()。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
take方法
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
可以看到put和take方法都调用了transfer()方法,只是第一个参数不一样,如果是put(…),则第1个参数就是对应的元素; 如果是take(),则第1个参数为null。
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
transferer有TransferQueue(队列模式)和TransferStack(栈模式)两个实现类,所以transfer()方法有两个不同的实现
假设有3个线程调用put方法,然后有3个线程调用take方法 如果是公平模式(队列模式),则第1个调用put(…)的线程1会在队列头部,第1个到来的take()线程 和它进行配对,遵循先到先配对的原则,所以是公平的;如果是非公平模式(栈模式),则第3个调用 put(…)的线程3会在栈顶,第1个到来的take()线程和它进行配对,后put的线程先配对
TransferQueue
public class SynchronousQueue<E> extends AbstractQueue<E> implements
BlockingQueue<E>, java.io.Serializable {
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next;
volatile Object item;
volatile Thread waiter;
final boolean isData;
}
transient volatile QNode head;
transient volatile QNode tail;
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false);
head = h;
tail = h;
}
}
}
TransferQueue是一个基于单向链表而实现的队列,通过head和tail 2个指针记录头部和尾部。初始化的时候,head和tail会指向一个item为空的Qnode节点。
3个线程分别调用put,生成3个QNode,连成队列。 来了一个线程调用take,会和队列头部的第1个QNode进行配对
配对后移出队列 put节点和take节点一旦相遇,就会配对出队列,所以在队列中不可能同时存在 put节点和take节点,要么所有节点都是put节点,要么所有节点都是take节点。
TransferQueue的transfer实现
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)
continue;
if (h == t || t.isData == isData) {
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0)
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s))
continue;
advanceTail(t, s);
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {
clean(t, s);
return null;
}
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else {
QNode m = h.next;
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
if (isData == (x != null) ||
x == m ||
!m.casItem(x, e)) {
advanceHead(h, m);
continue;
}
advanceHead(h, m);
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
TransferStack
TransferStack也是一个单向链表。不同于队列,只需要head指针就能 实现入栈和出栈操作。
static final class TransferStack extends Transferer {
static final int REQUEST = 0;
static final int DATA = 1;
static final int FULFILLING = 2;
static final class SNode {
volatile SNode next;
volatile SNode match;
volatile Thread waiter;
Object item;
int mode;
}
volatile SNode head;
}
链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,二者配对之后,会生成一 个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈。
一开始head是指向null的
3个线程调用3次put,入栈
线程4调用take,和栈顶的第1个元素配对,生成FULLFILLING节点,入栈
栈顶的2个元素同时出栈 TransferStack的transfer实现
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) {
if (timed && nanos <= 0) {
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) {
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) {
if (h.isCancelled())
casHead(h, h.next);
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
SNode m = s.next;
if (m == null) {
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
s.casNext(m, mn);
}
}
} else {
SNode m = h.next;
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
|