IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 数据结构与算法 -> ConcurrentHashMap源码注释(jdk1.8) -> 正文阅读

[数据结构与算法]ConcurrentHashMap源码注释(jdk1.8)

本文适用于了解过ConcurrentHashMap部分原理,但又没仔细研究过源码的同学!并且本文不涉及红黑树相关的操作,能力有限,掌握的不是很好。在看源码时,可以注意到Doug lea的编码风格,就是喜欢在if判断里进行一些属性的赋值,这一点一定要记清楚了,不然有些局部变量在哪赋值的都搞不清。

重要属性

//散列表数组最大限制
private static final int MAXIMUM_CAPACITY = 1 << 30;

//散列表默认大小
private static final int DEFAULT_CAPACITY = 16;

//并发级别,jdk1.7遗留。1.8只在初始化时使用过,不代表并发级别
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//负载因子,固定值(不可修改)
private static final float LOAD_FACTOR = 0.75f;

//树化阈值,指定桶位链表长度达到8,有可能发生树化操作
static final int TREEIFY_THRESHOLD = 8;

//链化阈值,从红黑树转为链表
static final int UNTREEIFY_THRESHOLD = 6;

//联合TREEIFY_THRESHOLD控制桶位是否树化,只有当散列表长度达到64,
//某个桶位中的链表达到8时,才会树化
static final int MIN_TREEIFY_CAPACITY = 64;

//线程迁移数据最小步长,控制线程迁移任务最小区间一个值
private static final int MIN_TRANSFER_STRIDE = 16;

//节点的hash值表示的不同含义
static final int MOVED     = -1; // hash for forwarding nodes  FWD节点
static final int TREEBIN   = -2; // hash for roots of trees    TreeBin代理节点

//散列表,长度为2的次方数
transient volatile Node<K,V>[] table;

//扩容过程中,会将扩容中的新table赋值给nextTable保存引用,扩容结束后设置为null
private transient volatile Node<K,V>[] nextTable;

//极其重要的一个属性!!!
//1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待
//2.!=-1 && <0 表示当前table数组正在进行扩容,高16位表示扩容标识戳,低16位
//  表示当前有(n+1)线程在参与并发扩容
//  扩容标识戳是指16->32,32->64...每个阶段都有不同的标识戳,如果现在正在执行
//  32->64的扩容,带有16->32扩容标识戳的线程是无法参与并发扩容的。
//3. 0 表示创建table数组时,使用DEFAULT_CAPACITY为容量
//4 >0
//  table未初始化时,表示初始化大小
//  table已经初始化,表示下次扩容时的触发阈值
private transient volatile int sizeCtl;

//扩容过程中,记录当前进度。所有线程都需要从transferIndex中分配区间任务,去执行自己的任务
private transient volatile int transferIndex;

//LongAdder中的baseCount,未发生竞争时或者当前LongAdder处于加锁状态时,增量累加到baseCount中
private transient volatile long baseCount;

//LongAdder中的cellsBusy
//0表示当前LongAdder对象无锁
//1表示加锁
private transient volatile int cellsBusy;

/**
  * LongAdder中的cells数组,当baseCount发生竞争后,会创建cells数组,
  * 线程会通过计算hash值取到自己的cell,将增量累加到指定的cell中
  * 总数=sum(cells)+baseCount
*/
private transient volatile CounterCell[] counterCells;

重要的辅助方法

tabAt

//UnSafe获取桶位头节点
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

casTabAt

//cas去table指定位置设置值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

setTabAt

//在指定位置设置值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

spread

/**
  * 扰动函数,原值右移16位再和原值异或
  *      h=1100 0011 1010 0101 0001 1100 0001 1110
  * h>>>16=0000 0000 0000 0000 1100 0011 1010 0101
  * h^(h >>> 16)=1100 0011 1010 0101 1101 1111 1011 1011
  * HASH_BITS=0111 1111 1111 1111 1111 1111 1111 1111
  * (h ^ (h >>> 16)) & HASH_BITS=0100 0011 1010 0101 1101 1111 1011 1011
  */
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

tableSizeFor

