IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> ConcurrentHashMap(JDK1.8)的扩容方法transfer的源码分析 -> 正文阅读

[Java知识库]ConcurrentHashMap(JDK1.8)的扩容方法transfer的源码分析

目录

多线程扩容问题

????????这篇文章是对上篇文章ConcurrentHashMap的put过程做补充,在上篇文章末尾只是简单的提及了一下,没有深入分析扩容的细节;在put过程中,有2处地方会触发扩容的情况:

  • 在put完成之后,更新元素个数时发现元素个数已经超过扩容阈值sizeCtrl,这个时候就会触发扩容(addCount方法);
  • 在链表超过8,但是数组长度 小于 64时,不会将链表转换成红黑树,而是会选择扩容数组(tryPresize);

回到正文多线程环境下扩容与单线程环境的扩容有什么不同?

相同点:

  • 1.首先都需要新建一个数组用于扩容后的新容器;
  • 2.将现容器的数据迁移到新的容器中;

不同点:
????????在单线程中所有的操作都是只有一个线程按顺序操作,而多线程则可能同时有多个线程操作同一件事;如果按照单线程的做法对扩容过程不加限制,会产生很多问题;比如在单线程中:创建新数组的操作,在多线程中旧可能出问题;如果多个线程同时扩容就可会创建多个数组;

????????在ConcurrentHashMap扩容时要解决以下问题:

  • 多线程环境下,可能多个线程在put完成之后,同时发现元素个数已经超过sizeCtrl触发扩容;那么这个时候就必须保证只有一个线程去新建新数组;
  • 我们知道在ConcurrentHashMap中是多个线程同时扩容的,那么如何协调多个线程同时扩容;
  • 扩容时由多个线程共同完成,那如何确保数据全部迁移完成?

ConcurrentHashMap扩容过程

在ConcurrentHashMap中多个线程同时扩容时如何协同扩容呢?答案是:每个线程负责一部分固定的区域。

扩容的源码比较多,因此先对扩容流程有一个整体的印象;再读源码;

1.ConcurrentHashMap扩容时新建数组

	//tab :当前数组;
	//nextTab:新数组;
	//n:当前数组的长度
	//stride:每个线程负责迁移的区域长度
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //计算出每个线程负责迁移数据的区域长度;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
            
        if (nextTab == null) {  //新数组为null,新建一个数组用于迁移后数据的新容器;
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容2倍
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
            //原数组扩容之后,超过int最大正数值;就不扩容了
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //nextTable :是concurrentHashMap的成员变量;
            //在扩容时,由创建新数组的线程初始化这个变量;扩容完成后,将其设置为null;
            nextTable = nextTab;
            //是concurrentHashMap的成员变量;记录下一个线程迁移数据的起始位置;transferIndex-1就是下一个线程迁移数据的起始下标;
            transferIndex = n;
         }

            ....
            ....
            ....
            
        }

1.1 每个线程负责的数据迁移区域的长度:stride

????????在ConcurrentHashMap中每个线程的数据迁移区域长度stride 不是一个固定值,stride 的值是根据:数组长度和cpu的个数决定的;

//MIN_TRANSFER_STRIDE =16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // 16




stride计算分2步:

  • (NCPU > 1) ? (n >>> 3) / NCPU : n ;计算出stride的值
  • 与默认值比较;小于默认值使用默认值;大于默认值使用计算出的值;

1.2 关于transferIndex的说明

????????我们要确定一个线程的数据迁移区域,一个长度是不行的;还必须要知道一个数据迁移的起始的位置,这样才能通过:起始位置+长度;来确定迁移的范围;而transferIndex 就是确定线程迁移的起始位置;每个线程的起始位肯定都不同,因此这个变量会随着协助扩容的线程增加而不断的变化;

????????ConcurrentHashMap中,分配区域是从数组的末端开始,从后往前配分区域,第一个线程起始迁移数据的下标:transferIndex -1(数组最大下标),分配区域:[transfer-1,transfer-stride],第二线程的起始位置:(transferIndex - stride-1),分配区域:[transferIndex - stride-1,transferIndex - 2 * stride],依次类推。

