IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 数据结构与算法 -> ConcurrentHashMap1.8学习 -> 正文阅读

[数据结构与算法]ConcurrentHashMap1.8学习

在这里插入图片描述

目标

了解ConcurrentHashMap 工作原理。重点了ConcurrentHashMap的多线程下的扩容机制,TreeBin 、ForwardingNode 功能的认识,以及CAS和LockSupport的使用。

预备知识

重要属性

在这里插入图片描述

源码讲解

put 函数

在这里插入图片描述

public V put(K key, V value) {
        return putVal(key, value, false);
    }

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());//计算 hash值
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

putTreeVal

如果能够找到与要put进的元素的key "相等"的节点就返回,否则将放到红黑树上。与之前HashMap1.8 不同的是banlanceInsertion 之前需要上锁。


        /**
         * Finds or adds a node.
         * @return null if added
         */
        final TreeNode<K,V> putTreeVal(int h, K k, V v) {
            Class<?> kc = null;
            boolean searched = false;
            for (TreeNode<K,V> p = root;;) {
                int dir, ph; K pk;
                if (p == null) {
                    first = root = new TreeNode<K,V>(h, k, v, null, null);
                    break;
                }
                else if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0) {
                    if (!searched) {
                        TreeNode<K,V> q, ch;
                        searched = true;
                        if (((ch = p.left) != null &&
                             (q = ch.findTreeNode(h, k, kc)) != null) ||
                            ((ch = p.right) != null &&
                             (q = ch.findTreeNode(h, k, kc)) != null))
                            return q;
                    }
                    dir = tieBreakOrder(k, pk);
                }

                TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    TreeNode<K,V> x, f = first;
                    first = x = new TreeNode<K,V>(h, k, v, f, xp);
                    if (f != null)
                        f.prev = x;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    if (!xp.red)
                        x.red = true;
                    else {
                        lockRoot();
                        try {
                            root = balanceInsertion(root, x);
                        } finally {
                            unlockRoot();
                        }
                    }
                    break;
                }
            }
            assert checkInvariants(root);
            return null;
        }

treeBin函数

treeBin函数主要是将普通Node节点转化为TreeNode节点,同时转化为红黑树。转化为红黑树的操作被隐藏在TreeBin 对象的构建过程。

private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

TreeBin 构造函数

TreeBin 类继承自Node类,hash值为-2,代表该桶位上放的是一颗红黑树。它本身不存储键 与值,只是指向root,另外还维护一个读写锁,使得写线程等待读线程读操作,使其在重构红黑树之前完成读操作。

TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            this.first = b;
            TreeNode<K,V> r = null;
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                            TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }

addCount函数

是元素已经放进table中,后需要进行增加size的操作,但是并发环境都cas对baseCount 增加1,必然会使得大量的线层cas失败,导致大量线程空转,一次只有一个线程设置成功。这里doug lea 借鉴了LongAdder 的思想,使得不同线程尽可能分布到不同的CounterCell中进行计数。所以后面我们统计整个容器的size的时候,就分为两部分 baseCount + CounterCells .

另外一个功能就是扩容,当check < 0的时候 不检查扩容,当 check <= 1 时 只检查是否是 未竞争的状态。check – if <0, don’t check resize, if <= 1 only check if uncontended
resizeStamp

/**
 * Returns the stamp bits for resizing a table of size n.
 * Must be negative when shifted left by RESIZE_STAMP_SHIFT.
 */
static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

该函数时再resize时候 被调用,返回的是一个 stamp bits .我们应该清楚2点

1、计算 table的大小 n 的前0个数 tmpResult

2、使得tmpResult 的第16个bit为1(注意RESIZE_STAMP_BITS 为常量16)

这样resizeStamp返回的值在左移16位时会为negative。

所以为什么要这么做呢?

在下面代码33 - 34 行 代码 中 为了确保在执行U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)后 sizeCtl 被设置为负数,并且sizeCtl前16位保留了我们正在对大小为多少的table进行扩容这样的信息。

所以当第一个线程扩容的时候会走到代码33- 34 行代码,使得sizeCtl设置为一个负数,标明有线程在扩容.

对于下面代码26 - 29行的条件 ,(sc >>> RESIZE_STAMP_SHIFT) != rs 指的是 当前sizeCtl 值 右移16 位后 如果与 resizeStamp()返回的不等 说明扩容不是对同一个table操作,直接放弃。

sc == rs + 1 指的是 如果扩容已经结束

sc == rs + MAX_RESIZERS 指的是 如果扩容已经达到最大线程数 无法帮助扩容

(nt = nextTable == null) nt == null 说明不在扩容时间,所以停止扩容 直接break