/**
  * 返回>=c的最小2次方数
  * c=28
  * n=27=> 11011
  * 11011 | 01101 => 11111
  * 11111 | 00111 => 11111
  * ....
  * 11111+1=100000=32
  */
 private static final int tableSizeFor(int c) {
     //为什么此处要减1?
     //假如不减1  c=8 => 1000    1000 | 0100 = 1100   1100 | 0011 = 1111=15
     //此时会计算出容量为16
     int n = c - 1;
     n |= n >>> 1;
     n |= n >>> 2;
     n |= n >>> 4;
     n |= n >>> 8;
     n |= n >>> 16;
     return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
 }

构造方法分析

这里就拿一个做分析

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    //计算容量,如果传入的初始容量大于最大容量右移1位(正数右移1位相当于除2),就用最大容量
    //否则就通过tableSizeFor计算出大于等于当前数的最小2次方
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
   /**
     * sizeCtl > 0
     * 当目前table未初始化时,sizeCtl表示初始化容量
     */
    this.sizeCtl = cap;
}

在构造中可以发现并没有散列表的初始化,也就是说散列表是懒初始化的。

put方法流程

public V put(K key, V value) {
    return putVal(key, value, false);
}

当我们在程序中调用put方法时,内部会去调用putVal方法。整个putVal方法特别长,并且涉及到初始化、树化,扩容等流程。

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key和value不能为null
        if (key == null || value == null) throw new NullPointerException();
        //扰动函数计算hash值
        int hash = spread(key.hashCode());
        //表示当前k-v封装成node插入指定桶位后,在桶位中的所属链表的下标位置
        //0表示当前桶位为null,node可以直接放入
        //2表示当前桶位可能是红黑树
        int binCount = 0;
        //自旋 tab引用map.table
        for (Node<K,V>[] tab = table;;) {
            //f表示桶位的头节点
            Node<K,V> f;
            //n表示散列表数组的长度
            //i表示key寻址计算后,得到的桶位下标
            //fh表示桶位头节点的hash值
            int n, i, fh;
            //表示当前table还未初始化或初始化完成后还未插入值
            if (tab == null || (n = tab.length) == 0)
                //初始化完成后得到最新的map.table的引用
                tab = initTable();
            //获取指定桶位的头节点,赋值给f
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //说明头节点为null,直接用cas进行插入
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    //插入成功退出循环
                    break;                   // no lock when adding to empty bin
                //cas失败,表示在当前线程之前,有其他线程先在头节点处插入成功
                //再次自旋,走其他逻辑
            }
            //表示当前桶位的头节点是否为FWD节点,map是否处于扩容过程中
            else if ((fh = f.hash) == MOVED)
                //map处于扩容过程中,当前线程需要帮助扩容,完成数据迁移的工作
                tab = helpTransfer(tab, f);
            //当前桶位可能是链表,也可能是红黑树代理节点TreeBin
            else {
                //当插入的key存在时,会将旧值赋给oldVal,返回给方法调用处
                V oldVal = null;
                //给头节点加锁(锁头节点)
                synchronized (f) {
                    //为了避免其他线程在加锁前修改头节点,所以再次判断
                    if (tabAt(tab, i) == f) {
                        //说明加锁没有问题
                        //根据头节点的hash值判断当前桶位的情况
                        if (fh >= 0) {
                            //说明当前桶位是普通链表
                            //binCount
                            //1.当前插入key与链表当中所有元素的key都不一致时,当前的插入操作是追加
                            //到链表的末尾,binCount表示链表长度
                            //2.当前插入key与链表当中的某个元素的key一致时,当前插入操作可能就是替
                            //换了。binCount表示冲突位置(binCount-1)
                            binCount = 1;
                            //循环当前桶位的链表,e是每次循环处理的节点
                            //注意此处binCount进行累加操作,下面会用到
                            for (Node<K,V> e = f;; ++binCount) {
                                //当前循环节点的key
                                K ek;
                                //插入元素与当前元素hash一致,key也相同,说明发生冲突,插入操作变为替换
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    //将当前元素的val赋值给oldVal
                                    oldVal = e.val;
                                    //通过put方法调用进来,onlyIfAbsent始终为false
                                    if (!onlyIfAbsent)
                                        //将插入的value赋值给当前元素
                                        e.val = value;
                                    break;
                                }
                                //当前元素与插入元素的key不一致
                                //pred记录当前元素,在下一轮循环是表示前一个元素
                                Node<K,V> pred = e;
                                //循环到链表末尾了
                                if ((e = e.next) == null) {
                                    //在链表末尾插入元素(尾插法)
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //前置条件,该桶位不是链表
                        else if (f instanceof TreeBin) {
                            //表示当前桶位是红黑树代理节点TreeBin
                            //p表示红黑树中如果与你插入的key有冲突,该方法会返回冲突节点的引用
                            Node<K,V> p;
                            //强制设置binCount为2
                            binCount = 2;
                            //说明当前插入节点的key与树中的某个节点的key冲突
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                //将冲突节点的值赋给oldVal
                                oldVal = p.val;
                                //通过put方法调用进来,onlyIfAbsent始终为false
                                if (!onlyIfAbsent)
                                    //将插入的value赋值给当前元素
                                    p.val = value;
                            }
                        }
                    }
                }
                //说明当前桶位是链表或红黑树
                if (binCount != 0) {
                    //如果>=8,一定是链表,因为上面在循环链表示进行了累加
                    if (binCount >= TREEIFY_THRESHOLD)
                        //进行树化 (为什么没有判断散列表大小为64?这个方法里会找到答案)
                        treeifyBin(tab, i);
                    //说明插入元素有冲突,返回原数据
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //1.统计当前table一共有多少数据
        //2.判断是否达到扩容阈值标准,触发扩容
        addCount(1L, binCount);
        return null;
    }

putVal()大概的过程:

1.扰动函数计算key的hash值

2.进入自旋

3.判断散列表是否初始化,还未初始化先初始化

4.初始化完成后进行判断:

? ①当前桶位头节点为null,用cas插入,成功退出自旋;失败继续自旋

? ②根据桶位头节点的hash计算是否为FWD节点,是则帮助扩容

? ③当前桶位为链表或红黑树,则给头节点加锁

? 1)链表:循环链表,判断是添加还是替换

? 2)红黑树:调用红黑树代理节点TreeBin的putTreeVal方法,判断是添加还是替换

