一、简介
rdb是redis持久化的一种方式,存储的是redis服务器在某个时刻的所有内存数据。在redis启动时可以直接解析加载到内存以恢复数据。 触发时机:
- save命令
- bgsave命令
- serverCron定时任务,满足持久化条件时
- 主从全量复制落盘方式
- aof重写(aof和rdb混合使用方式)
以下以redis6.2.6代码为例 整个rdb文件分为三部分,头部,数据部分,结尾。 本例操作:
127.0.0.1:6379> hset student name zhangsan age 18 class 1-1
(integer) 3
127.0.0.1:6379> lpush mylist c c++ python go java
(integer) 5
127.0.0.1:6379> set name lisi ex 30
OK
127.0.0.1:6379> save
OK
127.0.0.1:6379>
二、头
主要是一些版本信息,魔术信息。
2.1 魔术值
#define RDB_VERSION 9
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
2.2 版本信息
#define REDIS_VERSION "6.2.6"
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
而对于数据存储格式为如下图,len:value, 而len又经过了变长的编码处理,进一步节省空间。
头部数据都是一些附属信息,所以类型为RDB_OPCODE_AUX
#define RDB_OPCODE_AUX 250
ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
ssize_t ret, len = 0;
if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;
len += ret;
return len;
}
2.3 操作系统位数
表示操作系统的位数,32位还是64位的操作系统。
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
...
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
对于整数值,先将整数值,转换位字符串形式,比如一个字节表示的64 转换为两个字节的字符串"64",这样就都是统一的调用rdbSaveAuxField函数进行处理。
#define LONG_STR_SIZE 21
ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
char buf[LONG_STR_SIZE];
int vlen = ll2string(buf,sizeof(buf),val);
return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
}
而为了减少存储的数据空间,当value是数值型的字符串时,将尝试将其进行整数编码,如果编码成功,则直接写入数字,从而节省存储空间。当字符串长度小于11字节时,尝试进行整数编码。 为啥是11字节呢? 因为最长整数编码是使用4字节,4字节表示范围[-2147483648,2147483647],转换为字符串加上负号就刚好11字节。
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
int enclen;
ssize_t n, nwritten = 0;
if (len <= 11) {
unsigned char buf[5];
if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
return enclen;
}
}
...
}
从下图可以看出,数值编码都是11开头的,而长度编码都是10,01,00开头的。
2.4 rdb生成时间戳
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
通过上面的验证,数值经过整数编码后确实是小端序。
2.5 使用的总内存
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
size_t zmalloc_used_memory(void) {
size_t um;
atomicGet(used_memory,um);
return um;
}
其中内存的单位是字节
2.6 可选部分
当前redis就是一个单独的master,没有主从,所以这些相关信息也没有。
if (rsi) {
if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
== -1) return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
== -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
== -1) return -1;
}
2.7 aof混用标志
结合rdb和aof的优点,可以将他们合并到一个文件里,可以通过配置aof-use-rdb-preamble yes 开启(此版本默认开启),则持久化文件中前半部分为rdb内容,后半部分为aof内存。 此标志还需要结合具体操作进行区分,因为我这里执行的是save生成的rdb文件,所以整个文件中只有rdb内容,没有混合aof,所以这里aof-preamble为0。
int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
...
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
2.8 module信息
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
遍历整个modules,从中查找需要存储的对象
ssize_t rdbSaveModulesAux(rio *rdb, int when) {
size_t total_written = 0;
dictIterator *di = dictGetIterator(modules);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
struct RedisModule *module = dictGetVal(de);
listIter li;
listNode *ln;
listRewind(module->types,&li);
while((ln = listNext(&li))) {
moduleType *mt = ln->value;
if (!mt->aux_save || !(mt->aux_save_triggers & when))
continue;
ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
if (ret==-1) {
dictReleaseIterator(di);
return -1;
}
total_written += ret;
}
}
dictReleaseIterator(di);
return total_written;
}
调用module中具体的自己的save函数进行保存。
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
RedisModuleIO io;
int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
if (retval == -1) return -1;
moduleInitIOContext(io,mt,rdb,NULL);
io.bytes += retval;
retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
io.bytes += retval;
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_UINT);
if (retval == -1) return -1;
io.bytes += retval;
retval = rdbSaveLen(rdb,when);
if (retval == -1) return -1;
io.bytes += retval;
mt->aux_save(&io,when);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1)
io.error = 1;
else
io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
if (io.error)
return -1;
return io.bytes;
}
三、身体
逐一遍历所有的数据库,获取所有的数据,然后根据不同类型的值进行编码,写入文件中。
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
...
}
3.1 选择数据库
先选择数据库,后续的数据则能知道具体是写入哪个数据库的。
#define RDB_OPCODE_SELECTDB 254
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
3.2 字典中key的总个数
因为有了总个数,加载时,就能提前为hash分配好适合的大小,而不用频繁的rehash。 其中存储了正常字典的key个数,以及过期字典的key个数。
#define RDB_OPCODE_RESIZEDB 251
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
3.3 迭代字典中的每一项
di = dictGetSafeIterator(d);
...
while((de = dictNext(di)) != NULL) {
...
}
dictReleaseIterator(di);
di = NULL;
3.3.1 获取key,value,expire
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
3.3.2 保存键值对
将获取到的key-value和过期时间写入文件。
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
3.3.2.1 保存过期时间
如果当前key没有设置过期时间,则跳过。
#define RDB_OPCODE_EXPIRETIME_MS 252
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
存储的是小端序的过期时间点的时间戳。
int rdbSaveMillisecondTime(rio *rdb, long long t) {
int64_t t64 = (int64_t) t;
memrev64ifbe(&t64);
return rdbWriteRaw(rdb,&t64,8);
}
3.3.2.2 保存lru信息
如果配置了淘汰策略使用lru策略,则将lru时间戳保存,这个时间戳是uint64_t类型,直接通过整数编码方式进行存储,这样节省空间。
#define RDB_OPCODE_IDLE 248
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
...
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000;
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
3.3.2.3 保存lfu信息
对于配置了lfu的淘汰策略,只是保存了一个字节的计数。
#define RDB_OPCODE_FREQ 249
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
...
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
3.3.2.4 保存键值对
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
根据值对象的不同类型以及不同的编码方式,设置对应的类型
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {
case OBJ_STRING:
return rdbSaveType(rdb,RDB_TYPE_STRING);
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_SET);
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
serverPanic("Unknown object type");
}
return -1;
}
3.3.2.4.1 String类型的值
#define RDB_TYPE_STRING 0
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
nwritten += n;
}
...
}
3.3.2.4.2 List类型的值
#define RDB_TYPE_LIST_QUICKLIST 14
可以看出list类型的编码方式只能是quicklist方式编码,而quicklist是一个以ziplist为节点的双向链表。 其中ziplist节点可以压缩或者不压缩。 其中ziplsit可参考redis之ziplist
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
quicklistNode *node = ql->head;
if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
nwritten += n;
while(node) {
if (quicklistNodeIsCompressed(node)) {
void *data;
size_t compress_len = quicklistGetLzf(node, &data);
if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
nwritten += n;
} else {
if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
nwritten += n;
}
node = node->next;
}
} else {
serverPanic("Unknown list encoding");
}
- 压缩节点
对于压缩节点,存储了一个字节的标识,然后存储了压缩后的长度以及压缩前的长度,最后是压缩后的数据。
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
size_t original_len) {
unsigned char byte;
ssize_t n, nwritten = 0;
byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF;
if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;
nwritten += n;
if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr;
nwritten += n;
if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr;
nwritten += n;
if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr;
nwritten += n;
return nwritten;
writeerr:
return -1;
}
- 非压缩节点
ziplist本身就是一个连续的空间,所以当作一个字节字符串来存储。
3.3.2.4.3 Set类型的值
对于set类型的数据,其中编码方式有两种,hash和intset。 其中hash可参考redis之hash intset可参考redis之intset
因为set类型,如果使用hash编码,只保存了key,没有保存value。
else if (o->type == OBJ_SET) {
if (o->encoding == OBJ_ENCODING_HT) {
dict *set = o->ptr;
dictIterator *di = dictGetIterator(set);
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
dictReleaseIterator(di);
return -1;
}
nwritten += n;
while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
== -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
size_t l = intsetBlobLen((intset*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else {
serverPanic("Unknown set encoding");
}
}
而对于intset对象,本身就是一个连续的整数数组,所以元素非常少时,才能进行整数编码存储,否则就是直接的intset对象数据。
3.3.2.4.4 Zset类型的值
而对于有序集合,编码方式有ziplist和skiplist。对于压缩列表上面已经涉及到了,这里就不再说,主要看skiplist如何存储的。skiplist中保存了key,以及key的一个double的score值。
else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
zskiplist *zsl = zs->zsl;
if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
nwritten += n;
zskiplistNode *zn = zsl->tail;
while (zn != NULL) {
if ((n = rdbSaveRawString(rdb,
(unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
{
return -1;
}
nwritten += n;
if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
return -1;
nwritten += n;
zn = zn->backward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}
score存储为小端序,8字节或者更大。
int rdbSaveBinaryDoubleValue(rio *rdb, double val) {
memrev64ifbe(&val);
return rdbWriteRaw(rdb,&val,sizeof(val));
}
3.3.2.4.5 Hash类型的值
hash类型有两种存储方式,一种是ziplist, 另一种为真正的hash结构。而ziplist结构上面已经涉及到,这里就不再赘述。
#define RDB_TYPE_HASH 4
...
#define RDB_TYPE_HASH_ZIPLIST 13
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
}
else if (o->type == OBJ_HASH) {
...
else if (o->encoding == OBJ_ENCODING_HT) {
dictIterator *di = dictGetIterator(o->ptr);
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
dictReleaseIterator(di);
return -1;
}
nwritten += n;
while((de = dictNext(di)) != NULL) {
sds field = dictGetKey(de);
sds value = dictGetVal(de);
if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
sdslen(field))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
sdslen(value))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown hash encoding");
}
key和value都有可能是数字类型,能够编码成整数,所以存储后的数据各种类型的都有。
3.3.2.4.6 Stream类型的值
stream类型使用的rax树进行组织存储的,具体的rax结构可参考redis之radix tree 而实际数据是通过listpack进行存储的,listpack可参考redis之listpack
else if (o->type == OBJ_STREAM) {
stream *s = o->ptr;
rax *rax = s->rax;
if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
nwritten += n;
raxIterator ri;
raxStart(&ri,rax);
raxSeek(&ri,"^",NULL,0);
while (raxNext(&ri)) {
unsigned char *lp = ri.data;
size_t lp_bytes = lpBytes(lp);
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
}
raxStop(&ri);
if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
nwritten += n;
size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
nwritten += n;
if (num_cgroups) {
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
}
raxStop(&ri);
}
}
3.3.2.4.6 Module类型的值
else if (o->type == OBJ_MODULE) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
int retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
moduleInitIOContext(io,mt,rdb,key);
io.bytes += retval;
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1)
io.error = 1;
else
io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
return io.error ? -1 : (ssize_t)io.bytes;
}
对于module,需要moule内部自己的结构自己进行序列化到文件。
四、尾巴
8字节的校验和。在每次写入文件时,都在同步计算校验和。
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
在write前,会判断是否设置了update_cksum回调函数,设置了,则进行计算校验和。
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
while (len) {
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
if (r->write(r,buf,bytes_to_write) == 0) {
r->flags |= RIO_FLAG_WRITE_ERROR;
return 0;
}
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
r->processed_bytes += bytes_to_write;
}
return 1;
}
rio句柄初始化时是不计算校验和的
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
rioFileFlush,
NULL,
0,
0,
0,
0,
{ { NULL, 0 } }
};
void rioInitWithFile(rio *r, FILE *fp) {
*r = rioFileIO;
r->io.file.fp = fp;
r->io.file.buffered = 0;
r->io.file.autosync = 0;
}
只有在redis.conf中rdbchecksum yes 配置开启时,才会设置相应的回调函数进行计算校验和。
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
r->cksum = crc64(r->cksum,buf,len);
}
而redis.conf中默认配置是开的。从注释中可知,redis5就有了crc64校验了,但是在生成或者加载rdb文件时会损失10%性能,如果追求最大性能可以禁用计算校验和。
rdbchecksum yes
如果没有配置,则默认是开启的。
standardConfig configs[] = {
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
...
}
五、压缩
对于很大的数据,可以通过压缩的方式减少数据的大小,redis可以通过配置rdbcompression yes 使用lzf压缩算法进行压缩数据。并且默认是开启的。
配置文件中默认是开启的
rdbcompression yes
如果配置文件中未配置,代码中也是默认开启的。
standardConfig configs[] = {
...
createBoolConfig("rdbcompression", NULL, MODIFIABLE_CONFIG, server.rdb_compression, 1, NULL, NULL),
...
};
5.1 为什么我的rdb磁盘数据没有被压缩呢?
使用压缩算法进行压缩数据时,如果数据长度太短,压缩后的数据可能会比原始数据还长,这样达不到压缩的效果,还白白浪费了CPU的计算能力,所以需要有一个临界点,只有长度大于临界点时才进行压缩。而这个临界点长度就是20字节。
if (server.rdb_compression && len > 20) {
n = rdbSaveLzfStringObject(rdb,s,len);
if (n == -1) return -1;
if (n > 0) return n;
}
|