transferindex <= 0 也代表不需扩容帮助。

关于此处有bug 的问题 请参考https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427

addCount check 参数的作用其实是可以控制扩容的,比如当我们删除元素的时候就会调用addCount(-1,-1),这样就不需要扩容操作了。

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
      
      //s = (baseCount + x )or sumCount()
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                      
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

fullAddCount 函数

此函数是在 CounterCells 为空 或者线程对应的counterCell 为空 或者 cas设置baseCount 失败的时候 被调用的,所以主要逻辑有CounterCells的初始化 和 真正的 +x的操作。该类改善了在多线程环境下并发修改baseCount的情况。如下37-61行代码,表示在该线程对应的CounterCell 创建时对应的位置已经不为空了,那么进行rehash 、cas 、甚至扩容CounterCell 的一系列措施 提高并发,注意CounterCells 不是无限扩容的,限制就是collide变量,只有在该变量为true时扩容才可能发生,然而可以观察41行代码中的条件,该行代码会在 其他线程已经修改CounterCells数组或者当时数组容量已经大于等于机器CPU核心数了 那么就没必要扩容。62-77 行是CounterCells 初始化的过程。78-79 这两行代码 是当有线程发现已经有线程在初始化了,那么它也不会闲着,而是会fall back尝试cas设置baseCount.

    // See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
      
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

transfer 函数 即扩容函数

在这里插入图片描述
扩容如上图,简要来说就是并发扩容时会设置一个步长stride,这里我们假设为1,那么第一个扩容的线程A会从右向左进行元素的移动,再有其他线程比如B来时,会计算出自己的扩容区间,然后进行扩容,直到所有元素扩容完成。

下面代码 设置全局变量transferIndex = n (旧数组大小),然后对于一个线程来说,在经过23 - 40 行代码,一个步长的所有桶都会被该线程移动,其他线程执行此段代码也会得到自己应该执行的 分段,直到所有的桶都被移动完,即此时transferIndex <= 0,此时会走到代码50 - 55 行,执行此代码的可以是所有帮助扩容的代码,但是总会有一个执行扩容的线程会进行收尾工作 ,收尾工作对应代码的44 - 49 行 ,主要就是新table 设置,那么这个是如何做到呢?主要是通过sizeCtl 第一个发起扩容的线程会设置sizeCtl 为 一个数,之后每个帮助的线程都会将这个数+1,而在帮助完成之后-1,所以最后完成的一定会是将sizeCtl 设置为原来的数的那个线程,大致逻辑是这样。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating 说明是第一个开始扩容的线程
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;//设置从右开始向左 移动桶
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {//该fwhile循环 逻辑上主要是为每个线程 分配一段旧table 上 (bound,i】来让其完成转移工作,
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                  // 此时已经计算得该线程应该转移的桶位为(bound,i] 
                    advance = false;
                }
            }
          
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
          			//将空桶位上的元素放置fwd,标识该table正在扩容,不要轻举妄动
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;//ln 代表 低位 节点  hn代表高位节点
                        if (fh >= 0) {//代表该桶位上的节点是 Node节点 而不是TreeBin 
                            int runBit = fh & n;//只能是 0  或者1 意思是计算新参与运算的位 如果是1 那就为1 如果为 0 那就是0 
                            Node<K,V> lastRun = f;
                          //下面for循环 主要是找到在同一个桶位的最后一串
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                          //对于 ln hn 赋值
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                          //前面仍然相当于是一个优化, HashMap 1.8 同样可以看到同样的逻辑。然后对每个节点 一个个放置到相同的位置上
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {//此for循环是使得这桶位上的数据全部被初始化为一条链,我们之前说对于TreeNode 我们得从两个角度 第一个角度是红黑树 第二角度是双链表 而此步骤就是
                              //把双链表结构相关的指针先链接好
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;//hc 如果等于 0 说明现在这个结构t 直接可以整体移到相应新table的桶位上。
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;//lc 如果等于 0 说明现在这个结构 可以直接整体移到相应新table的桶位上。
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);//设置旧桶位 为 fwd 说明正在扩容 不要亲举妄动
                            advance = true;
                        }
                    }
                }
            }
        }
    }

helpTransfer 函数

该函数会在多个场景下调用,主要是增加 删除 时候。在判断要处理的桶位上的第一个元素的hash 如果为常量MOVED(-1),那么代表此时正扩容,此时可以帮助扩容执行 此函数。该函数的返回值为Node<K,V>[] 即新table ,所以帮助扩容完成之后便可以进行自己想进行的增加或者删除操作了。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

get 函数


在这里插入图片描述

TreeBin 中的find方法

读写锁

