Redis原理
Redis源码可以去官网下载,也可以从我下面提供的这个链接进行下载:
redis-6.2.4.tar.gz
数据结构
动态字符串SDS
redis中保存的Key是字符串,value大多也是字符串或字符串集合,因此字符串是Redis中最常使用的一种数据结构。
不过Redis没有直接使用C语言中的字符串,因为C语言字符串存在很多问题:
- 获取字符串长度需要的复杂度为O(N)
- 非二进制安全,C语言使用空字符’\0’作为字符串结尾的标记,如果保存的字符串本身含义该标记,那么会造成读取被截断,获取的数据不完整
- 不可修改
- 容易造成缓冲区溢出,例如字符串拼接时,超过原本的空间大小,可能会覆盖掉相邻变量的内存空间
而SDS就是对c字符串的封装,以此来解决上述的问题。
SDS结构
SDS是C语言实现的一个结构体:
一个简单的例子如下:
动态扩容
在c语言中,如果要对字符串操作:
- 拼接–>先进行内存重分配来扩展底层数组大小,如果忘记了这一步,会导致缓冲区溢出
- 缩短–>需要通过内存重分配来释放字符串不再使用的那部分空间,如果忘记了会导致内存泄露
因为内存重分配需要执行系统调用,并且系统实现内存重分配算法也非常复杂,所以这通过是一个比较耗时的操作
-
因此通过内存预分配可以减少内存重分配的次数,进而提高整体执行效率 -
并且SDS还提供了惰性空间释放的功能,即对字符串缩短操作而言,不会立刻使用内存重分配算法来回收多出来的字节,而是通过一个free属性进行记录,当后面需要进行字符串增长时,就会用到
小结
SDS优点如下:
- O(1)复杂度获取字符串长度
- 杜绝缓冲区溢出
- 减少修改字符串长度时所需的内存重分配次数
- 二进制安全
- 兼容部分C字符串函数(因此SDS遵循了以’\0’结尾的惯例)
整数集合IntSet
IntSet是vlaue集合的底层实现之一,当一个集合只包含整数值元素,并且这个集合元素数量不多的情况下,Redis就会使用IntSet作为该value集合的底层实现。
IntSet是Redis用于保存整数值集合抽象数据结构,它可以保存类型为int16_t,int32_t,int64_t的整数值,并且保证集合中不会出现重复元素。
IntSet结构如下:
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
contents是整数数组底层实现,用来存储元素,并且各个项在数组中的值按从小到大有序排列,并且数组中不包含重复元素。
其中的encoding包含三种模式,表示存储的整数大小不同:
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))
为了方便查找,Redis会将intset中所有的整数按照升序依次保存在contents数组中,结构如图:
现在,数组中每个数字都在int16_t的范围内,因此采用的编码方式是INSET_ENC_INT16,每部分占用的字节大小为:
- encoding: 4字节
- length: 4字节
- contents: 2字节*3=6字节
上图中给出的公式是计算每个数组元素起始地址,从这里也能看出为什么很多语言中,数组元素下标都从0开始
因为,如果从1开始,那么公式就变成了: startPtr+(sizeof(int16)*(index-1))
还要额外计算一次减法操作,这会浪费额外的cpu资源
- startPtr: 数组首元素起始地址
- sizeof(int16): 数组中每个元素的大小,数组中每个元素大小一致,便于按照下标寻址
- sizeof(int16)*(index): index下标元素举例起始地址多远,即index元素的起始地址
IntSet升級
- 升级编码为INTSET_ENC_INT32,每个整数占4字节,并按照新的编码方式及元素个数扩容数组
- 倒序依次将数组中的元素拷贝到扩容后的正确位置
正序挨个拷贝,会导致前面的元素扩容后覆盖后面的元素,而倒序可以避免这种情况。
c语言写数组插入元素的算法时,也是将元素挨个后移,然后腾出位置,插入新元素。
- 最后,将intset的encoding属性改为INTSET_ENC_INT32,将length属性改为4
升级源码分析
intset *intsetAdd(
intset *is,
int64_t value,
uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
if (valenc > intrev32ifbe(is->encoding)) {
return intsetUpgradeAndAdd(is,value);
} else {
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
- intsetUpgradeAndAdd–升级数组编码
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0;
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}
if (value == cur) {
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}
整数集合升级策略有两个好处:
降级
整数集合不支持降级操作,一旦对数组进行了升级,编码就会一直保持升级后的状态。
内存都是连续存放的,就算进行了降级,也会产生很多内存碎片,如果还要花时间去整理这些碎片更浪费时间。
当然,有小伙伴会说,可以参考SDS的做法,使用free属性来标记空闲空间大小—>当然应该存在更好的做法,大家可以尝试去思考更好的解法
小结
intset具备以下特点:
- Redis会确保intset中的元素唯一,有序
- 具备类型升级机制,可以节约内存空间
- 底层采用二分查找方式来查询
字典(DICT)
Redis是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查,而键与值的映射关系正是通过Dict实现的。
Dict由三部分组成,分别是: 哈希表(DictHashTable),哈希节点(DictEntry).字典(Dict)
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
当出现hash碰撞的时候,会采用链表形式将碰撞的元素连接起来,然后链表的新元素采用头插法
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx;
int16_t pauserehash;
} dict;
扩容
static int _dictExpandIfNeeded(dict *d)
{
if (dictIsRehashing(d)) return DICT_OK;
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
return dictExpand(d, d->ht[0].used + 1);
}
return DICT_OK;
}
收缩
Dict除了扩容以外,每次删除元素时,也会对负载因子做检查,当LoadFactory<0.1时,会做哈希表收缩:
int hashTypeDelete(robj *o, sds field) {
int deleted = 0;
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr;
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
fptr = ziplistFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
zl = ziplistDelete(zl,&fptr);
zl = ziplistDelete(zl,&fptr);
o->ptr = zl;
deleted = 1;
}
}
}
else if (o->encoding == OBJ_ENCODING_HT) {
if (dictDelete((dict*)o->ptr, field) == C_OK) {
deleted = 1;
if (htNeedsResize(o->ptr)) dictResize(o->ptr);
}
} else {
serverPanic("Unknown hash encoding");
}
return deleted;
}
- htNeedsResize–判断是否需要重置Dict大小
htNeedsResize(dict *dict) {
long long size, used;
size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
int dictResize(dict *d)
{
unsigned long minimal;
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal);
}
rehash源码分析
- _dictExpand函数是真正完成扩容的方法,下面来看看这个方法干了啥
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n;
unsigned long realsize = _dictNextPower(size);
if (realsize == d->ht[0].size) return DICT_ERR;
n.size = realsize;
n.sizemask = realsize-1;
if (malloc_failed) {
n.table = ztrycalloc(realsize*sizeof(dictEntry*));
*malloc_failed = n.table == NULL;
if (*malloc_failed)
return DICT_ERR;
} else
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
rehash流程分析
渐进式rehash
上面列出的rehash看上去很好,但是redis没有这样做,因为如果需要迁移元素很多,由于redis单线程的特性,会导致主线程被阻塞,因此redis采用的是渐进式hash,即慢慢的,慢慢的迁移元素。
小结
ZipList(压缩列表)
ZipList是一种特殊的"双端链表",由一系列特殊编码的连续内存块组成。可以在任意一端进行压入/弹出操作,并且该操作的时间复杂度为0(1)。
压缩列表可以包含任意多个节点,每个节点可以保存一个字节数组或者一个整数值。
压缩列表是列表键和哈希键的底层实现之一。当一个列表键只包含少量列表项,并且每个列表项要么就是小整数值,要么就是长度比较短的字符串,那么Redis底层就会使用ziplist存储存储结构。
当一个哈希键只包含少量列表项,并且每个列表项要么就是小整数值,要么就是长度比较短的字符串,那么Redis底层也会使用ziplist存储存储结构。
zipList构成
zipListEntry构成
ZipList中所有存储长度的数值采用小端字节序,即低位字节在前,高位字节在后。
例如: 数值0x1234,采用小端字节序后实际存储值为: 0x3412
encoding编码
例如: 我们要保存字符串"ab"和"bc"
存储长度的数值采用小端字节序表示
最后一种特殊情况: 1111xxxx,可以在后四位xxxx,表示0001-1101范围的大小,即1-13,但是需要减去一,实际可保存0-12的范围.
如果数值大小在1-12区间内,那么采用最后一种特殊编码方式,不需要content属性
例如: 一个ZipList中包含两个整数值: “2"和"5”
连锁更新问题
- 此时,如果我们将一个长度大于254字节的新节点设置插入进来,称为压缩列表头节点,那么旧头节点的pre_entry_len需要扩展到5字节表示新节点的大小.
- 旧节点加上4字节后变成了254,那么后面的节点需要再次扩展…直到某个节点pre_entry_len扩展到5字节后长度并没有超过254为止
ZipList这种特殊情况下产生的多次空间扩展操作称之为连续更新。
新增,删除都可能导致连锁更新的发生。
连锁更新虽然复杂度高,会大大降低性能,但是由于产生概率较低,并且及时出现了,只要被更新节点数量不多,性能上不会有太大影响。
小结
- 压缩列表可以看做一种连续内存空间的双向链表
- 列表的节点之间不是通过指针连接,而是记录上一节点和本节点的长度来寻址,内存占用低
- 如果列表数据过多,导致链表过长,可能影响查询性能
- 增或删较大数据时有可能发生连续更新问题
QuickList(快速链表)
why need ?
限制zipList大小
压缩节点
结构
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count;
unsigned long len;
int fill : QL_FILL_BITS;
unsigned int compress : QL_COMP_BITS;
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz;
unsigned int count : 16;
unsigned int encoding : 2;
unsigned int container : 2;
unsigned int recompress : 1;
unsigned int attempted_compress : 1;
unsigned int extra : 10;
} quicklistNode;
- fill为-2表示每个每个ziplist最大内存不超过8kb
- compress为1表示首尾不压缩,中间节点压缩
特点
- 是一个节点为ZipList的双端链表
- 节点采用ZipList,解决了传统链表的内存占用问题
- 控制了ZipList大小,解决了连续内存空间申请效率问题
- 中间节点可以压缩,进一步节省内存
SkipList(跳跃表)
SkipList首先是链表,但与传统链表相比有几点差异:
- 元素按照升序排列存储
- 节点可能包含多个指针,指针跨度不同
Redis使用跳跃表作为有序集合键,如果一个有序集合包含的元素数量很多,或者有序集合中元素成员是比较长的字符串,Redis就会使用跳跃表作为有序集合键的底层实现。
例如: sortedSet
Redis目前只在两处地方使用到了SkipList,分别是 :
结构
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
特点
- 跳跃表是一个双向链表,每个节点都包含score和ele值
- 节点按照score排序,score值一样则按照ele字典排序
- 每个节点都可以包含多层指针,层数是1到32之间的随机数
- 不同层指针到下一个节点的跨度不同,层级越高,跨度越大
- 增删改成效率与红黑树基本一致,实现却更为简单
RedisObject
Redis中的任意数据类型的键和值都会被封装为一个RedisObject,也叫做Redis对象,源码如下:
- Redis通过引用计数实现了相关内存回收机制,并且还利用该引用计数实现了对象共享机制。
- 通过记录对象最后一次访问时间,可以在服务器启用了maxmemory功能的情况下,将那么较长时间无人访问的键优先淘汰
对象类型与编码
Redis使用对象来表示数据库中的键和值,每次当我们在Redis的数据库中新创建一个键值对时,我们至少会创建两个对象,一个用于做键值对的键,另一个对象做键值对的值。
Reids中会根据存储的数据类型不同,选择不同的编码方式,功包含11种不同的类型:
每种数据类型使用的编码方式如下:
我们可以使用TYPE命令来查看redis中某个键对应的值对象的类型,而不是键对象的类型。
String对象
String是Redis中最常见的数据存储类型:
- 其基本编码方式是RAW,基于简单动态字符串SDS实现,存储上限为512mb.
- 如果存储的SDS长度小于44字节,则会采用EMBSTR编码,此时Object head与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。
内存释放也只需要一次调用
- 如果存储的字符串是整数值,并且大小在LONG—MAX范围内,则会采用INT编码:直接将数据保存在RedisObject的ptr指针位置(刚好8字节),不再需要SDS了
编码的转换
- 如果对保存整数值的字符串对象追加了一个字符串值,那么该字符串对象底层会从int编码转换为raw编码
- 如果对embstr编码的字符串进行修改,那么底层编码也会从embstr转换为raw
List对象
列表对象的编码可以是以下三种:
插入源码分析
void pushGenericCommand(
client *c,
int where,
int xx) {
int j;
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
if (checkType(c,lobj,OBJ_LIST)) return;
if (!lobj) {
if (xx) {
addReply(c, shared.czero);
return;
}
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}
for (j = 2; j < c->argc; j++) {
listTypePush(lobj,c->argv[j],where);
server.dirty++;
}
addReplyLongLong(c, listTypeLength(lobj));
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
robj *createQuicklistObject(void) {
quicklist *l = quicklistCreate();
robj *o = createObject(OBJ_LIST,l);
o->encoding = OBJ_ENCODING_QUICKLIST;
return o;
}
Set对象
Set是Redis中的单列集合:
- 不保住有序性
- 包装元素唯一(可以判断元素是否存在)
- 求交集,并集,差集
那什么样的数据类型适合实现set数据结构呢?
HashTable,也就是Redis中的DICT,不过DICT
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
}
robj *createIntsetObject(void) {
intset *is = intsetNew();
robj *o = createObject(OBJ_SET,is);
o->encoding = OBJ_ENCODING_INTSET;
return o;
}
robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL);
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT;
return o;
}
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
}
else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
setTypeConvert(subject,OBJ_ENCODING_HT);
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
set_max_intset_entries默认为512,通过下面的命令可以进行查询
config get set_max_intset_entries
编码转换后:
ZSet对象
因此zset底层将这两个数据结构结合在了一起,具体结构如下:
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
创建ZSet对象的方法源码如下:
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}
具体结构如图:
可以当前ZSet最大的问题在于内存的占用过大,因此为了解决这个问题,ZSet提供了两种编码方式,上面给出的是其中一种,适合在是数据量大的情况下使用,发挥出其快速查找的优势
当数据量比较小的时候,ZSet采用ziplist作为底层结构
add源码
void zaddGenericCommand(client *c, int flags) {
....
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
if (zobj == NULL) {
if (xx) goto reply_to_client;
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
}
...
int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
...
}
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}
robj *createZsetZiplistObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(OBJ_ZSET,zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
...
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
....
return 1;
} else if (!xx) {
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*out_flags |= ZADD_OUT_ADDED;
return 1;
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
....
} else {
serverPanic("Unknown sorted set encoding");
}
return 0;
}
ziplist如何做到有序存储
小结
ZSet为了兼顾内存占用和性能,使用了两种编码方式,当数据量小的时候,采用ziplist实现,此时内存占用很小,但是由于数据量也很小,因此性能影响不大。
当数据量增大时,为了性能考虑,采用了HT+SkipList编码实现,此时内存占用很大,但是性能很高。
Hash对象
当超过限制后,底层编码会变成HT
源码
void hsetCommand(client *c) {
int i, created = 0;
robj *o;
if ((c->argc % 2) == 1) {
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);
return;
}
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
hashTypeTryConversion(o,c->argv,2,c->argc-1);
for (i = 2; i < c->argc; i += 2)
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
...
}
- hashTypeLookupWriteOrCreate—判断hash的key是否存在,不存在就创建一个新的,默认采用ziplist编码
robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
robj *o = lookupKeyWrite(c->db,key);
if (checkType(c,o,OBJ_HASH)) return NULL;
if (o == NULL) {
o = createHashObject();
dbAdd(c->db,key,o);
}
return o;
}
- createHashObject—创建一个默认hash对象
robj *createHashObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(OBJ_HASH, zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}
- hashTypeTryConversion—处理hash的编码转换
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;
if (o->encoding != OBJ_ENCODING_ZIPLIST) return;
for (i = start; i <= end; i++) {
if (sdsEncodedObject(argv[i]) &&
sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
{
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
}
}
}
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr;
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
fptr = ziplistFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
vptr = ziplistNext(zl, fptr);
serverAssert(vptr != NULL);
update = 1;
zl = ziplistReplace(zl, vptr, (unsigned char*)value,
sdslen(value));
}
}
if (!update) {
zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
ZIPLIST_TAIL);
zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
ZIPLIST_TAIL);
}
o->ptr = zl;
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
...
} else {
serverPanic("Unknown hash encoding");
}
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}
|