2.ConcurrentHashMap扩容时获取迁移数据区域

????????当有新线程协助扩容时首先要获取到一个起始位置才能确定负责迁移数据的范围。ConcurrentHashMap是如何处理的?因为需要考虑到多个线程同时竞争同一个起始位置,因此要使用CAS竞争让线程抢位置,竞争失败的线程则进入循环下一次继续尝试获取一个起始位置;【在ConcurrentHashMap中大量的使用了CAS来修改变量值,如果关于CAS还有疑惑,可以看这篇文章中关于CAS的部分:关于CAS

			.....
			.....
			.....
			.....

		transfer中,之前分析过的代码不再贴出。
	****************************************************************************************

        int nextn = nextTab.length;
        //扩容的标志对象,先准备好;每迁移完table数组的一个位置就将该对象,插入到该位置
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                //获得了起始位置之后,会将i的值修改为线程迁移数据的起始位置
                //bound是线程迁移数据的结束位置;
                //finishing是迁移数据结束后的标志
                if (--i >= bound || finishing)
                    advance = false;
            //当同时由多个线程,协助扩容时,可能被前面的线程将区域抢完了;导致后面线程没法获取迁移区域,
           //这个时候这些线程就不用协助扩容,只要等这些线程迁移完数据;
           
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //CAS竞争获取起始位置;竞争成功,修改transferIndex的值;
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                      //transferIndex修改之前的值是迁移数据的起始位置,修改之后的值是结束位置;也作为下一个线程的迁移数据的起始位置;
                    bound = nextBound;//nextBound修改后的:transferIndex,下一个线程的起始位置
                    i = nextIndex - 1;//nextIndex 修改前的transferIndex;数据迁移的起始下标
                    advance = false;
                }
            }
			.....
			.....
			.....
			.....
			

CAS 竞争位置,竞争成功之后由变量 i,bound 记录当前线程负责迁移数据的区域

在这里插入图片描述

2.1 总结

????????每个线程迁移数据时只能迁移获取到区域的数据:按顺序遍历的将获取到的区域: [i,bound]上每一个位置的数据完全迁移;但这里要注意下:i > bound ,迁移时顺序是 : i -> bound;

????????通过transferIndex 判断还有没有空闲的区域;transferIndex = 0 ;表示没有空闲区域了,所有区域都有线程负责迁移数据;

????????如果参与参与扩容的线程较多,那么可以将不同区域分配给不同线程;但如果参与扩容线程较少,那么线程完成了分配区域的数据迁移之后,获取下一块区域继续迁移数据,直到数据迁移完为止(为了保证在参与扩容线程很少时,也能将数据完全迁移);

  • 有较多线程参与扩容时:
  • 参与扩容线程较少

在这里插入图片描述

3. 判断数据迁移是否结束


			
			//进入到这个分支,表示数据迁移结束,扩容完成;	
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//迁移结束
                    nextTable = null;//将nextTable清空;
                    table = nextTab;//新数组替换给table;完成扩容
                    sizeCtl = (n << 1) - (n >>> 1);//新数组长度nextn的0.75倍:0.75 * nextn=1.5 *n;
                    return;
                }
                //对于sizeCtrl:每当有线程协助扩容时:sizeCtrl += 1;
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }

3.1 每个线程完成自己区域内的数据迁移的判断条件

  • i < 0;
  • i >= n;
  • i + n >= nextn;

对第一个条件 i < 0; 满足这个条件的有3种情况:

  • 被分配到[0,stride]区域的线程,在完成数据迁移后,i 自减到 -1;

  • 没有分配到 [0,stride] 区域的线程完成了分配区域的数据迁移之后,数组中没有空闲区域需要线程来迁移数据;这个时候会将i设置为-1,进入到这个分支;

  • 没有分配到迁移区域的线程;也会将i设置为-1;

这三种情况都可以看作:当线程完成了分配区域内的数据迁移就会将 i 设置 为 -1;