? 添加完成后判断是否达到树化阈值,并判断如果是替换,直接返回原数据

5.统计当前table是否达到扩容阈值,触发扩容

接下来去看putVal中调用到的方法

initTable初始化散列表

   /**
     * sizeCtl<0
     * 1.-1 表示当前table正在初始化(有线程正在创建table数组,当前线程需要自旋等待)
     * 2.表示当前table数组正在进行扩容,高16位表示扩容标识戳,低16位(1+n)表示有n个线程在参与并发扩容
     * sizeCtl=0,表示创建table数组时,使用DEFAULT_CAPACITY
     * sizeCtl>0
     * 1.table未初始化,表示初始化大小
     * 2.table已经初始化,表示下次扩容的阈值
     */
    private final Node<K,V>[] initTable() {
        //tab 引用map.table
        Node<K,V>[] tab;
        //sizeCtl的临时值
        int sc;
        //自旋 table是否初始化或初始化后还未插入值
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                //理论上来说这时sizeCtl为-1,表示其他线程正在创建table,当前线程让出cpu资源
                Thread.yield(); // lost initialization race; just spin
            //cas判断是否争抢到初始化的权利
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    //再次判断,防止其他线程已经初始化完毕,当前线程再次初始化覆盖原table
                    if ((tab = table) == null || tab.length == 0) {
                        //sc大于0,创建table使用sc为指定大小,否则默认16
                        //sizeCtl是在构造时进行计算赋值的
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        //赋值给map.table
                        table = tab = nt;
                        //计算下一次扩容阈值
                        //n>>>2 => 等于1/4n   n-(1/4)n=3/4n => 0.75*n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //1.如果当前线程是第一次创建map.table的线程,sc表示下一次扩容阈值
                    //2.表示当前线程不是第一次创建table的线程,在进入else if时,将
                    //sizeCtl设置为-1,那么这时需要进行还原
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

treeifyBin树化过程

	private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //条件成立:说明当前table数组长度未达到64,此时不进行树化操作,进行扩容操作
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            //条件成立,说明当前桶位有数据,且是普通node
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                //锁头节点
                synchronized (b) {
                    //双重检查
                    if (tabAt(tab, index) == b) {
                        //hd 头节点
                        //tl 尾节点
                        TreeNode<K,V> hd = null, tl = null;
                        //循环链表创建TreeNode
                        //建树过程比较简单,自行分析一下
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        //创建完后将头节点包装成TreeBin放在该桶位上
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

	private final void tryPresize(int size) {
        //c表示扩容大小
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        //sizeCtl的临时变量
        int sc;
        //>0 table未初始化时,表示初始化大小 
        //table已经初始化,表示下次扩容时的触发阈值
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //初始化
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            //达到最大容量时不扩容
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            //扩容,因为逻辑相同,具体请看addCount()(下面开始分析)
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

addCount添加元素后计算容量

CounterCell计算容量的思想和LondAdder相同,不了解的可以自行去查看一下。

private final void addCount(long x, int check) {
        //as表示LongAdder.cells
        CounterCell[] as;
        //b表示LongAdder.base
        //s表示当前map.table中元素的数量
        long b, s;
        //条件一:(as = counterCells) != null
        //true 表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell去累加数据
        //false 表示当前线程应该将数据累加到base
        //条件二:!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
        //true 表示写base失败,有线程竞争,当前线程应该去尝试创建cells
        //false 表示写base成功
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            //进入此处前提:
            //1.cells已经初始化,当前线程需要找到合适的cell累加数据
            //2.写base失败,与其他线程在base上发生竞争,当前线程应该尝试创建cells
            //a 表示当前线程hash寻址命中的cell
            CounterCell a;
            //v 表示当前线程写cell时的期望值
            long v;
            //m 表示cells数组的长度
            int m;
            //是否发生竞争
            boolean uncontended = true;
            //条件一:as == null || (m = as.length - 1) < 0
            //true 表示当前线程时通过写base失败进入的,调用fullAddCount去扩容或重试  LongAdder.longAccumulate
            //条件二:(a = as[ThreadLocalRandom.getProbe() & m]) == null 前置条件:cells已经初始化
            //true 表示当前线程命中的cell是null,需要调用fullAddCount进行初始化cell,放入当前位置
            //条件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
            //true 表示当前线程使用cas更新当前命中的cell失败,需要进入fullAddCount进行重试或者扩容cells
            //false 表示当前线程cas更新成功
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //涉及LongAdder思想,自行查看
                fullAddCount(x, uncontended);
                //fullAddCount里面逻辑比较复杂,就让当前线程不参与到扩容相关的逻辑了,直接返回到调用点
                return;
            }
            if (check <= 1)
                return;
            //获取当前散列表元素个数,这是一个期望值
            s = sumCount();
        }
        //表示一定是一个put操作调用的addCount
        if (check >= 0) {
            //tab 引用map.table
            //nt 表示map.nextTable
            Node<K,V>[] tab, nt;
            //n 表示map.table数组的长度
            //sc 表示SizeCtl的临时值
            int n, sc;
            //自旋
            //条件一:s >= (long)(sc = sizeCtl)
            //true 1.sizeCtl为负数,表示正在扩容 2.sizeCtl为正数,表示扩容阈值
            //false 还未达到扩容条件
            //条件二:(tab = table) != null
            //条件三:(n = tab.length) < MAXIMUM_CAPACITY 小于最大值可以进行扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                
                //以下开始同terrifyBin中未解释的部分!!!
                //扩容唯一标识戳
                int rs = resizeStamp(n);
                //条件成立,表示当前table正在扩容,当前线程理论上应该帮助table完成扩容
                if (sc < 0) {
                    //条件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
                    //true 说明当前线程获取到的扩容唯一标识戳不是本次扩容
                    //条件二:sc == rs + 1 此处有bug
                    //真正想表达的是 sc==(rs<<16)+1 低16位才表示扩容的线程数
                    //线程数为低16位-1 !!! 因为底下触发扩容是+2
                    //true 表示扩容完毕,当前线程不需要参与进来
                    //false 扩容还在进行,当前线程可以参与
                    //条件三:sc == rs + MAX_RESIZERS 此处有bug,同上 sc==(rs<<16)+MAX_RESIZERS
                    //true 表示当前参与并发扩容的线程达到了最大值
                    //false 表示当前线程可以参与扩容
                    //条件四:(nt = nextTable) == null
                    //true 表示本次扩容结束
                    //false 扩容正在进行
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //前置条件,table正在扩容,当前线程有机会参与扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        //条件成立:说明当前线程成功参与扩容,并将sc的低16位+1,表示多一个线程参与扩容
                        //协助扩容线程,持有nextTable
                        transfer(tab, nt);
                    //条件失败:
                    //1.当前有其他线程也在尝试修改sizeCtl,其他线程修改成功,当前线程竞争失败
                    //2.transfer任务内部的线程也修改了sizeCtl
                }
                //(rs << RESIZE_STAMP_SHIFT) + 2
                //1000 0000 0001 1011 0000 0000 0000 0000 +2 =>1000 0000 0001 1011 0000 0000 0000 0010
                //触发扩容+2!!!
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    //说明当前线程是触发扩容的第一个线程,在transfer方法里需要做一些准备工作
                    //触发扩容条件的线程不持有nextTable
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

具体的扩容流程后面单独挑出来分析,以上就是put的流程,源码还是非常多,并且也很复杂,涉及到的字段和计算都要理清楚。

接下来看看get的流程。

get方法流程

public V get(Object key) {
        Node<K,V>[] tab;//引用map.table
        Node<K,V> e, p;//e:当前元素  p:目标节点
        int n, eh;//n:table数组长度  eh:当前元素hash
        K ek;//当前元素key
        //扰动函数运算后的hash值
        int h = spread(key.hashCode());
        //条件一:(tab = table) != null
        //true:已经put过数据,因为map是懒初始化   false:还未初始化
        //条件二:(n = tab.length)>0  table可能在初始化后没来的及添加数据,其他线程就执行到这
        //条件三:(e = tabAt(tab, (n - 1) & h)) != null)
        //true:当前桶位有值   false:当前桶位为null,直接返回null
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //当前桶位有数据,才会进入这
            //当前元素是否与查询的key的hash一致
            if ((eh = e.hash) == h) {
                //键是否相同
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    //匹配成功返回数据
                    return e.val;
            }
            //1. eh=-1  fwd节点,说明table正在扩容,且当前查询的桶位数据已经被迁移走
            //2. eh=-2 TreeBin节点,需要使用TreeBin的find方法查询
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //当前桶位已成链表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

	//这里只分析一下FWD节点的find(),
	//顺便提一下,如果是TreeBin的find(),会在红黑树被加锁时,线性的搜索TreeBin中持有的链表
	#ForwardingNode
	Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                //e表示在扩容而创建新表使用寻址算法得到的桶位头节点
                Node<K,V> e;
                //n表示为扩容而创建的新表的长度
                int n;
                //(e = tabAt(tab, (n - 1) & h)) == null
                //在扩容表中,重新定位hash对应的头节点
                //true 1.在oldTable中,对应桶位在迁移前就是null  2.扩容完成后,有其他线程将此桶设置为null
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                //此时对应桶位一定不为null,e为桶位头节点
                //e可能为哪些类型
                //1.node  2.TreeBin  3.FWD
                for (;;) {
                    //eh 当前处理节点的hash
                    int eh;
                    //ek 当前处理节点的key
                    K ek;
                    //比较key
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        //相同则返回
                        return e;
                    //1.TreeBin  2.FWD(新表在并发很大的情况下,可能在此方法再次拿到FWD类型)
                    if (eh < 0) {
                        //FWD
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        //TreeBin
                        else
                            //使用TreeBin查找红黑树节点
                            return e.find(h, k);
                    }
                    //链表,向后查找
                    if ((e = e.next) == null)
                        //没有找到返回null
                        return null;
                }
            }
        }

相对于put来说,get的代码就显得简单多了,可以发现在读取数据的时候并未进行加锁,只是通过桶位节点的类型,去使用不同的方法查找对应的数据,所有get方法的性能还是比较高的。

接下来继续分析remove和replace流程

remove&replace流程

因为remove和replace都是调用的replaceNode方法,所以这里拿出来一起分析

	public V remove(Object key) {
        return replaceNode(key, null, null);
    }

	public V replace(K key, V value) {
        if (key == null || value == null)
            throw new NullPointerException();
        return replaceNode(key, value, null);
    }

	final V replaceNode(Object key, V value, Object cv) {
        //计算key的hash
        int hash = spread(key.hashCode());
        //自旋
        for (Node<K,V>[] tab = table;;) {
            //f表示桶位头节点
            Node<K,V> f;
            //n表示当前table数组长度
            //i表示hash命中桶位下标
            //fh表示桶位头节点hash
            int n, i, fh;
            //(f = tabAt(tab, i = (n - 1) & hash)) == null  当前桶位为null
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                //退出
                break;
            //(fh = f.hash) == MOVED
            //当前table正在扩容
            else if ((fh = f.hash) == MOVED)
                //帮助扩容
                tab = helpTransfer(tab, f);
            //当前桶位可能是链表或红黑树
            else {
                //保留替换前的引用
                V oldVal = null;
                //校验标记
                boolean validated = false;
                //锁头节点
                synchronized (f) {
                    //再次判断
                    if (tabAt(tab, i) == f) {
                        //成立:说明桶位是链表或者单个node
                        if (fh >= 0) {
                            //如果加锁时头节点被替换,就不会执行到此处,
                            //validated就会为false,在最后面会用到
                            validated = true;
                            //e表示当前循环处理元素
                            //pred表示循环节点的前一个节点
                            for (Node<K,V> e = f, pred = null;;) {
                                //当前节点key
                                K ek;
                                //比较key是否相同
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    //当前节点的value
                                    V ev = e.val;
                                    //如果cv为null,就是删除操作
                                    //cv == ev || (ev != null && cv.equals(ev)) 就是替换操作
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        //保存旧值
                                        oldVal = ev;
                                        //成立:替换操作
                                        if (value != null)
                                            e.val = value;
                                        //成立:说明当前节点不是头节点
                                        else if (pred != null)
                                            //当前节点的前一个节点,指向当前节点的下一个节点
                                            pred.next = e.next;
                                        else
                                            //说明当前节点为头节点,将桶位头节点设置为下一个节点
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                //向后遍历
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        //红黑树代理节点
                        else if (f instanceof TreeBin) {
                            validated = true;
                            //转型TreeBin
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            //r表示红黑树根节点
                            //p表示红黑树中查找到对应key 一致的node
                            TreeNode<K,V> r, p;
                            //条件一:(r = t.root) != null 根节点不为null
                            //条件二:(p = r.findTreeNode(hash, key, null)) != null
                            //findTreeNode() 以当前节点为入口,向下查找key(包括节点本身)
                            //true 说明查找到相应key对应的node节点,赋值给p
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                //保存旧值
                                V pv = p.val;
                                //cv == null 删除
                                //cv == pv || (pv != null && cv.equals(pv)) 替换
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    //替换
                                    if (value != null)
                                        p.val = value;
                                    //删除
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                //当其他线程修改过桶位头节点时,当前线程sync头节点锁错对象时,validated为false,会进入下次自旋
                if (validated) {
                    if (oldVal != null) {
                        //替换值为null,说明当前是一次删除操作
                        if (value == null)
                            //更新元素个数
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }

replace的流程相比于put来说不是特别复杂

1.扰动函数计算key的hash值

2.自旋:

? ①桶位为null,退出

? ②正在扩容,帮助扩容

? ③锁头节点,判断是链表或红黑树,分别执行相应的查找,替换或删除逻辑,如果是删除,还需更新元素个数

3.返回被替换的元素值或null

接下来就是最重要的扩容逻辑了。

transfer并发扩容流程

在很多方法进入扩容逻辑前,会通过helpTransfer()进入到扩容逻辑,在这里先分析一下这个方法。

	final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        //引用map.nextTable
        Node<K,V>[] nextTab;
        //sizeCtl的临时变量
        int sc;
        //条件一:tab != null
        //条件二:f instanceof ForwardingNode
        //条件三:(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null
        //以上三个条件从putVal方法调用处来看,是恒成立
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            //拿到当前扩容标识戳
            int rs = resizeStamp(tab.length);
            //条件一:nextTab == nextTable
            //true 表示当前正在扩容
            //false 1.nextTable被设置为null,说明扩容完成  2.再次触发扩容,nextTab已经过期
            //条件二:table == tab
            //true 表示扩容正在进行,还未完成
            //false 表示扩容已经结束,扩容结束后,最后退出的线程会将table赋值为nextTable
            //条件三:(sc = sizeCtl) < 0
            //true 表示正在扩容,不清楚的请回去看sizeCtl的含义
            //false sizeCtl>0 表示下次扩容的阈值,说明当前扩容已经结束·
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                //扩容,具体请看addCount,同样代码不注释
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    //真正的扩容逻辑
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

	//这里开始就是最核心的部分了,解释了为什么ConcurrentHashMap可以并发扩容
	private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        //n 表示扩容前的table数组大小
        //stride 表示分配给线程任务的步长,假定为16
        int n = tab.length, stride;
        //计算步长
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //成立:表示当前线程为触发扩容线程,需要做一些准备工作
        //不成立:表示当前线程为帮助扩容的线程
        if (nextTab == null) {            // initiating
            try {
                //创建比之前大一倍的table
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //赋值给map.nextTable,方便扩容线程拿到新表引用
            nextTable = nextTab;
            //记录迁移数据整体位置的一个标记。index从1开始计算
            transferIndex = n;
        }
        //表示新数组的长度
        int nextn = nextTab.length;
        //FWD节点,当某个桶位数据处理完毕后,将此桶位设置为FWD节点,其他线程看到后会有不同的逻辑处理
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //推进标记
        boolean advance = true;
        //完成标记
        boolean finishing = false; // to ensure sweep before committing nextTab
        //i 表示分配给当前线程任务,执行到的桶位
        //bound 表示分配给当前线程任务的下界限制
        //自旋
        for (int i = 0, bound = 0;;) {
            //f 桶位的头节点
            Node<K,V> f;
            //fh 头节点的hash
            int fh;
            /**
             * 1.给当前线程分配任务区间
             * 2.维护当前线程任务进度(i表示当前处理的桶位)
             * 3.维护map对象全局范围内的进度
             */
            while (advance) {
                //分配任务的开始/结束下标
                int nextIndex, nextBound;
                //成立:表示当前线程的任务尚未完成,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个桶位
                //不成立:说明当前线程任务已完成或未分配
                if (--i >= bound || finishing)
                    advance = false;
                //成立:表示当前对象全局范围内的桶位都分配完毕,没有区间可分配,设置当前线程的i为-1,跳出循环后执行退出迁移任务
                //不成立:表示对象全局范围内的桶位尚未分配完成,还有区间可分配
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //cas给当前线程分配任务
                //nextIndex > stride ? nextIndex - stride : 0 判断剩余部分是否大于步长,不大于就直接到末尾0
                //从这可以看出迁移时是从table尾到头去迁移数据
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    //设置处理边界
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //成立:表示当前线程未分配到任务
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //判断是否完成扩容迁移
                if (finishing) {
                    //执行迁移完毕退出逻辑
                    nextTable = null;
                    table = nextTab;
                    //将sizeCtl更新为下一次扩容阈值
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //设置sizeCtl低16位 -1 ,成功说明当前线程可以退出
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //条件成立:说明当前线程不是最后一个退出该方法的线程
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        //正常退出
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //成立:说明当前桶位无数据,直接将此处设置为fwd节点
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //成立:说明当前桶位已经迁移过,当前线程不用再处理,再次更新当前线程任务索引
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //前置条件:当前桶位有数据,且node不是fwd节点,说明这些数据需要迁移
            else {
                //锁头节点
                synchronized (f) {
                    //再次判断头节点是否被修改过
                    if (tabAt(tab, i) == f) {
                        //以下逻辑是将原链表拆分为两段
                        /**
                         * 假设当前table数组长度为16
                         * 当前链表为数组下标为9的链表,分别有两个节点的hash值为01001和11001
                         * 01001 & 01111 =01001 = 9
                         * 11001 & 01111 =01001 = 9
                         * 这时低位为1001的key都会存到9号桶
                         * 扩容后数组长度为32
                         * 对9号桶进行拆分
                         * 01001 & 11111 =01001 = 9
                         * 11001 & 11111 =11001 = 25
                         * 这时高位为0的节点还是在9号桶
                         * 而高位为1的节点在25号桶
                         * 这样就将一条链表拆分为低位链和高位链
                         */
                        //ln 表示低位链表引用
                        //hn 表示高位链表引用
                        Node<K,V> ln, hn;
                        //成立:说明当前桶位是链表桶位
                        if (fh >= 0) {
                            //用来配合lastRun,判断是高位链还是低位链
                            int runBit = fh & n;
                            //lastRun机制
                            //假设链表元素hash高位是这样的结构:1->0->1->0->0->0
                            //lastRun可以省去单个处理,直接将后面3个为0的元素看成一个节点处理
                            //获取出当前链表末尾连续高位不变的node,这样就不用一个一个去处理
                            Node<K,V> lastRun = f;
                            //循环链表,lastRun
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                //当前元素和lastRun的高位不一致时,进行更新
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //lastRun低位链处理
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            //lastRun高位链处理
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //循环处理剩余部分,终止条件是循环到lastRun
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //低位链处理
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                //高位链处理
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //假设对9号桶拆分
                            //i就为9号桶,对应低位链
                            setTabAt(nextTab, i, ln);
                            //i+n为25号桶,对应高位链
                            setTabAt(nextTab, i + n, hn);
                            //处理完成,将当前桶设置为fwd节点
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //成立:红黑树代理节点TreeBin
                        else if (f instanceof TreeBin) {
                            //转换头节点为TreeBin引用t
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            //低位双向链表lo指向低位链表的头 loTail指向低位链表尾
                            TreeNode<K,V> lo = null, loTail = null;
                            //高位双向链表hi指向高位链表的头 hiTail指向高位链表尾
                            TreeNode<K,V> hi = null, hiTail = null;
                            //lc 低位链的长度
                            //hc 高位链的长度
                            int lc = 0, hc = 0;
                            //循环TreeBin中的双向链表,从头至尾
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                //h表示循环处理的当前元素的hash
                                int h = e.hash;
                                //使用当前节点构造出来的新的TreeNode
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //成立:表示当前循环节点属于低位链的节点
                                if ((h & n) == 0) {
                                    //成立:说明当前低位链表还没有数据
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    //成立:说明低位链表已经有数据了,此时当前元素追加到低位链表的末尾
                                    else
                                        loTail.next = p;
                                    //将低位链表尾指向p
                                    loTail = p;
                                    //长度++
                                    ++lc;
                                }
                                //高位链
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //判断是否达到链化阈值
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            //同上
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

扩容的流程还是十分复杂的,总的来说就是第一次触发扩容的线程,需要多负责创建新表的任务,然后分配迁移区间,从旧表的尾部开始迁移桶位数据到新表,如果桶位是链表或红黑树,还需将该桶位的数据拆分成两块分别放在新表的两个桶位上。一个桶位迁移完成后,需要在旧表该桶位上放上FWD类型的节点,表示该桶位已经迁移完成,如果有线程并发操作该桶位时,需要到新表上去进行对应操作。循环上面操作后,最后一个退出扩容的线程,需要进行收尾工作,需要设置下一次扩容阈值,并且更新全局范围内的table和nextTable的引用。

以上就是ConcurrentHashMap中比较重要的几个方法的注释分析了。当然,还有几个比较重要的,比如TreeBin和ForwardingNode中有一些方法也比较重要,但由于能力有限,以后理解透了会在本文中追加出来,就比如TreeBin可以保证红黑树进行增删改操作,锁住红黑树的时候,可以不阻塞读线程的执行,让读线程线性搜索TreeBin持有的链表完成读操作。这些小的细节点都是ConcurrentHashMap在保证并发安全的情况下,想尽办法提升性能,相信大家看完并理解源码后,一定能体会到这种感觉。

  数据结构与算法 最新文章
【力扣106】 从中序与后续遍历序列构造二叉
leetcode 322 零钱兑换
哈希的应用:海量数据处理
动态规划|最短Hamilton路径
华为机试_HJ41 称砝码【中等】【menset】【
【C与数据结构】——寒假提高每日练习Day1
基础算法——堆排序
2023王道数据结构线性表--单链表课后习题部
LeetCode 之 反转链表的一部分
【题解】lintcode必刷50题<有效的括号序列
上一篇文章      下一篇文章      查看所有文章
加:2021-12-04 13:41:41  更:2021-12-04 13:43:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 14:22:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码