TreeBin内部维护了一个自旋读写锁,lockstate 第一个bit 是写bit ,第二个bit是等待 bit,第三个比特是 读比特,对于读操作,读读操作是不互斥的,而写操作是互斥的。对于写锁,我们在putTreeVal 中balanceInsertion 之前 lockRoot() 会上写锁,尝试将TreeBin 的属性lockstate的写 bit设置为1。

// Acquires write lock for tree restructuring
private final void lockRoot() {
            if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
                contendedLock(); // offload to separate method
}
private final void contendedLock() {
            boolean waiting = false;
            for (int s;;) {
                if (((s = lockState) & ~WAITER) == 0) {//如果lockstate 为0 或者 lockstate 为 WATITER (2)
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {//尝试获取写锁
                        if (waiting)
                            waiter = null;
                        return;
                    }
                }
                else if ((s & WAITER) == 0) {//如果没有线程在等待写则 将自己设置为等待的线程
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                        waiting = true;
                        waiter = Thread.currentThread();
                    }
                }
                else if (waiting)//将自己park 
                    LockSupport.park(this);
            }
        }

下面5-10行代码可以看出在有线程持有TreeBin 的写锁或者等待时,有读操作到来时,红黑树回退化为链表进行遍历读,否则就给lockstate 增加READER,代表此线程在读,在红黑树读完之后,将增量减回去,并且发现有线程在等待写的话则进行唤醒操作(17-22行)。

final Node<K,V> find(int h, Object k) {
            if (k != null) {
                for (Node<K,V> e = first; e != null; ) {
                    int s; K ek;
                    if (((s = lockState) & (WAITER|WRITER)) != 0) {//说明有其他线程在等待写锁或者持有写锁,即在红黑树上左平衡操作 那么就不要用红黑树遍历 退化为普通遍历
                        if (e.hash == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                            return e;
                        e = e.next;
                    }
                    else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                                 s + READER)) {// == 0 说明没有线程持有写锁或者等待写锁,读锁可以叠加, 设置将s 增加 READER 成功说明该线程持有读锁,
                        TreeNode<K,V> r, p;
                        try {
                            p = ((r = root) == null ? null :
                                 r.findTreeNode(h, k, null));//红黑树上搜索
                        } finally {
                            Thread w;
                            if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                                (READER|WAITER) && (w = waiter) != null)//注意这里是getAndAddInt 我们进入此代码块的时候是没有写锁或者是等待的线程,此时如果有等待的话需要进行唤醒
                                LockSupport.unpark(w);
                        }
                        return p;
                    }
                }
            }
            return null;
        }

ForwardingNode 的find方法

Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes 使用循环 避免多次遇到ForwardingNode 形成深递归
  							
            outer: for (Node<K,V>[] tab = nextTable;;) {//在新数组上查找
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                  //检查第一个节点
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                          //遇到ForwardingNode 相当于一次递归
                            continue outer;
                        }
                        else
                          //treeBin 节点调用
                            return e.find(h, k);
                    }
                  
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }

总结

整体结构上使用了数组+链表+红黑树,放弃了1.7Segment 分段锁,扩容时直接对整个数组进行。

初始化时,会根据loadFactor预估出更加合理的初始容量,避免不必要的扩容。

锁的粒度更细,对桶位上的第一个节点加锁。

计算元素总数使用CounterCell数组,进行极端情况下的优化。

扩容时,其他线程可以帮助扩容。

节点可分为普通节点、TreeBin节点、ForwardingNode.

ForwardingNode 标识扩容移动元素,内部有find函数,在扩容时仍可接受get,只不过是在新表上查询。

TreeBin 标识桶位树化,管理红黑树,比如插入、删除、查找等。内部维护lockstate 来控制红黑树的读写操作。

ConcurrentHashMap put 为尾插法,当扩容移动桶位上的元素时候时头插法,因为移动的时候首先找了桶位上的最后一串能放在新数组上相同位置的节点,之后又从从开始遍历。

ConcurrentHashMap1.8 初始化使用了懒加载方式,改变了1.7初始化时就创建Segment数组缺点,有效避免初始化开销。

  数据结构与算法 最新文章
【力扣106】 从中序与后续遍历序列构造二叉
leetcode 322 零钱兑换
哈希的应用:海量数据处理
动态规划|最短Hamilton路径
华为机试_HJ41 称砝码【中等】【menset】【
【C与数据结构】——寒假提高每日练习Day1
基础算法——堆排序
2023王道数据结构线性表--单链表课后习题部
LeetCode 之 反转链表的一部分
【题解】lintcode必刷50题<有效的括号序列
上一篇文章      下一篇文章      查看所有文章
加:2021-08-05 17:36:08  更:2021-08-05 17:37:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 1:41:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计