关于判断条件 i >= n 和 i + n >= nextn 的疑惑

  • 对于后面2个判断,感到有点疑惑;n = 旧数组的长度; nextn = 新数组长度; ConcurrentHashMap中,数组每次扩容都是2倍;因此:nextn = 2 * n;这就可以看出 i >= n 和 i + n >= nextn;是完全等价的;相当于一个条件,没有必要同时写这2个判断;

  • 关于 i :取值范围是旧数组的下标区间[0,n-1];i 的最大取值也就是(n-1) ;在这个分支内部设置了 i = n ; 即使是在这里将i 的值设置为n,但是进入下一次循环的时候,i自减1,到这个分支也不会出现 i = n 的情况,更没有 i > n 的情况出现;因此对什么时候出现 i >= n 的情况暂时还没想到;

3.2如何判断整个旧数组中的数据有没有迁移完

?? ConcurrentHashMap中为了确定旧数组的数据被完全的迁移了,让最后一个完成数据迁移的线程在完成迁移之后,再重新遍历检查整个数组;遍历从后往前遍历 i : (n-1) -> 0 ; 在遍历过程中,发现有位置上的数据没有迁移,就迁移数据;这样当遍历结束之后,就可以确定整个数组的数据已经被迁移完了;

那具体是怎么做的呢?我们继续往下分析。

??进入这个分支后,内部第一个分支的判断条件只有一个标志量:finishing=true; 表示整个数组的数据已经全部迁移完;这部分比较简单,数据迁移完之后更新了以下几个变量的值:

  • nextTable=null,因为已经完成数据;
  • 将扩容后的数组nextTab赋值给table;
  • 设置扩容阈值 sizeCtrl = 0.75 * n;

??要看懂第二个分支里对sizeCtrl的处理就要了解在线程参与扩容前对sizeCtrl做了哪些处理?在线程扩容时对sizeCtrl的值做的处理:

  • 创建扩容数组的线程,将sizeCtrl 的值设置为:sizeCtrl = ( resizeStamp(n) << RESIZE_STAMP_SHIFT ) + 2 ;
  • 其余参与扩容的线程:每当有线程参与协助扩容 :sizeCtrl += 1;

