????????ConcurrentHashMap和HashMap的put值过程有些类似,ConcurrentHashMap的结构也是table + 链表+ 红黑树;在put值时,锁粒度是table的元素;也就是说,当put值时定位到table的第 i 个元素,那么就会给table[i]上锁;其他线程在put值时也定位到 i 时就需要等待获取锁;如果是其他位置则不需要等待锁,可以进行put操作。
????????下面具体分析ConcurrentHashMap的put值过程
1.计算hash值
????????ConcurrentHashMap在put值的时候会计算key的hash值,和HashMap类似;在ConcurrentHashMap计算hash值的是spread方法
public V put(K key, V value) {
return putVal(key, value, false);
}
||
||
\/
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
..
..
..
}
||
||
\/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
HASH_BITS = 0x7fffffff;(最大整数)
- 获取key的hashcode
- 利用hashcode的高16位与低16位做" ^ "位运算,让高位也参与位运算,增加离散型。
- 再与HASH_BITS位运算,这是int最大正整数:最高位为0,其余位置为1,用来与hash值做 " & "运算;得到的结果保证最高位为0,其余位置不变;这样就保证得到的hash值是一个非负数;
举个例子:
2.put值
ConcurrentHashMap的数据结构与HashMap差不多,都是Node数组+红黑树+链表; 区别:
- 在扩容时ConcurrentHashMap会有一个特殊的标志对象:ForwardingNode;
- 在生成红黑树时,会生成一个TreeBin对象放在table中;
- 在put值时,会根据table中node的hash判断节点的类型;
- hash>=0,是Node节点;
- hash=-2是TreeBin,表示该table元素是红黑树的节点;
- hash=-1是ForwardingNode;容器正在扩容
在put值时有几个判断,与HashMap类似
- 判断table是否已经初始化了;如果是没有初始化,就会先初始化table,再往table中添加值;
- 根据hash值,计算出key-value在table的位置i,判断table[i]是否已经插入了值;如果没有插入值,就将该key-value插入到table[i]中;
- 判断table[i]的hash值是否是MOVE,如果是MOVE表示正在扩容(ForwardingNode的hash值就是MOVE),需要该线程帮忙将旧容器的值移动到新容器中
- 将key-value插入到链表或者红黑树中
- 判断是否需要扩容
2.1初始化table
????????因为是多线程环境,就需要考虑到如果有多个线程同时初始化table的情况;假如现在有三个线程:A,B,C同时判断到table == null。这个时候三个线程都会同时试图来初始化table,如果一个线程抢先修改了sizeCtl,他就可以初始化table,其余线程只需要等待初始化完成即可;
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
..
..
..
||
||
\/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- 1.sizeCtl判断是否已经有线程初始化table,如果sizeCtl=-1,表示已经有线程对table初始化;这个时候会调用yield()让出cpu执行权
- 2.通过CAS修改sizeCtl=-1,初始化table
????????这段代码,有意思的是进入到初始化table的分支时try{}finally{}代码块还要判断table==null,为什么呢? ????????可能是对应这样一个场景:现在有 A ,B,C三个线程都判断table=null进入到initTable,A,B线程先获取到sizeCtl =0,此时A抢先修改到sizeCtl =-1开始初始化,而B线程修改失败,再次通过循环获取sizeCtl =-1,调用yield()放弃cpu执行权,等待A线程初始化table;A线程初始化完成之后修改sizeCtl 的值,修改后的sizeCtl >0;
????????而C线程比A,B线程慢一点,当C线程获取到sizeCtl的值时,A线程已经完成了table的初始化sizeCtl >0,C线程获取到的sc=sizeCtl >0,因此不会进入到休眠状态,会尝试修改sizeCtl 的值,这个时候没有其他线程竞争修改值,因此会修改成功;又会将sizeCtl 的值修改为 -1 ,【此时:sc=A线程初始化之后sizeCtl 的值;sizeCtl =-1】
????????进入到try代码块,判断table!=null,不会再重新初始化table,进入finally块,sizeCtl =sc;将sizeCtl 的值还原到A线程初始化时候的值;
????????添加的table=null的判断保证了只会有一个线程能初始化table;
流程:
2.2 关于CAS
上面在多线程中竞争修改sizeCtrl的值没有使用synchronized,而是使用了Unsafe类的compareAndSwapInt也就是CAS;
U.compareAndSwapInt(object,offset,value,update);
||
||
||
\/
U.compareAndSwapInt(this, SIZECTL, sc, -1);
看到这四个参数或许会感到有些蒙;这涉及到Java的实例对象在内存是如何存储的; Java对象有三部分:
- 头信息(hash,gc年龄,lock标志位,指向class对象的指针)
- 实例数据,也就是成员变量
- 填充对齐;(这部分不是必须的;如果:头信息+实例数据的字节数不是8的倍数,这个时候会填充字节让其满足8的倍数;)
下面用一个简单的图演示下:
????????在这个例子中,对象头+三个属性字段的字节数:24字节;刚好8的倍数,因此不用填充数据;从对象的内存布局,我们可以根据对象和对象属性的偏移量offset确定该对象的属性值存储位置; ????????上面说了对象的布局,再来看上面的方法:U.compareAndSwapInt(object,offset,value,update); 就很容易理解了; 举个例子:
U.compareAndSwapInt(user,12,34,48);
对象user+offset 定位到user对象的id字段;
value = 34,update =48;
通过上面的定位,取到user对象的id值与value(34)比较;
如果相等,就将user对象的id字段的值改为update(48);不相等,就不修改;
回到源码:
U.compareAndSwapInt(this, SIZECTL, sc, -1);
this:是ConcurrentHashMap对象
SIZECTL:SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));===>获取到变量:sizeCtl的偏移量offset
上面的代码:sc与sizeCtl比较,如果相等就将sizeCtl的值修改为-1;
2.3思考
????????在try代码块中判断table是否已经初始化保证了只有一个线程能初始化table,这个和DCL(双重检测枷锁,差不多);这段代码能修改吗?:
if ((sc = sizeCtl) < 0)
Thread.yield();
||
||
||
\/
if ((sc = sizeCtl) != 0)
Thread.yield();
????????因为只要sizeCtl != 0,就说明已经有线程在初始化table,其余线程都可以等待带线程初始化完成,这样当线程获初始化table的时就不用再判断table是否为null了,因为只能有一个线程能进入这个分支,对table进行初始化; ????????这样看起来似乎没什么问题,但是忽略了一个问题,在ConcurrentHashMap的构造对象中有一个构造方法可以指定初始容量,而保存初始容量的变量就是sizeCtl ,也就是说如果指定了初始容量sizeCtl 值就 大于0;而指定初始容量时并不会初始化一个数组;因此不能修改成:(sc = sizeCtl) != 0,如果修改成这样,那么在指定初始容量之后,所有线程都不能初始化table了; 同时也可以看到在初始化时利用了sizeCtl的值:
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
3.通过hash值定位key-value位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
这个分支其实就是判断table[i] = null,就将key-value值包装程成Node对象,放到table[i]中即可;
- i = (n - 1) & hash;与HashMap一样,定位存储下标,因为使用了 & 运算,因此要求table的容量n一定是2的幂次倍;
- casTabAt()将hash-key-value包装成node添加到table中,添加成功:返回true,进入分支break,结束put操作;不成功:进入下一次循环,继续put操作直到成功为止;
????????上面的casTabAt也使用到了Unsafe类的CAS方法,不再分析;简单说下定位 i 的计算;这个和hashMap中定位i的计算一样;正是因为使用了 & 这种位运算导致n的大小必须是2的幂次倍;否则会浪费table的存储空间,也会造成链表的长度快速增长;
????????2n - 1 的数字二进制数有一个特点:从最高位为1的位置 -> 最低位的每个位置都是1;这样与hash值做 & 运算时的结果取值范围[0,2n -1]的每一个数都能取到;反之,如果table的长度不是 2n 那么2n - 1从最高位到最低位之间肯定有些位置的值为:0;导致(len-1) & hash的结果取值只会是[0,2n -1]之间的部分数字,这样会使table的下标中有一些下标永远也获取不到,这就造成了这些位置的浪费;
table长度为: 2n table长度不是: 2n
从上面举得2个例子就可以看出,table的长度是必须为2的次幂倍;否则会浪费掉大量的存储空间,同时可能会造成put,get效率降低;
4.判断ConcurrentHashMap是否正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
????????在上一个分支,如果table[i] != null,就会在接下来的分支中,首先通过node节点的hash值判断,table是否处于扩容状态,如果是扩容状态: hash == MOVE; 当判定在扩容时,会要求这个线程帮忙完成扩容:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
????????扩容暂时在这里不分析,在后面会讲到;因为扩容是添加node导致,在添加元素之后会判断是否需要扩容;在前面提到过,在扩容时会产生:ForwardingNode;它的hash值就是MOVE(-1);由于扩容实在太复杂,并且扩容的原因并不在这里;扩容触发条件是添加node之后size到达扩容阈值触发扩容;因此把扩容的部分放在最后;
5. 判断节点的类型,并添加节点;
判断节点类型,并添加值:
- 如果是链表,就将值打包成Node,添加到链表中;
- 如果是红黑树,就将值打包成TreeNode添加到红黑树中;
????????这个过程和 HashMap一样;唯一的区别是,在多线程环境下,在确定table的下标之后,会获取table[i]对象的锁,只能有一个线程在table[i]所在的链表或者红黑树put值;其他线程要在table[i]中put值需要等待获取锁;
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
为了更好的看清上面代码的整体逻辑,去掉部分代码,留下一个大的框架:
synchronized (f) {
if (tabAt(tab, i) == f) {
if(fh>=0){
binCount=1;
链表,会将新node添加到链表末尾,在这个过程中binCount会记录链表的长度,用来判断是否需要将链表修改为红黑树;
还有一种情况是这个key已经存在,就直接更新value;
}else if(f instanceof TreeBin){
binCount=2;
红黑树,这里的binCount就是只用来表示该线程抢到锁,已经put值了;
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
流程:
链表转换成红黑树
?????????上面流程图中判断:f是TreeBin对象时表示红黑树节点;但红黑树的节点是由TreeNode保存;那么在链表转换成红黑树的转换过程是什么? ?????????在上述流程图中,可以看到链表转换成红黑树是由treeifyBin方法中进行的;treeifyBin的流程
- 判断table长度;如果len < 64,就将容器扩大2倍;
- len>=64;先将Node -> TreeNode;再生成一个TreeBin对象放在table中;TreeBin的hash=-2;TreeBin的构造方法中将:链表 -> 红黑树;并且这个TreeBin对象包含链表转换前的头节点,以及转换后红黑树的根节点;
6.addCount(1L, binCount);
这段代码在多线程环境下计数,并判断是否需要扩容。这部分代码实在太多,首先是在多线程环境下如何计数?再一个就是多线程如何扩容?后续再分析吧。
6.1 ConcurrentHashMap多线程计数原理
先说下多线程计数的原理吧,这里没有直接使用CAS来更新size的值;是为什么呢?
先来看下使用直接使用CAS有什么利弊?
??????????在多线程环境下,直接使用CAS更新size虽然可以保证数据的正确性,但是效率不高;如果线程竞争比较激烈,就需要多个线程排队更改单一的变量值;可以看到并发特别高的话,这样的做法效率就不高了,会让其他线程消耗cpu做空循环,占用了cpu又没做事; ??????????因此,ConcurrentHashMap采用了另一种高效的做法;设计了一种数据结构:基础值 + 数组;
int baseCount;
CounterCell[] counterCells;
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
??????????如果是多个线程同时put值需要更新size,进入到addCount中;这个时候仍然会先使用CAS更新baseCount,但是只有一个线程能更新成功;其余线程会分别将值更新到counterCells,如下图:
这就是ConcurrentHashMap在多线程环境下更新size的方法;相比直接使用CAS对一个变量更新,这种方法显然更高效;
??????????ConcurrentHashMap的更新size的大体原理就是这样,但细节处有所不同;CounterCells数组是一个懒加载,也就是说,没有多个线程同时竞争修改baseCount时,不会生成CounterCells数组,直接用CAS修改baseCount;当有多个线程竞争修改baseCount时才会生成CounterCells数组,每个线程在各自的CounterCell中计数互不干扰;
??????????这里说明一下,即使生成了CounterCells数组,也不会立即将CounterCells数组中所有元素都初始化一个CounterCell对象;ConcurrentHashMap的设计突出一个懒加载,生成数组时的策略是这样,生成在CounterCell对象时也是需要用到时才生成CounterCell对象用于处理线程的计数;
- 好处:这样可以节省内存空间;
- 坏处是看代码太费劲 ^ _ ^,逻辑也更复杂一些;【ps:现在机器 内存都挺大的了,没必要这么节省吧。】
addCount源码:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
*******************************************************************************************
}
流程图:
??????????上图中可以看到,fullAddCount是核心方法,很多分支都会经过这个方法;关于cunterCells数组的初始化,扩容,以及cunterCell计数都在这里面;fullAddCount源码比较多,但代码中并没有很复杂的算法并不难理解,只是分支多了一些; 在看fullAddCount方法之前简单介绍fullAddCount处理的流程:
- 判断counterCells是否为空
- counterCells为空,初始化counterCells数组;并生成CounterCell对象用于计数;
- counterCells不为空,获取到CounterCell对象计数;如果对象为空,new一个对象来处理;如果计数失败,说明有多个线程在使用同一个CounterCell对象计数;这个时候将会扩容counterCells数组;(扩容长度最大到cpu的个数不再扩容)扩容之后再定位处理线程的计数;
fullAddCount源码:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
CounterCell r = new CounterCell(x);
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
保留代码整体框架,省略掉部分代码:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
数组非空的处理逻辑;
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
counterCells 非空时的处理逻辑源码:
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
CounterCell r = new CounterCell(x);
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = ThreadLocalRandom.advanceProbe(h);
fullAddCount流程:
6.1.1为什么在线程在使用CounterCell对象计数时还是要使用CAS来更新值呢?
原因有以下2点:
- 在counterCells数组长度大于并发线程个数时:两个线程生成的随机数h不同,但是有可能定位到数组的同一个下标;这个时候如果两个线程同时进入到fullAddCount更新size就会产生冲突;这个情况,线程会重新产生一个随机数,来获取一个新的下标解决冲突问题;
- 在counterCells数组长度小于并发线程个数时;必然造成多个线程同时使用一个CounterCell;这种情况,会通过扩容来解决冲突;
6.1.1ConcurrentHashMap获取size
ConcurrentHashMap不是直接通过获取一个变量来获取size的;因为记录的方法:维护一个变量 baseCount + CounterCell数组;因此在获取size时,需要将counterCells数组中value的值累加,再加上 baseCount;
size =
∑
0
n
?
1
C
o
u
n
t
e
r
C
e
l
l
.
v
a
l
u
e
\sum_{0}^{n-1}CounterCell.value
∑0n?1?CounterCell.value + baseCount
6.2 ConcurrentHashMap扩容
宕机。待续。。
参考文章:并发之Striped64
|