Hash Tables
A hash table (also called a scatter table) is a data structure that maps keys (Key) to values (Value). Given a key, the matching value can be looked up efficiently, with time complexity close to O(1).
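As a quick illustration (a minimal usage sketch of my own, not from the source), here is the key-to-value mapping in action with ConcurrentHashMap:
import java.util.concurrent.ConcurrentHashMap;

public class Demo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("apple", 1);                  // key "apple" maps to value 1
        map.put("banana", 2);
        System.out.println(map.get("apple")); // lookup by key, ~O(1): prints 1
    }
}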
Differences Between 1.7 and 1.8
There are two main differences. The first is locking: 1.7 uses Segment-based lock striping, so the lock granularity is coarse; 1.8 uses synchronized plus CAS instead, shrinking the granularity from a whole segment down to a single bucket head node.
The second is the data structure: 1.7 uses an array plus linked lists, while 1.8 uses an array plus linked lists plus red-black trees, improving lookup within a long bucket from O(n) to O(log n).
put()
Open the put method:
public V put(K key, V value) {
return putVal(key, value, false);
}
Then step into the putVal method:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
    tab = initTable(); // lazily initialize the table on the first insert
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // empty bin: try a lock-free CAS insert
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED) // bin already migrated: a resize is in progress
    tab = helpTransfer(tab, f);  // help finish the migration first
else {
V oldVal = null;
synchronized (f) { // lock only this bin's head node
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
Here TREEIFY_THRESHOLD is a constant with value 8: the list is converted to a red-black tree once its length reaches 8. Why 8? The author's comment explains that with a well-dispersed hash function the entries distribute evenly and long lists are rare; under the distribution assumed in the comment, the probability of a bin reaching length 8 is only 0.00000006, less than one in ten million, so waiting until 8 to convert costs almost nothing.
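That figure is easy to verify (a back-of-the-envelope sketch, assuming the Poisson(0.5) model from the JDK comment, where the probability of a bin holding k nodes is exp(-0.5) * 0.5^k / k!):
public class PoissonCheck {
    public static void main(String[] args) {
        double lambda = 0.5;          // expected bin size at load factor 0.75
        double p = Math.exp(-lambda); // P(0)
        for (int k = 1; k <= 8; k++)
            p = p * lambda / k;       // P(k) = P(k-1) * lambda / k
        System.out.println(p);        // ~5.9e-8, the 0.00000006 in the comment
    }
}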
initTable()
This initializes the array. First look at the sizeCtl field, which is best understood as a small state machine; the author documents it in a comment on the field, and the code is hard to follow without knowing its states:
- sizeCtl = -1: the table is being initialized
- sizeCtl < -1: the table is resizing (high 16 bits hold the resize stamp, low 16 bits track the resizing threads)
- sizeCtl = 0: the default, table not yet initialized
- sizeCtl > 0: the initial capacity before initialization, or the next resize threshold (0.75 * n) afterwards
Now the source:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
    Thread.yield(); // another thread is initializing; back off and retry
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // claim initialization by CASing sizeCtl to -1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); // next resize threshold: 0.75 * n
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
We can see that sc = n - (n >>> 2) is equivalent to sc = 0.75 * n. Why choose 0.75 as the resize threshold? Again the author's comment has the answer: too small a load factor wastes space and triggers more resizes, while too large a factor increases hash collisions and hurts performance. In short, 0.75 is a generally accepted time/space trade-off.
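A tiny sanity check of the bit trick (a standalone sketch): n >>> 2 is n / 4 for these power-of-two capacities, so n - (n >>> 2) is exactly three quarters of n:
public class ThresholdCheck {
    public static void main(String[] args) {
        for (int n = 16; n <= 128; n <<= 1)
            // prints 16 -> 12, 32 -> 24, 64 -> 48, 128 -> 96
            System.out.println(n + " -> " + (n - (n >>> 2)));
    }
}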
treeifyBin()
This method is entered when a list's length reaches 8; it then decides whether to resize the table or convert the list to a red-black tree.
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // table still smaller than 64
    tryPresize(n << 1);                      // grow the table instead of treeifying
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
So answering that "the list converts to a red-black tree when its length reaches 8" is not rigorous. The precise condition is: the list converts only when its length reaches 8 and the array length is at least 64 (MIN_TREEIFY_CAPACITY); below that, the table is resized instead.
tryPresize()
This is the resizing method. A few points to note:
- Resizing is concurrent: multiple threads are allowed to help with the work.
- The table always doubles in size: 32 -> 64, 64 -> 128, and so on.
- After the new array is allocated, the entries in the old array must be migrated into it.
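One more detail before the source: the first line computes the target capacity c as tableSizeFor(size + size/2 + 1), which rounds 1.5 * size + 1 up to the next power of two. Here is a simplified replica of that private helper (a sketch; the real one also clamps against MAXIMUM_CAPACITY):
public class CapacityCheck {
    // Bit-smearing trick: fills every bit below the highest set bit, then adds 1.
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
        n |= n >>> 8; n |= n >>> 16;
        return n + 1;
    }
    public static void main(String[] args) {
        int size = 32;
        // tryPresize(32): c = tableSizeFor(32 + 16 + 1) = 64
        System.out.println(tableSizeFor(size + (size >>> 1) + 1));
    }
}
Now the method itself: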
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
The line int rs = resizeStamp(n) above generates the resize stamp. For n = 16 its significant low 16 bits look like this:
1000 0000 0001 1011
As mentioned, multiple threads help with a resize. The resize is only complete once every participating thread has finished its share of the migration, so the number of participants has to be recorded somewhere, and that place is sizeCtl. When the first thread starts the resize, it shifts the stamp left by 16 bits, adds 2, and stores the result into sizeCtl, which then becomes:
1000 0000 0001 1011 0000 0000 0000 0010
The high 16 bits are the resize stamp, which uniquely identifies this resize generation; the low 16 bits track the participating threads (their value is the thread count plus one). Each time another thread joins in to help, the low 16 bits are incremented by 1:
1000 0000 0001 1011 0000 0000 0000 0011
1000 0000 0001 1011 0000 0000 0000 0100
1000 0000 0001 1011 0000 0000 0000 0101
...
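These values can be reproduced directly (a standalone sketch using the same formula as resizeStamp, with RESIZE_STAMP_BITS = 16 and n = 16 assumed):
public class StampCheck {
    public static void main(String[] args) {
        int n = 16;
        // resizeStamp(n) = Integer.numberOfLeadingZeros(n) | (1 << 15)
        int rs = Integer.numberOfLeadingZeros(n) | (1 << 15);
        System.out.println(Integer.toBinaryString(rs)); // 1000000000011011
        // first resizing thread: sizeCtl = (rs << 16) + 2
        int sc = (rs << 16) + 2;
        System.out.println(Integer.toBinaryString(sc)); // 10000000000110110000000000000010
    }
}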
transfer()
transfer performs the data migration: it first computes the stride, i.e. the range of buckets the current thread is responsible for, creates the new array if it does not exist yet, and then migrates the data bucket by bucket.
- If a bin is a red-black tree and, after being split, no longer meets the tree threshold, it is converted back to a linked list.
- If a bin is a linked list, it is split into a low list and a high list and migrated (a worked example follows the source).
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // each thread claims at least 16 buckets at a time
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // marks migrated bins; its hash is MOVED
boolean advance = true;
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); // 1.5 * n, i.e. 0.75 * the new capacity
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // this thread is done: decrement the count
    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
        return; // other threads are still migrating
    finishing = advance = true; // last thread out rechecks the table and finishes
    i = n;
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true;
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n; // this bit decides low (stays at i) vs high (moves to i + n)
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
For example, take keys whose hash values are 4, 20, 52, 68, 84, and 100. With the old length n = 16 they all land at index hash & (n - 1), i.e. & 15, which gives 4 for every one of them. After the table doubles, the mask becomes & 31, and recomputing gives:
4 & 31 = 4
20 & 31 = 20
52 & 31 = 20
68 & 31 = 4
84 & 31 = 20
100 & 31 = 4
In other words, 4, 68, and 100 stay at the old index 4, while 20, 52, and 84 must migrate to the new index 20. This is the high/low list migration: entries that need to move go on the high list, entries that stay go on the low list, and each complete list is then set into its slot of the new array in one shot. That is why, at the end of the source, the low list stays at i (4) and the high list goes to i + n, that is 4 + 16 = 20:
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
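The split can be verified with the very bit test the source uses, (hash & n) == 0 (a standalone sketch with the example keys above):
public class SplitCheck {
    public static void main(String[] args) {
        int n = 16; // old table length
        for (int h : new int[] {4, 20, 52, 68, 84, 100}) {
            // (h & n) == 0 -> low list, stays at h & (n - 1)
            // (h & n) != 0 -> high list, moves to (h & (n - 1)) + n
            String where = ((h & n) == 0)
                    ? "low list, stays at "  + (h & (n - 1))
                    : "high list, moves to " + ((h & (n - 1)) + n);
            System.out.println(h + ": " + where);
        }
    }
}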
helpTransfer()
Finally, back to the putVal method from the beginning: when the current bin's head node has hash == MOVED, i.e. it is a ForwardingNode, a migration is in progress, so the thread goes to help with it.
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
This code is essentially the same as the corresponding block in the tryPresize method, and it too ends up calling the transfer method.
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
addCount()
putVal also calls a method named addCount, which maintains the element count: every put adds 1, and the total is what size() later reports. Obviously it cannot be a plain +1, since that would not be thread-safe; instead the source defines two fields, baseCount and counterCells:
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
- Under low contention, CAS baseCount + 1 directly.
- Under high contention, pick a random cell in the counterCells array and CAS that cell instead; spreading updates across cells is the same load-balancing idea as splitting traffic across servers (see the sketch below).
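This is the same striping trick that LongAdder uses. Below is a minimal sketch of the idea (a hypothetical StripedCounter class of my own, not the JDK code, which uses Unsafe CAS on baseCount and a lazily grown CounterCell[]):
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounter {
    private final AtomicLong base = new AtomicLong();             // like baseCount
    private final AtomicLongArray cells = new AtomicLongArray(8); // like counterCells

    void increment() {
        long b = base.get();
        if (!base.compareAndSet(b, b + 1)) {   // contended: fall back to a cell
            int i = ThreadLocalRandom.current().nextInt(cells.length());
            cells.getAndIncrement(i);          // bump a randomly chosen cell
        }
    }

    long sum() {                               // what sumCount() does
        long s = base.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }
}
With that picture in mind, here is the real addCount: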
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
fullAddCount()
This method initializes the counterCells array (growing it when needed) and applies the +1 to individual cells.
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
CounterCell r = new CounterCell(x);
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
size()
size() simply sums the cells array plus baseCount to produce the total:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
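One caveat: the sum is taken without any locking, so under concurrent updates it is only an estimate, and size() additionally truncates the result to int. Since Java 8 there is also mappingCount(), which returns a long and is the recommended choice for very large maps. A small usage sketch:
import java.util.concurrent.ConcurrentHashMap;

public class SizeDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("k", 1);
        System.out.println(map.size());         // int, capped at Integer.MAX_VALUE
        System.out.println(map.mappingCount()); // long, no cap
    }
}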
Closing
Finally, an apology to everyone who has read this far. My own ability is limited: ConcurrentHashMap runs to more than 6,000 lines of source, and I do not fully understand every field and every method's details either, only the broad strokes. If anything here is wrong or missing, please bear with me and leave a comment!