??在上节分析过:进入该分支的线程,都已经完成了该线程所分配区域的数据迁移,并且此时旧数组中,没有还未分配的区域;也就是说所有区域都分配完了,自己区域内的数据也迁移完成;那么在完成了数据迁移之后,设置: sizeCtrl -= 1;表示有一个线程完成了数据迁移;对于判断条件:(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT;

?? 我们将公式稍作变形:sc != (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 不等式的右边这正是创建扩容数组时给sizeCtrl的初始值;如果相等,那就说明这是最后一个线程;

在这第二个分支内做的处理:

  • 如果不是最后一个退出扩容的线程,就直接退出扩容;
  • 如果是最后一个退出扩容的线程:i = n ,finishing = true 扫描全表,检查是否有没被迁移的数据,如果有就将其迁移到新数组检查完整个数组之后,将table更新为新数组:nextTab完成扩容然后退出;

    ?? 这里有一个疑问:既然每个线程负责一个区域,那么当最后一个线程完成了数据的迁移,所有的数据应该已经都迁移完,没有必要再检查一遍了。那为什么还要用最后一个退出线程去检查遍历整个数组呢?(关于这个疑问会在后面讨论。)
    ??

4.ConcurrentHashMap的数据迁移

4.1 ConcurrentHashMap数据迁移的原理

?? 在讲这之前要搞清楚迁移数据时要解决的问题:同一条链表(红黑树)的节点在数组扩容之后可能不在同一条链表(红黑树)上;因此不能直接将链表头节点(红黑树root节点)迁移到新数组来完成这条链表(红黑树)的迁移。

?? 在ConcurrentHashMap中,数组table的长度为2n大小,这个设计真的很方便扩容;扩容后节点的下标只有两种情况:下标的值不变 或者 下标值:i + n; (n扩容前数组长度)

??例子:

?? 我们看到同一个 node 扩容后:下标只有最高位有变化,比之前多了一位不确定 0 或 1 的高位,其余位置不变;最高位 x 只有2个值,因此 i 的下标也只能有2种取值:

  • 高位 x = 0;下标不变
  • 高位 x = 1;下标:i + n(i: 扩容前下标,n:扩容前数组长度)

??在ConcurrentHashMap扩容时,线程在数据在迁移时:一条链表最多可能分化为 2条Node链表;因此在迁移链表时就只需要构造2条链表即可转移该链表所有的节点;

??对于红黑树来说也是一样的:在同一棵红黑树上的节点对应了同一个数组位置,在扩容的时候,红黑树节点也只会有2个位置可以选择;因此红黑树也会分成 2条TreeNode链表;一个在原位置,一个在:i + n位置;你或许会有疑问,明明是红黑树,怎么会分成2条链表?在由Node链表 -> 红黑树时;首先是将Node链表 -> TreeNode链表;然后将TreeNode链表 -> 红黑树;在转换成红黑树之后,保留了TreeNode链表的头节点;也就是说,TreeNode同时保存2种结构:

  • TreeNode的链表结构,可以通过表头以及TreeNode中的next属性遍历整个链表;
  • TreeNode的红黑树结构,可以通过root根节点以及左右子节点遍历整颗红黑树;

?? 将TreeNode当成链表来看,就只关注TreeNode中的next属性,以表头first节点开始,通过next属性就可以遍历完整个TreeNode链表;

?? 将TreeNode当成红黑树来看就不能关注next节点,要关注它的left,right子节点以及parent父节点这些属于树的属性;搜索方法也是依靠二叉搜索树从root节点通过左右子节点来遍历整颗红黑树;

最后还有一个问题:当红黑树put值时,怎么解决既在链插入又在红黑树中插入?
  • 在链表上插入体现为:能成为链表中一个节点的next节点,或者链表中一个节点的前驱节点;
  • 在红黑树的插入体现为:是成为某个红黑树节点的left/right节点;==》关键就是找到父节点;

?? 在ConcurrentHashMap中,TreeNode链表的头节点是记录在TreeBin中的,因此它使用了非常简单高效的做法,直接将新插入的节点当作表头,将原头节点当作新表头的next节点;对于红黑树找父节点,就按照二叉搜索树的插入方法去找到parent节点即可;

?? 最后,我们在迁移红黑树时,使用TreeNode的链表结构来遍历TreeNode链表,同时构建2个TreeNode链表来迁移数据;在将TreeNode链表遍历完之后,分别判断2条链表的长度,来决定应该重新构建红黑树还是将TreeNode转换成Node链表;最后再分别将红黑树分裂形成的2个红黑树TreeBin或者是链表表头Node添加到新数组中;

TreeNode的结构:

    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;
        //next属性继承Node
        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
....
....
....
}

4.2 ConcurrentHashMap数据迁移的源码分析

4.2.1 Node链表迁移的源码分析

						//f = table[i];
						//fh = f.hash;
						//n = table.length;(旧数组长度)
						//ln:所在的链表的位置,没变还是在下标:i
                        //hn:链表所在位置:i+n;
						Node<K,V> ln, hn;
                        if (fh >= 0) {//Node的hash值不为负数
                        //只有0 和n两种取值;结果是0,表示扩容后位置不变;结果不为0,表示扩容后位置:i+n;
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //循环遍历旧数组中,table[i]所在链表遍历链表找出lastRun
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                           
                           //根据runBit的值,给ln,hn赋值;
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                           //遍历链表将节点分配到ln,hn中;
                           //新Node插入到表
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //遍历完旧数组的链表后,重新生成了ln,hn2条链表;
                            //ln链表插入到新数组的下标:i
                            setTabAt(nextTab, i, ln);
                            // hn插入到:i+n位置;
                            setTabAt(nextTab, i + n, hn);
                            //并将旧数组的i位置插入扩容标志节点:ForwardingNode对象;
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }

这里再简单提下:在ConcurrentHashMap中table数组可能存在三种Node节点

  • Node:普通node节点,表示链表;节点的hash值大于0;
  • ForWardingNode节点,扩容标志;hash = MOVE(-1);在这个对象中有nextTab对象,协助扩容时根据nextTab找到扩容后新数组;
  • TreeBin节点,表示红黑树;hash = TREEBIN(-2)在这个对象中存储了TreeNode链表表头first,以及红黑树根节点root;
4.2.1.1 ConcurrentHashMap中如何确定节点在哪一条链表

?? 在上一节中,分析到可以使用新数组的容量来计算出node节点迁移后的坐标;在ConcurrentHashMap迁移链表时分裂的2个新链表分别是:ln(下标:i),hn(下标:i+n);在判断node节点应该在哪条链表不是直接使用下标判断,而是利用扩容后下标位置有没有发生改变来判断;没有改变就在ln链表,位置改变了就在hn链表;这个时候直接使用扩容后的容量计算出下标就不能直接区分坐标到底是变了还是没有变;因此ConcurrentHashMap使用了另一种方法来判断扩容后下标变不变:fh & n ;这个方法可以直接计算出扩容后,node 坐标的位置的偏移量;

  • fh & n = 0;扩容后下标不变;
  • fh & n = n;扩容后下标位置改变(i + n);



4.2.1.2 lastRun和链表数据的迁移流程

?? lastRun是理解链表迁移的关键;要从整体来看lastRun,理解他在链表转移数据的过程有什么用;但是不好组织语言来描述;

?? 我们知道一条链表上的节点,在扩容之后会分成2条链表;而这条链表上有哪些节点扩容之后位置要改变的哪些节点位置是不改变的,这两种节点在链表上的分布是不确定的;我们可以将连续相同的节点看作是一段链表(一个节点也是一段链表),那么这条链表就可以看作2种链表组合组成的;而lastRun,是指向最后一段链表的头节点;再配合runBit(偏移量)就可以确定最后一段链表属于哪种节点;因此可以看到再次遍历数组时,遍历到lastRun就结束遍历;

??还是用一个例子,画出整个流程图就很好理解;

  • 1.遍历链表找出lastRun:
  • 2.根据runBit的值来确定lastRun属于哪一条链表;
    • runBit = 0;位置不变属于ln链表 :ln = lastRun;
    • runBit != 0; 位置改变属于hn链表 : hn = lastRun;
  • 3.重新遍历链表,将链表中的节点根据hash值计算出的偏移量选择插入的链表:ln,hn中;新插入的节点插在表头;当next指针指向与lastRun相同的对象时遍历结束,此时链表所有的节点都已经迁移到两个新链表中了;
  • 4.将ln,hn的值插入到新数组nextTab;将table[i] = ForwardingNode对象;

在这里插入图片描述

4.2.1.3 链表迁移的思考

ConcurrentHashMap对链表的数据迁移方法很巧妙,刚看完只感觉惊叹。同时想到如果是由我来写链表的数据迁移,我会怎么做呢?



在这里插入图片描述

??我应该会直接循环遍历链表,利用hash值计算出下标,由下标判断应该在哪条链表;

我感觉唯一不足的地方是:所有迁移的数据,都会利用Node的数据重新生成Node对象;为什么不能直接使用这些Node对象呢?少生成对象,能节约生成对象的时间,也能减少内存占用;
+++++++++++++++++++++++++++++++++++++++++++++++++++update+++++++++++++++++++++++++++++++++++++++++++++++++++++
??哎,还真想到了,对这个算法的优化。遍历链表之后只获取了最后一段有用信息lastRun,其余的都没利用到;因此还需要对链表再做一次遍历。我们上面提到过,原链表的节点上有2种节点:扩容后位置不变(i),扩容后位置改变(i+n);将连续相同的一类节点看作是一种链表,那么原链表可以看作是由2种链表组合而成;从表头开始遍历结合偏移量:runBit;我们又可以准确的判断一段链表的开头和结尾;我们有了这些信息之后就可以在遍历链表时将2类链表分开,分别拼接在一起;只用遍历一次链表即可;

在这里插入图片描述
代码实现:

    class Node<K,V>{
        int hash;
        K key;
        V val;
        Node next;
        
        public Node(int hash,K key ,V val,Node next){
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

    }

    /**
     *
     * @param head  原数组链表
     * @param n     table的长度
     */
    public void splitLinkedList(Node head,int n){
        Node lo=null , loTail = null;//扩容后下标不变 :i
        Node hi=null, hiTail = null;//扩容后下标变为 :i+n
        int runBit  = head.hash & n;//偏移量
        if(runBit == 0){
            lo = head;
        }else{
            hi = head;
        }
        for(Node p = head.next,prev = head ; p!=null ; p=p.next ){
            int pRunBit = p.hash & n;
            if(pRunBit != runBit){
                runBit = pRunBit;
                if(pRunBit == 0){//当前runBit = 0;与前面节点不同,说明前面的节点的runBit = n;
                //设置前一个链表片段的尾节点;prev是前一个链表片段的尾节点
                    hiTail = prev;
                    hiTail.next = null;
                    //与之前runBit = 0的链表拼接;
                    if(lo == null){
                        lo = p;
                    }else{
                        loTail.next = p;
                    }
                }else{//当前runBit = n;处理逻辑与 runBit =  0完全一样
                    loTail = prev;
                    loTail.next = null;
                    if(hi ==null){
                        hi = p;
                    }else{
                        hiTail.next = p;
                    }

                }
            }
            prev = p;

        }
        //打印测试
		System.out.println("lo链表扩容后位置不变:i\n");
        print(lo);
        System.out.println("\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n");
        System.out.println("hi链表扩容后位置:i+n\n");
        print(hi);

    }

    public void print(Node head){
         if(head ==null)return;
         if(head.next != null) {
             System.out.print("[hash="+head.hash+",("+head.key+","+head.val+")] ==> ");
             print(head.next);
         }else{
             System.out.print("[hash="+head.hash+",("+head.key+","+head.val+")]  ");
         }
    }


测试代码:

    @Test
    public void test(){
        //构造一条链表:偶数位置的节点的hash = 17是迁移的节点;奇数位置节点的hash =1位置不变
        //table.length = 16
        Node<Integer,Integer> head = new Node<>(1,1,1,null),tail = head,next;
        for (int i = 2; i <= 10; i++) {
            if( (i&1)== 0){//偶数
                 next = new Node<>(17,i,i,null);
            }else{
                next = new Node<>(1,i,i,null);
            }
            tail.next = next;
            tail = next;
        }
		 System.out.println("原链表:");
         print(head);
        System.out.println("\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n");
        splitLinkedList(head,16);

    }

测试结果:

原链表:
[hash=1,(1,1)] ==> [hash=17,(2,2)] ==> [hash=1,(3,3)] ==> [hash=17,(4,4)] ==> [hash=1,(5,5)] ==> [hash=17,(6,6)] ==> [hash=1,(7,7)] ==> [hash=17,(8,8)] ==> [hash=1,(9,9)] ==> [hash=17,(10,10)]  
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

lo链表扩容后位置不变:i

[hash=1,(1,1)] ==> [hash=1,(3,3)] ==> [hash=1,(5,5)] ==> [hash=1,(7,7)] ==> [hash=1,(9,9)]  
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

hi链表扩容后位置:i+n

[hash=17,(2,2)] ==> [hash=17,(4,4)] ==> [hash=17,(6,6)] ==> [hash=17,(8,8)] ==> [hash=17,(10,10)]  


?? 其实这种优化对Node链表来说并没有什么用,因为Node链表的长度小于8,长度太短遍历一次两次耗时不会差距太大;并且由于有 lastRun 存在,第二次遍历到 lastRun 指向的位置就会停止遍历,更进一步减少了时间的差距;我觉得真正的优化在于,没有利用原链表数据重新生成对象,这样既减少对象生成时间又少占用了内存空间;

4.2.2 红黑树迁移的源码分析

?? 红黑树的迁移逻辑要比链表的迁移逻辑要简单得多(PS:这才是正常人的逻辑);没那么多弯弯绕绕的逻辑,就是一把梭~在上面 数据迁移的原理部分,讲到过红黑树的TreeNode节点其实也维护了一个链表的结构;因此迁移时直接使用TreeNode的链表结构,迁移逻辑就会很简单了;
??
?? 与Node链表的迁移不同;红黑树的迁移是直接循环遍历TreeNode链表,利用hash值计算偏移量来决定TreeNode应该放到哪个链表上;同时插入的位置是在表尾;因此可以看到源码中,一个链表由表头表尾共同维护;在遍历完整个TreeNode的节点之后,再判断TreeNode链表是否应该转换成红黑树,还是退化成Node链表;

源码:

                        else if (f instanceof TreeBin) {//判断是不是红黑树节点
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            //lo:TreeNode链表表头:下标位置不变:i
                            //loTail:表尾,每次插入节点都在表尾插入
                        
                            TreeNode<K,V> lo = null, loTail = null;
                            //hi:TreeNode链表表头:下标位置:i+n;
                            //hiTail :表尾,每次插入节点都在表尾插入
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {//偏移量 = 0插入loTail
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;//lo链表插入表尾
                                    loTail = p;//更新lo链表表尾
                                    ++lc;//记录lo链表节点个数
                                }
                                else {//偏移量 != 0插入hiTail
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;//hi链表插入表尾
                                    hiTail = p;//更新hi链表表尾
                                    ++hc;//记录hi链表节点个数;
                                }
                            }
                            //分别判断ln,hn节点个数是否满足生成红黑树的条件;
                            //不满足则将TreeNode链表转换成Node链表;
                            //满足则将TreeNode节点重新生成红黑树;
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                //将ln,hn分别设置到新数组的i,i+n位置
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //在table[i]中设置ForwardingNode对象;
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }

流程:

  • 1.遍历TreeNode链表,利用hash值计算扩容后偏移量(hash & n);根据偏移量是否为0来选择将节点添加到哪个链表。新加入的节点放到链表表尾,同时统计链表元素个数;

在这里插入图片描述

  • 2.根据元素个数判断是否将链表转换成红黑树;
  • 3.将:lo,hi分别迁移到新数组nextTab的:i,i+n位置;在旧数组table被迁移数据的位置:i,设置一个ForwardingNode对象;

5. 扩容逻辑的整体分析

只保留整体大框架,省略掉细节部分:


        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            第一部分:线程获取迁移数据的区域
            while (advance) {
				  int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            第二部分:扩容结束的判断
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
			第三部分:判断table元素的节点类型;
            //如果table[i]=null,就将fwd对象填充到这个位置,表示正在扩容;
            //其他线程看到了会通过fwd中的信息找到nextTab协助扩容;
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
                //判断位置上的节点是否是ForwardingNode对象,如果是表示已经有线程迁移了该位置的数据
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
                第四部分:迁移数据;
            else {
            	//获取锁
                synchronized (f) {
                //获取到对象f的锁之后,会对比:获得锁前后f对象有没有花生改变,如果发生改变,就不处理
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
							处理链表
                        }
                        else if (f instanceof TreeBin) {
                        	处理红黑树

                        }
                    }
                }
            }
        }

