IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 数据结构与算法 -> SynchronousQueue -> 正文阅读

[数据结构与算法]SynchronousQueue

SynchronousQueue

SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(e),线程会阻塞;
直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然。对于多个线程而言,例如3个线程,
调用3次put(e),3个线程都会阻塞;直到另外的线程调用3次take()。

public SynchronousQueue(boolean fair) {
	//fair是否是公平的,公平先进先出(队列),不公平先进后出(栈)
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

put方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

take方法

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

可以看到put和take方法都调用了transfer()方法,只是第一个参数不一样,如果是put(…),则第1个参数就是对应的元素; 如果是take(),则第1个参数为null。

abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }

transferer有TransferQueue(队列模式)和TransferStack(栈模式)两个实现类,所以transfer()方法有两个不同的实现

假设有3个线程调用put方法,然后有3个线程调用take方法
如果是公平模式(队列模式),则第1个调用put(…)的线程1会在队列头部,第1个到来的take()线程
和它进行配对,遵循先到先配对的原则,所以是公平的;如果是非公平模式(栈模式),则第3个调用
put(…)的线程3会在栈顶,第1个到来的take()线程和它进行配对,后put的线程先配对

TransferQueue

public class SynchronousQueue<E> extends AbstractQueue<E> implements
BlockingQueue<E>, java.io.Serializable {
// ...
    static final class TransferQueue<E> extends Transferer<E> {
        static final class QNode {
			volatile QNode next;
		    volatile Object item;
		    volatile Thread waiter;
		    final boolean isData;
		    //...
		}
		/** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it was cancelled.
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
        	//没有item的空Qnode节点
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }
	}
}

TransferQueue是一个基于单向链表而实现的队列,通过head和tail 2个指针记录头部和尾部。初始化的时候,head和tail会指向一个item为空的Qnode节点。

在这里插入图片描述

3个线程分别调用put,生成3个QNode,连成队列。
在这里插入图片描述
来了一个线程调用take,会和队列头部的第1个QNode进行配对

在这里插入图片描述
配对后移出队列
在这里插入图片描述
put节点和take节点一旦相遇,就会配对出队列,所以在队列中不可能同时存在 put节点和take节点,要么所有节点都是put节点,要么所有节点都是take节点。

TransferQueue的transfer实现

E transfer(E e, boolean timed, long nanos) {
            

            QNode s = null; // constructed/reused as needed
            //true为put false为take
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                //如果未初始化完毕,自旋等待
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin
				//当队列为空,或者当前线程和队列中元素是同一种模式
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    //不一致读,再来一次
                    if (t != tail)                  // inconsistent read
                        continue;
                    //t不是真正的尾节点,设置尾节点
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                    	//把要新增的e加入尾部
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;
					//后移tail指针
                    advanceTail(t, s);              // swing tail and wait
                    //LockSupport.park或者LockSupport.parkNanos挂起线程
                    Object x = awaitFulfill(s, e, timed, nanos);
                    //以下是唤醒后的操作
                    //挂起的时间到了,并没有等到和它配对的线程,移除这个节点
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }
					//轮到了s节点
                    if (!s.isOffList()) {           // not already unlinked
                    	//s节点的前一个节点,是他加入进去时的tail,把s节点前移
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                	//两个模式不一样的时候,和队列的第一个元素配对
                    QNode m = h.next;               // node to fulfill
                    //不一致读自旋
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    //如果是put节点,则isData=true,item!=null;如果是take节点,则isData=false, item=null
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        //尝试配对,配对失败,出队列
                        //把自己的item x换成对方的item e,如果CAS操作成功,则配对成功
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }
					//配对成功,出队列
                    advanceHead(h, m);              // successfully fulfilled
                    //唤醒配对的mqnode的线程
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

TransferStack

TransferStack也是一个单向链表。不同于队列,只需要head指针就能 实现入栈和出栈操作。

static final class TransferStack extends Transferer {
    static final int REQUEST = 0; //take节点
    static final int DATA = 1;  //put节点
    static final int FULFILLING = 2;
	static final class SNode {
		volatile SNode next; // 单向链表 
		volatile SNode match; // 配对的节点 
		volatile Thread waiter; // 对应的阻塞线程 
		Object item;
		int mode;	//三种模式
		//...
	}
    volatile SNode head;
}

链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,二者配对之后,会生成一 个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈。

一开始head是指向null的
在这里插入图片描述

3个线程调用3次put,入栈
在这里插入图片描述

线程4调用take,和栈顶的第1个元素配对,生成FULLFILLING节点,入栈
在这里插入图片描述

栈顶的2个元素同时出栈
在这里插入图片描述
TransferStack的transfer实现

E transfer(E e, boolean timed, long nanos) {
            
            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                //如果和栈顶的snode是同一种模式
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                            //入栈
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                    	//线程挂起
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                 //不是同一种模式
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    //生成FULFILLING节点,入栈
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                            	//两个节点一起出战
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                	//已经匹配了,出栈
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                        	//两个节点一起出栈
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
  数据结构与算法 最新文章
【力扣106】 从中序与后续遍历序列构造二叉
leetcode 322 零钱兑换
哈希的应用:海量数据处理
动态规划|最短Hamilton路径
华为机试_HJ41 称砝码【中等】【menset】【
【C与数据结构】——寒假提高每日练习Day1
基础算法——堆排序
2023王道数据结构线性表--单链表课后习题部
LeetCode 之 反转链表的一部分
【题解】lintcode必刷50题<有效的括号序列
上一篇文章      下一篇文章      查看所有文章
加:2021-08-18 12:56:50  更:2021-08-18 12:57:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 17:12:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计