redis6.2.6
一、简介
redis中位图不是一个数据类型,而是基于String类型的一系列位操作。并且redis的String类型是二进制安全的,最大长度512M,所以可以建立
2
32
2^{32}
232个比特位。 位图操作分为两类:
- 常量时间操作
比如对某位的设置、获取。 - 范围操作
比如对某个范围内的位进行统计计数等,这样时间复杂度就不是常量级别了。
因为最大分配512M, 所以当使用的位图很大时,第一次访问不存在的位图时,分配大的空间,容易导致主线程阻塞
二、位图操作
2. 1 位图的最大限制
可通过redis.conf配置文件进行修改
# In the Redis protocol, bulk requests, that are, elements representing single
# strings, are normally limited to 512 mb. However you can change this limit
# here, but must be 1mb or greater
#
# proto-max-bulk-len 512mb
如果不配置,则默认值如下
...
standardConfig configs[] = {
...
createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL),
...
};
如果修改,也不能太小,因为这也是请求协议数据最大的限制
2.2 SETBIT key offset value (设置)
时间复杂度 O(1) 比如下面的命令,设置key为name的第10bit为1
127.0.0.1:6379> setbit name 10 1
2.2.1 解析offset
因为网络上传输的都是字符串,这里需要将数字字符串转换为整数类型,比如本例子中的“10” -> 10
void setbitCommand(client *c) {
...
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;
...
}
int getBitOffsetFromArgument(client *c, robj *o, uint64_t *offset, int hash, int bits) {
long long loffset;
char *err = "bit offset is not an integer or out of range";
char *p = o->ptr;
size_t plen = sdslen(p);
...
if (string2ll(p+usehash,plen-usehash,&loffset) == 0) {
addReplyError(c,err);
return C_ERR;
}
...
if ((loffset < 0) || (loffset >> 3) >= server.proto_max_bulk_len)
{
addReplyError(c,err);
return C_ERR;
}
*offset = loffset;
return C_OK;
}
2.2.2 解析value
转换value的值,并且只能是0 或者 1
void setbitCommand(client *c) {
...
if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK)
return;
if (on & ~1) {
addReplyError(c,err);
return;
}
}
可以看到,最终都是调用的string2ll
getLongFromObjectOrReply
|
V
getLongLongFromObjectOrReply
|
V
getLongLongFromObject
|
V
string2ll
2.2.3 查询key
从全局字典中查询key
void setbitCommand(client *c) {
...
if ((o = lookupStringForBitCommand(c,bitoffset)) == NULL) return;
...
}
(1)未找到
1. 创建key, 插入到全局字典中
2. 创建 offset/8+1 字节的sds,作为值,默认清空的
robj *lookupStringForBitCommand(client *c, uint64_t maxbit) {
...
if (o == NULL) {
o= createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));
dbAdd(c->db,c->argv[1],o);
}
...
}
(2)找到
robj *lookupStringForBitCommand(client *c, uint64_t maxbit) {
...
if (o == NULL) {
...
} else {
o = dbUnshareStringValue(c->db,c->argv[1],o);
o->ptr = sdsgrowzero(o->ptr,byte+1);
}
return o;
}
1. 判断是否为共享对象或者非原始字符串对象编码,是则创建一个新对象
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
serverAssert(o->type == OBJ_STRING);
if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
robj *decoded = getDecodedObject(o);
o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
dbOverwrite(db,key,o);
}
return o;
}
2. 判断是否需要扩容,进行自动的扩容
sds sdsgrowzero(sds s, size_t len) {
size_t curlen = sdslen(s);
if (len <= curlen) return s;
s = sdsMakeRoomFor(s,len-curlen);
if (s == NULL) return NULL;
memset(s+curlen,0,(len-curlen+1));
sdssetlen(s, len);
return s;
}
2.2.4 获取当前值,用于返回值
byte = bitoffset >> 3;
byteval = ((uint8_t*)o->ptr)[byte];
bit = 7 - (bitoffset & 0x7);
bitval = byteval & (1 << bit);
2.2.5 设置值
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
((uint8_t*)o->ptr)[byte] = byteval;
2.2.6 其他操作
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty++;
2.2.7 响应客户端
addReply(c, bitval ? shared.cone : shared.czero);
2.3 GETBIT key offset (获取)
2.3.1 解析offset
将字符串的数字offset转换为整数
void getbitCommand(client *c) {
...
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;
...
}
2.3.2 根据key查找对象,并进行类型判断
当key不存在时,直接返回0
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyRead(c->db, key);
if (!o) SentReplyOnKeyMiss(c, reply);
return o;
}
127.0.0.1:6379> getbit nokey 10
(integer) 0
2.3.3 计算在字节数据中的实际偏移位置
byte = bitoffset >> 3;
bit = 7 - (bitoffset & 0x7);
2.3.4 获取值
当offset大于最大的长度时,返回0
if (sdsEncodedObject(o)) {
if (byte < sdslen(o->ptr))
bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit);
} else {
if (byte < (size_t)ll2string(llbuf,sizeof(llbuf),(long)o->ptr))
bitval = llbuf[byte] & (1 << bit);
}
else分支,当创建string对象时,如果值是一个数字,并且在[LONG_MIN, LONG_MAX]范围内,这不用创建value的sds对象了,直接将ptr指针空间存储这个数字,比内嵌字符串对象更节省空间
2.3.5 响应客户端
addReply(c, bitval ? shared.cone : shared.czero);
2.4 BITCOUNT key [start end](统计)
统计某个key中1的个数,可以设置start和end,只统计[start, end]范围内的置1的bit位
2.4.1 查询key
(1)key不存在,则直接返回0
(2)key存在,则对值进行类型检查,不是string则返回错误
void bitcountCommand(client *c) {
...
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
...
}
2.4.2 获取key对应的值
void bitcountCommand(client *c) {
...
p = getObjectReadOnlyString(o,&strlen,llbuf);
...
}
如果是整数编码,则将整数转换位字符串返回,否则直接返回地址
unsigned char *getObjectReadOnlyString(robj *o, long *len, char *llbuf) {
serverAssert(o->type == OBJ_STRING);
unsigned char *p = NULL;
if (o && o->encoding == OBJ_ENCODING_INT) {
p = (unsigned char*) llbuf;
if (len) *len = ll2string(llbuf,LONG_STR_SIZE,(long)o->ptr);
} else if (o) {
p = (unsigned char*) o->ptr;
if (len) *len = sdslen(o->ptr);
} else {
if (len) *len = 0;
}
return p;
}
2.4.3 解析参数start,end
(1)如果没有指定start,end,则start=0,end=strlen-1
(2)如果指定参数,从参数中提取start,end,并对其进行处理
(2.1)如果start,end都为负数,并且start > end,则返回0
(2.1)如果start,end为负数,则加上strlen
(2.2)如果start,end还为负数,则都等于0
(2.3)end>=start, end = strlen-1
(3)其他,
if (c->argc == 4) {
if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK)
return;
if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)
return;
if (start < 0 && end < 0 && start > end) {
addReply(c,shared.czero);
return;
}
if (start < 0) start = strlen+start;
if (end < 0) end = strlen+end;
if (start < 0) start = 0;
if (end < 0) end = 0;
if (end >= strlen) end = strlen-1;
} else if (c->argc == 2) {
start = 0;
end = strlen-1;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
2.4.4 判断start,end
如果start > end,则直接返回0
...
if (start > end) {
addReply(c,shared.czero);
}
...
2.4.5 计算范围中的置1的bit位的个数,并返回
从代码可以看出,这个偏移是字节偏移,而非bit偏移
...
long bytes = end-start+1;
addReplyLongLong(c,redisPopcount(p+start,bytes));
long long redisPopcount(void *s, long count) {
long long bits = 0;
unsigned char *p = s;
uint32_t *p4;
static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8};
while((unsigned long)p & 3 && count) {
bits += bitsinbyte[*p++];
count--;
}
p4 = (uint32_t*)p;
while(count>=28) {
uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;
aux1 = *p4++;
aux2 = *p4++;
aux3 = *p4++;
aux4 = *p4++;
aux5 = *p4++;
aux6 = *p4++;
aux7 = *p4++;
count -= 28;
aux1 = aux1 - ((aux1 >> 1) & 0x55555555);
aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333);
aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +
((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) +
((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) +
((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) +
((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) +
((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24;
}
p = (unsigned char*)p4;
while(count--) bits += bitsinbyte[*p++];
return bits;
}
|