整体过程可以分为以下几部分:

  • 获取数据迁移区域;
  • 结束扩容的判断;
  • 判断table[i]的类型:如果是null;就设置ForwardingNode对象
  • 判断table[i]的类型:如果是ForwardingNode对象则,迁移下个位置的数据
  • 迁移Node链表或红黑树的数据

5.1ConcurrentHashMap扩容引起的数据丢失问题的原因及解决办法

ConcurrentHashMap扩容导致丢失数据的原因

?? 在最后的部分,关于数据迁移:首先要获取f(table[i])的对象锁;获取到对象锁之后,会比较获取锁前后,table[i]的值f是否发生改变;为什么会做这个判断呢?因为在扩容期间,ConcurrentHashMap还是允许其他线程的:put,get,remove操作;其中put,remove操作,可能会更改table[i]的值;笔者认为原因有以下几点:

  • put操作导致:Node链表 -> 红黑树;
  • remove操作直接将table[i],remove掉了;
  • remove操作导致:红黑树 -> Node链表;

?? (PS:顺带一提:在remove操作完成之后会更新元素个数调用addCount方法,也就是说一个线程在remove时,也可能会遇到容器扩容的情况从而协助扩容;)

?? 回到正文继续分析:在f发生改变的情况下,ConcurrentHashMap不会迁移该位置的数据,而是会进入循环,i 自减 1 ;进入到下一个位置的数据迁移;在这种情况下,即使线程遍历完自己的区域[i,bound],但是并不能保证能将自己区域内的数据完全的迁移完;在上诉情况下有机率造成线程在迁移数据过程中发生数据丢失的情况;

