概述
哈希表 是一种古老的数据结构,在 1953 年就有人使用拉链法实现了哈希表,它能够根据键(Key)直接访问内存中的存储位置,也就是说我们能够直接通过键找到该键对应的一个值,哈希表名称的来源是因为它使用了哈希函数将一个键映射到一个桶中,这个桶中就可能包含该键对应的值。
哈希函数
哈希函数(常被称为散列函数)是可以用于将任意大小的数据映射到固定大小值的函数,常见的包括MD5、SHA系列等。 一个设计优秀的哈希函数应该包含以下特性:
- 均匀性:一个好的哈希函数应该在其输出范围内尽可能均匀地映射,也就是说,应以大致相同的概率生成输出范围内的每个哈希值。
- 效率高:哈希效率要高,即使很长的输入参数也能快速计算出哈希值。
- 可确定性:哈希过程必须是确定性的,这意味着对于给定的输入值,它必须始终生成相同的哈希值。
- 雪崩效应:微小的输入值变化也会让输出值发生巨大的变化。
不可逆:从哈希函数的输出值不可反向推导出原始的数据。
哈希冲突
哈希函数在实际中遇到的最常见的问题就是哈希碰撞, 即不同的键通过哈希函数可能产生相同的哈希值, 则他们会被分配到同一个桶中 主要有两种策略:
- 拉链法
拉链法是将同一个桶中的元素通过链表的形式连接起来. 好处就是 : 不用预先申请空间, 可以不断的链接新的元素 缺点就是 : 需要额外的指针来连接元素, 增加了哈希表的大小, 同时如果同一个桶中的连接元素太多了, 那么对于查找来说就不严格符合O(1), 当一条链表过长时, 我们需要改变这种情况, 让这个链表减少元素 , 更改哈希函数, 或者扩容. - 开放寻址法
假如, “CCCCC” 与 “GOGOGO” 的 hash(key) 对应的桶号都相同是 152,产生 hash 碰撞,那么后加入的元素 “GOGOGO” 自动放在 152 相邻的后一个位置 153 号桶,如果 “RustRust” 的 hash 值是153,发现153已经被占用了后,就放在相邻的后一个位置 154 上。 说的题外话,我们仔细思考一下就会发现,所谓 hash map 的时间复杂读是 O(1),那是理论上。 但如果我们的 hash 函数选的不好,散列不均匀,老是发生 hash 碰撞,即 hash map 的时间复杂度就会变成 O(n)。 所以要想 hash map 的性能好,hash 函数的选择是关键。
map基本数据结构
map的核心数据结构是 /runtime/map.go
type hmap struct {
count int
flags uint8
B uint8
noverflow uint16
hash0 uint32
buckets unsafe.Pointer
oldbuckets unsafe.Pointer
nevacuate uintptr
extra *mapextra
}
在源码中表示map 的结构体是hmap,即hash map的缩写。 count 指的是当前map的大小,即当前存放的元素个数,必须放在第一个位置,因为通过len()函数时,会通过unsafe.Poiner从这里读取此值并返回 flags 可以理解为一个标记位,几种值的定义(这里),在多个地方需要使用此值,如mapaccess1()。 B 是一个对数,表示当前map持有的 buckets 数量(2^B)。但需要考虑一个重要因素装载因子,所以真正可以使用的桶只有loadFactor * 2^B 个,如果超出此值将触发扩容,默认装载因子是6.5
noverflow 溢出buckets数量 hash0 hash 种子,在操作键值的时候,需要引入此值加入随机性 buckets 底层数组指针,指向当前map的底层数组指针 oldbuckets 同是底层数组指针,一般在进行map迁移的时候,用来指向原来旧数组。只有迁移过程中此字段才有意义,此字段数组大小只有buckets 的一半 nevacuate 进度计算器,表示扩容进度,小于此地址的 buckets 迁移完成 extra 可选字段,溢出桶专用,只要有桶装满就会使用.
mapextra结构体
type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
nextOverflow *bmap
}
当 map 的 key 和 value 都不是指针且内联时,将会把 bmap 标记为不含指针,这样可以避免 gc 时扫描整个 hmap。但是,我们看到 bmap 有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。
overflow 和 oldoverflow 只有当键和值不包含指针才使用,其中overflow 存储hmap.buckets 的溢出桶,而oldoverflow 存储hmap.oldbuckets 的溢出桶。
间接允许将指向切片的指针存储在hiter中
nextOverlflow 指向下一个空的溢出桶的指针
bmap结构体
type bmap struct {
tophash [bucketCnt]uint8
}
···
bmap即bucket map的缩写。
这里只有一个tophash 字段,而实际上在使用中值的类型是不固定的,甚至可以是一个自定义结构体的指针类型。这个结构体看起来可能有点让人费解,其实编译器在编译期间会动态创建一个新的同名数据结构,如下
type bmap struct {
topbits [8]uint8
keys [8]keytype
values [8]valuetype
pad uintptr
overflow uintptr
}
每个桶里有0到7共八个存储位置,也就是说每个桶最多可以存储8个元素,其中将所有的键放在一起,按先后顺序存放在一个keys字段中,再将所有的值按同样的顺序放在一起存放在values字段中(注意:没有将每个键值单独存储在一个字段中,主要是为了解决内存对齐带来的浪费问题。) 例如: map[int64]int8 ,如果按key/value/key/value/… 这种方式存储的话,则在每一个key/value之后需要padding7个字节才行。而如果按上面key/key/key/… 和 value/value/value/…这种方式的话,只需要在最后一个值的后面进行一次padding就可以了,大大节省了内存开支。如果不太懂这一块的话,可以了解一下golang的内存对齐。
如果当前bucket已满的话,则会创建新的bucket,并使用 overflow 字段进行bucket之间的链接,实现单向链表功能。 在以上图解示例中,该桶的第7位cell和第8位cell还未有对应键值对。需要注意的是,key和value是各自存储起来的,并非想象中的key/value/key/value…的形式。这样做虽然会让代码组织稍显复杂,但是它的好处是能让我们消除例如map[int64]int所需要的填充(padding)。此外,在8个键值对数据后面有一个overflow指针,因为桶中最多只能装8个键值对,如果有多余的键值对落到了当前桶,那么就需要再构建一个桶(称为溢出桶),通过overflow指针链接起来。
map结构体关系图
重要常量标志
const (
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
loadFactorNum = 13
loadFactorDen = 2
maxKeySize = 128
maxElemSize = 128
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
emptyRest = 0
emptyOne = 1
evacuatedX = 2
evacuatedY = 3
evacuatedEmpty = 4
minTopHash = 5
iterator = 1
oldIterator = 2
hashWriting = 4
sameSizeGrow = 8
noCheck = 1<<(8*sys.PtrSize) - 1
)
key的定位算法
在map中要存储一个键值对,必须先找到所在的位置,一般采用hash算法即取模的结果来实现,这里我们主要介绍一下map中是如何实现的。
先介绍一下map装载相关的内容。上面我们介绍 bmap 结构体的时候,有一个 B 字段,这个字段决定了map对应的底层数组的大小,它的大小决定了可以容纳的bucket的个数。如果B=5的话,则可以bucket的数组元素值个数为 2^5=32个,但golang中需要考虑到装载分子6.5这个因素,所以真正装载的元素并没有这么多。每当一个map存储的元素个数占比达到65%的时候,就会触发map的扩容操作。
如果想知道一个key要放在哪个bucket个, 需要先计算出一个hash值,然后再除以 32取余数即可。golang 也是如此实现的,只不过是为了性能考虑,使用了位操作而已。如5 2 换作用位操作的话,就是将5左移1位即可。每左移1位一次就是2,左移两位就是*4,同理右移则是相除。
如 hash(key) 计算出的结果为
10010111 | 000011110110110010001111001010100010010110010101010 │ 00110
根据上面说的h.B = 5 ,则取后面的5 位 00110 ,值为6 ,则需要将key:value 存放在第6 号的bucket 中。
找到了要存储的桶位置,还需要找到放在桶的什么位置,每个位置我们可以称之为slot 或者Cell 。
const (
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
......
}
这里定义了一个bucket最多存放元素个数的相关常量,在go里一个bucket最多可以存放 2^3=8个 元素,再多的话就需要使用到溢出桶overflow。
slot位置计算公式为取hash的高8位 ,计算函数为tophash() 。这里高8位是 10010111 ,十进制的值为151, 于是在第6号bucket的中遍历每个slot,直到找到bmap.tophash值为151的slot ,当前slot在bucket中的索引位置就是要找的键和值的索引位置。然后根据当前位置索引值分别从bmap.keys 和bmap.values 中计算出对应的值。
如果当前bucket中没有找到,就去当前bucket 中的溢出桶overflow中查找,如果overflow为nil,则直接返回空值即可。
总结
要想实现元素的存储定位需要三个步骤:
- 根据h.B 来的值,来取出hash结果的最后 b.B 位数字(低位),定位到bucket
- 再根据hash结果的高8位,实现在bucket定位到指定的位置
- 最后根据位置分别从 bmap.key和bmap.values中读取存储的值内容
map创建
package main
func main() {
m := make(map[int]string)
m[0] = "a"
_ = m[0]
_, _ = m[0]
delete(m, 0)
}
我们先看一下与map想着的runtime有哪些
$ go tool compile -S main.go | grep runtime
0x0080 00128 (main.go:4) CALL runtime.fastrand(SB)
0x00aa 00170 (main.go:5) CALL runtime.mapassign_fast64(SB)
0x00bc 00188 (main.go:5) CMPL runtime.writeBarrier(SB), $0
0x00f1 00241 (main.go:6) CALL runtime.mapaccess1_fast64(SB)
0x0114 00276 (main.go:7) CALL runtime.mapaccess2_fast64(SB)
0x0137 00311 (main.go:8) CALL runtime.mapdelete_fast64(SB)
0x0153 00339 (main.go:5) CALL runtime.gcWriteBarrier(SB)
0x0160 00352 (main.go:3) CALL runtime.morestack_noctxt(SB)
......
可以看到每个对map操作基本都有相应的同名调用函数。不过对map的初始化操作函数不是fastrand()函数,而是 runtime.makemap() 函数(还有一个runtime.makemap_small函数)。fastrand()函数只是用来产生随机hash种子的。
注意:我们这里没有指定创建map的长度,runtime会根据是否指定这个字段做一些情况考虑。
func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
步骤:
- 计算指定的大小所需要的内容是否超出出系统允许的最大分配大小
- 初始化hmap,并指定随机种子
- 通过overLoadFactor(hint, B)函数找到一个能装下指定map大小个元素个数的最小B,从小到大开始循环B。刚开始map越小越好
- 开始对hash table进行初始化。如果B==0则buckets 进行延时初始化操作(赋值的时候才进行初始化),如果B值特别大,则初始化需要一段时间,主要通过 makeBucketArray() 函数实现
注意 makemap()函数返回的是一个指针,而makeslice()返回的是一个新的结构体,在参数传递的时候,是值复制,两者有差异,有些是引用的是同一个数组,有些不是。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := bucketShift(b)
nbuckets := base
if b >= 4 {
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
if dirtyalloc == nil {
buckets = newarray(t.bucket, int(nbuckets))
} else {
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}
if base != nbuckets {
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
初始化bucket
- 对于b<4的话(桶的数量< 2^4),基本不需要溢出桶,这样就可以避免开销。对于b>4(桶的数量> 24)的话,则需要创建2(b-4)个溢出桶。
- 数组分配通过函数 newarray() 实现,第一个参数是元素类型,每二个参数是数组长度,返回的是数组内存首地址
map读取
对于map中的赋值与修改原理是基本一样的,先找到位置,不管原来位置有没有数据直接存储新数据就可以了。
对于map的赋值操作,是由函数 runtime.mapaccess1()和 runtime.mapaccess2() 来实现的。两者的唯一区别就是返回值不一样,runtime.mapaccess1() 返回的是一个值,runtime.mapaccess2() 返回的是两个值,第二个值决定了key是否在map中存在。这点可以通过上面我们的 compile 结果中看出他们的区别。这里我们只介绍runtime.mapaccess1()
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if raceenabled && h != nil {
callerpc := getcallerpc()
pc := funcPC(mapaccess1)
racereadpc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled && h != nil {
msanread(key, t.key.size)
}
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0)
}
return unsafe.Pointer(&zeroVal[0])
}
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
hash := t.hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if t.key.equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
return unsafe.Pointer(&zeroVal[0])
}
runtime.mapaccess1()函数返回一个指定h[key]的指针,如果key不存在,则返回当前map值类型的zero值,注意永远也不会返回一个nil值。
返回的指针在map中是永远有效的,尽量读取完尽快使用并释放,不要长期持久。
步骤:
- 判断h是否为nil或者h.count值是否为0,如果h为nil则表示未初始化,则可能panic,如果h.count=0,则表示map为空,则直接返回一个zero值。
- 考虑是否处于并发读写状态,否则产生panic
- 根据键key计算hash值
- 计算低B位的掩码bucketMask(h.B),比如 B=5,那 m 就是31,低五位二进制是全1
- 计算当前bucket的地址
- 根据h.oldbuckets是否为nil来判断是否处于扩容中,如果不是nil则表示当前map正处于扩容状态中。将m减少一半,重新计算当前key在 oldbuckets中的位置,如果oldbuckets没有全部迁移到新bucket的话(bmap.tophash的意义所在),则在oldbuckets中查找。
- 计算tophash,即高八位
- 真正开始查找key,外层for是循环bucket及溢出桶overflow,内层for是循环桶内的8个slot
如果 b.tophash[i] != top 则表示当前bucket的第i个位置的tophash 与当前key的tophash不同。这时分两种情况: 一种是当前的标识是 emptyRest 表示剩下的所有slot全是空,则直接结束当前bucket查找,继续下一下bucket重复此步骤(b = b.overflow(t)); 另一种则是继续查找下一个slot,直到8个slot全部查找完毕,然后再查找下一个bucket 如果b.tophash[i] == top,则根据键bucket的首地址+类型计算出字节长度*slot索引值 计算得出key的位置 判断同时bucket中的key与请求的key是否相等,如果相等的话,按取键的方法取出值,并返回,否则继续 - 如果在最后仍未找到key,则直接返回zero 值
key的定位公式
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
value 的定位公式
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
其中 dataOffset 表示key相对于bmap的偏移量,结构体如下
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
所以键的地址=当前bucket的起始位置(unsafe.Pointer(b)) + key 的偏移量(dataOffset )+当前slot索引值(i) * 每个键的大小(uintptr(t.keysize) )
而对于值来说,由于bmap.values 在b.map.keys 后面,所以要先将8个键的地址全部计算上才行,同样值类型也有自己的大小 t.elemsize
对于bucket的几种状态码如下
emptyRest = 0
emptyOne = 1
evacuatedX = 2
evacuatedY = 3
evacuatedEmpty = 4
minTopHash = 5
emptyRest 表示当前cell为空,并且它后面的所有cell也为空,包括溢出桶overflow。 emptyOne 表示当前cell为空。 evacuatedX 扩容相关,当前bucket的slot值迁移到了新的X部分,由于每次扩容时,都是两倍扩容,所以原来bucket中的8个slot值,有的被迁移到了新bucket的X部分,有的是Y部分(原来的1个桶变成2个桶,分别称作X和Y)。 evacuatedY 扩容相关,第二部分迁移完毕。 evacuatedEmpty 当前cell为空,且迁移完成。
minTopHash tophash最小值,如果在调用 tophash(hash)时,计算出的值小于此值,则会加上此值(代码)
map赋值
与赋值相关的函数是mapassign,根据key的不同底层调用相同类型的函数 ,常见的有
key | 类型 | 插入 |
---|
uint32 | mapassign_fast32(t *maptype, h *hmap, key uint32) | unsafe.Pointer | uint64 | mapassign_fast64(t *maptype, h *hmap, key uint64) | unsafe.Pointer | string | mapassign_faststr(t *maptype, h *hmap, ky string) | unsafe.Pointer |
我们这里只主要介绍常用的mapassign函数。
函数原型
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {}
步骤
-
检查是否处于并发模式,否则产生panic -
进行 hash 值计算(同上) -
检查bucket是否进行过初始化,否则调用 newobject(t.buckt) 函数进行初始化,这点我们上面介绍过 -
检查map 是否处于扩容状态(h.growing()),如果是则调用函数growWork(t,h,bucket)进行扩容工作 -
计算当前bucket的地址 -
计算tophash,开始在bucket内通过 for 遍历出相应的位置,这点同上面的方法一样,双层for查找bucket,如果找到了再去overflow查找。找到则进行typedmemmove(t.key, k, key)更新并直接结束整个逻辑 -
如果没有map里没有找到key,则判断如果map元素个数+1的话会不会触发扩容 扩容所必须的两个条件:
- 达到了最大装载因子
- 溢出桶是否过多
只要这两个其中的一个条件满足,则先进行扩容然后再重头开始。 -
到目前为止才算正式开始追加新的kv。要写入前必须要知道要写入的位置,有两个重要的变量,一个是inserti,另一个是insertk。如果此时inserti 为 nil的话,说明在上面第6步骤的时候遍历了所有的bucket和overflow也没有找到要插入的位置(bucket或overflow满了),则创建新的overflow,然后在新的overflow里计算出key和vlaue要存放的内存位置(代码) -
根据key和value的地址,写入新的值,并将当前map的元素个数加1。(需要特别注意的在这里并没有写入value,因为函数参数并没有传入值,所以返回的是一个存放v的内存地址,有了这个地址赋值就解决了) 总结:
触发map扩容的两个条件,一个是装载因子,另一个是overflow过多,对于overflow多少算多呢见下方说明。
map删除
与删除有关的函数为 mapdelete,原型
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {}
根据 key 类型的不同,删除操作会被优化成更具体的函数:
key 类型 | 删除 |
---|
uint32 | mapdelete_fast32(t *maptype, h *hmap, key uint32) | uint64 | mapdelete_fast64(t *maptype, h *hmap, key uint64) | string | mapdelete_faststr(t *maptype, h *hmap, ky string) |
golang mapdelete 步骤
- 判断map是否为nil或者根本就没有任何元素
- 检查是否并发模式
- 计算hash,双层for循环遍历查找key,找到位置后进行以下工作
将key位置写入nil或置零值 将value位置写入nil或置零值 将tophash位置置为 emptyOne 状态 判断当前key是不是bucket中的最后一个元素,如果是最后一个元素,且当前bucket还有overflow,而overflow的首个tophash[0] != emptyRest 说明已经没有元素可查找了,于是直接结束。否则判断当前key的下一个 tophash是不是emptyRest,如果不是的话,直接结束逻辑。后面利用for循环重置tophash 的状态为最合适的状态,以方便后期循环使用
map扩容
随着map元素的添加,有可能出现bucket的个数不够,导致overflow桶越来越多,最后变成了一个单向链表了,查找效率越来越差,最差的情况下可能会变成O(n)。这与我们期待的O(1)相反,所以golang为了解决这个问题,就需要对map中的元素进行扩容重新整理,以达到最优的效果。那么什么时候才需要扩容呢?
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again
}
这里有两个条件,只要满足其中一项就会进行扩容
-
当装载因子越过6.5的时候,这个是内部固定的一个值(查看说明) -
当overflow 过多的时候 a. 当 B 小于 15,也就是 bucket 总数 2^B 小于 2^15 时,如果 overflow 的 bucket 数量超过 2^B; b. 当 B >= 15,也就是 bucket 总数 2^B 大于等于 2^15,如果 overflow 的 bucket 数量超过 2^15。 主要分界点就是15(代码)
针对第一个条件,就是说元素太多,桶装不下了,立即进行扩容,添加一倍的桶,保证元素个数最多只能达到桶总容量的65%; 针对第二种情况一般是先是大量的添加元素,后来又删除了,再添加导致出现了太多的空的overflow,这样查找key的话,就需要遍放多桶,效率大大下降,这时就需要重新对元素进行位置调整,类似于windows系统中的磁盘碎片整理一样,将overflow中的元素向前面的bucket迁移,以便下次查找数据时效率更高,这种情况我们称之为等量扩容 。
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
return noverflow >= uint16(1)<<(B&15)
}
虽然这里调用了 hashGrow() 函数,但它并不有真正的进行扩容,我们先看下它的源码
func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
if h.extra != nil && h.extra.overflow != nil {
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
}
其中 hashGrow() 只是为扩容做的一些准备工作,而真正进行扩容的是 growWork() 和 evacuate()函数。
函数 growWork() 只有在赋值和删除的过程中会出现,所以也只有这两个动作才会触发扩容动作。
我们再看看growWrok()源码
func growWork(t *maptype, h *hmap, bucket uintptr) {
evacuate(t, h, bucket&h.oldbucketmask())
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
从代码里可以看到,先是迁移我们正在使用的bucket ,如果还有 bucket 没有迁移完的话,就再迁移一个bucket,也就是说一次最多可迁移两个bucket。
这里 h.oldbucketmask() 是计算oldbucket 的位置的,实现原理我们上面介绍过的,计算hash值的最低B位来决定用哪个bucket。
func (h *hmap) oldbucketmask() uintptr {
return h.noldbuckets() - 1
}
下面我们重点关注 evacuate() 函数,最复杂的工作就是由它来完成的。
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
newbit := h.noldbuckets()
if !evacuated(b) {
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
if !h.sameSizeGrow() {
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i]
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
if !h.sameSizeGrow() {
hash := t.hasher(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
useY = top & 1
top = tophash(hash)
} else {
if hash&newbit != 0 {
useY = 1
}
}
}
if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
b.tophash[i] = evacuatedX + useY
dst := &xy[useY]
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2
} else {
typedmemmove(t.key, dst.k, k)
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
函数advanceEvacuationMark() 源码
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
h.nevacuate++
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
if h.nevacuate == newbit {
h.oldbuckets = nil
if h.extra != nil {
h.extra.oldoverflow = nil
}
h.flags &^= sameSizeGrow
}
}
对于map 的扩容,主要分两种情况,一个是等量扩容,另一种是非等量扩容,什么意思呢?
所谓等量扩容是指原来是一个bucket,虽然扩容的时候创建了两个新的bucket,但使用的时候只使用了其中一个bucket,另一个没有使用,所以这种情况下的扩容是直接将原来oldbucket中的元素直接迁移到新 bucket中的x桶就可以了。
而非等量扩容就是将原来一个bucket 中的内容迁移到新的两个bucket中。
我们知道每个bucket 最多只有8个元素,迁移需要一个一个的迁移,每迁移完一个元素,都需要将当前cell作个标记,表示位置的元素已经迁移完成,对于等量扩容和非等量扩容两者是有些细微的差别的。我们先看一下等量迁移。
对于等量扩容来说,由于bucket是1对1的迁移,况且都迁移到了x桶,所以oldbucket中每个cell在迁移后被标记为evacuatedX,表示当前oldbucket中的元素全部被迁移到了x桶了。 经过等量扩容后,原来的overflow消失了,所有元素都放在了同一个bucket,节省资源的同时查询效率也提高了。是不是有点像widows 的磁盘碎片整理。 而对于非等量扩容,bucket的关系就是1对2了,oldbucket 中的元素有可能经过rehash后,被迁移到了x桶,也有可能被迁移到了y 桶。 如果是被迁移到了y桶的话,则标记为evacuatedY,否则就是evacuatedX。
如果oldbucket中的元素为空(emptyRest或emptyOne),即未存储任何元素或者原来存储过后来删除了,则需要标记为evacuatedEmpty。
经过迁移后,我们发现元素在oldbucket中的存储顺序,到新的bucket中发生了变化。如odlbucket里有8个元素(1-8),遍历打印的时候,会打印12345678。后来分成了两个bucket, 其中3/4/8 放在了x桶,1/2/5/6/7放在了y桶,那么这时遍历打印的话,则是遍历打印x桶,然后是y桶,最后打印34812567,与原来的顺序不一样了,这正是我们平时所说的对map的遍历并不保证一定的顺序的。
当然在源代码里起始位置也是随机定位的,先是随机选择一个bucket,然后再随机选择一个cell位置,开始往后遍历查找,当达到最后bucket的最后一个元素的时候再才第一个bucket查找,走到遍历到当前元素等于起始元素为止,或者官方是为了引起我们的注意故意这样设计的吧。所在就算你是硬编码写死的map不进行删除或添加,打印的也是随机的。
由于map内部是分bucket迁移的,每次最多迁移两个bucket,所以在遍历数据的时候有也些复杂,即可考虑新bucket的数据,也要考虑oldbucket中的数据。有些oldbucket中的数据被分两到两个新bucket中了,导致要考虑的情况要多一些。等有时间了再单独写写这一块的逻辑。
最后,当所有数据都迁移完成后,还需要将oldbucket清除,直接通过设置 h.oldbuckets为nil即可,记得还有一个mapextra.oldoverflow。
在扩容过程中,有一段说明需要注意到
作者:孙兴芳 链接:https://www.jianshu.com/p/888a90aa25b0 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 在扩容过程中,有一段说明需要注意到
// If key != key (NaNs), then the hash could be (and probably // will be) entirely different from the old hash. Moreover, // it isn’t reproducible. Reproducibility is required in the // presence of iterators, as our evacuation decision must // match whatever decision the iterator made. // Fortunately, we have the freedom to send these keys either // way. Also, tophash is meaningless for these kinds of keys. // We let the low bit of tophash drive the evacuation decision. // We recompute a new random tophash for the next level so // these keys will get evenly distributed across all buckets // after multiple grows.
在进行hash计算的时候,有一种key,它每次经过hash计算的时候,得出的结果都不一样,这个key就是math.NaN(),表示Not a Number,类型是float64。一旦使用此key进行存储后,几乎无法正确读取出来值,只有在遍历的时候才会读取到此值。为了解决这个问题,官方采用了使用tophash的最低位来决定是放在x还是y,如果是0则放在x,否则放在y。
总结
- map不是并发安全的,所以在源码里有大量的代码判断
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
在赋值时会设置map处于协程写的状态
h.flags ^= hashWriting
- 遍历map是无序的,因为随着元素的添加和删除会进行扩容,元素位置会发生变化
|