ConcurrentHashMap是如何解决的呢?

?? 在分析扩容结束的部分有说到,会在最后一个退出的线程退出之前检查整个数组是否还有数据没有迁移。如果发现有还没有迁移的数据,会将数据迁移到新数组中;在检查完整个数组之后退出扩容;

?? 这也就能够解释,为什么要设置一个判断条件(fh = f.hash) == MOVED 来判断该位置数据是否已经迁移;因为在线程完成自己分配区域内的数据迁移时,是不会遇到这种情况的;每个线程都会按照 : i -> bound (i > bound)的顺序迁移自己区域内的数据;因此线程在负责自己区域内的数据迁移时,不会遇到数据已经迁移的情况;这个判断条件是给最后一个线程检查全数组用的;因为它要负责整个数组的检查,肯定大概率会遇到别的位置数据已经被迁移了;

最后一个线程一定扫描全数组吗?

?? 我们可以看看它时如何让最后一个线程检查全数组的。处理非常简单:将 i 设置为 n;我们前面分析过,一个线程负责[i,bound]的区域,当最后一个线程负责的区域是[stride,0],那么将i设置成n之后,这种情况线程是可以扫描全表的;如果最后一个退出的线程负责的区域:[i,bound]中bound不等于0,那么该线程不会不会扫描全表;这个时候可能会产生数据丢失的情况;要彻底解决这个bug,有两种方法:

  • 1.设置bound的值:在 i= n;之后添加 bound = 0;这样就能保证扫描全数组了;
  • 2.在获取f的锁之后,发现f已经被修改了;这个时候添加一个else分支,else{i++;};这样进入下次循环:i自减1,保证还是当前位置迁移数据;这样可以保证每个线程都能将自己区域内的数据全部迁移;也就不用最后一个线程全表扫描了;

synchronized (f) {
//对比获取锁前后对象 :f是否发生改变
	if (tabAt(tab, i) == f) {//没变
		//迁移数据
	}else{//f改变
		i++;//下次循环,i会自减1,因此下次循环还是该位置,继续迁移数据;
	}


}

?? 从理论上来说,有丢失数据的机率但实际要丢失数据还是挺困难的;要满足很严格的要求,因此在网上搜了下没有搜索到关于使用ConcurrentHashMap扩容时数据丢失的情况;但理论上确实存在这个问题。

?? 为什么说要丢失数据很难呢?原因可能有以下2点:

  • 我们知道,区域的分配是按顺序分配的,第一个得到区域线程,最先开始迁移数据;而最后一个得到区域的(也就是负责[stride,0])是最后开始迁移数据的;因为最后一个开始迁移数据,在时间上就已经慢了其他线程一步了;因此,越是往后获取到区域的线程,越是会靠后退出;即使不是最后获取区域的线程最后退出,也大概率会是靠后获取区域的线程最后退出,而这种靠后的线程,扫描的区域也会很大;因此,在大概率会扫描数组的大部分区域的情况下,遇到数据丢失的机率就会非常小;
  • 其他线程put,remove值时,刚好遇到线程要迁移这个位置的数据;并且put,remove操作导致:红黑树 ——> 链表;或者 链表 -> 红黑树;要同时满足这2种情况的概率是也是很低的;即使遇到了,还有可能被最后一个退出的线程扫描到;因为添加这么一个扫描机制,导致丢失数据的可能被进一步减少;

最后,画个流程图总结下:

在这里插入图片描述


??

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-18 23:18:09  更:2022-06-18 23:21:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/18 4:49:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码