redis2.6开始在计算内存使用总量时,排除了如下
- slave的输出缓冲区
- aof缓冲区
- aof重写缓冲区
int freeMemoryIfNeeded(void) {
size_t mem_used, mem_tofree, mem_freed;
int slaves = listLength(server.slaves);
mem_used = zmalloc_used_memory();
if (slaves) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = listNodeValue(ln);
unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
if (obuf_bytes > mem_used)
mem_used = 0;
else
mem_used -= obuf_bytes;
}
}
if (server.aof_state != REDIS_AOF_OFF) {
mem_used -= sdslen(server.aof_buf);
mem_used -= aofRewriteBufferSize();
}
if (mem_used <= server.maxmemory) return REDIS_OK;
if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
return REDIS_ERR;
mem_tofree = mem_used - server.maxmemory;
mem_freed = 0;
while (mem_freed < mem_tofree) {
int j, k, keys_freed = 0;
for (j = 0; j < server.dbnum; j++) {
...
}
if (!keys_freed) return REDIS_ERR;
}
return REDIS_OK;
}
redis3.0.0开始对于lru的算法有所调整 16个元素的淘汰池,每次随机筛选sample个元素到淘汰池中,然后从淘汰池中淘汰最近最近未访问的key
1. 结构增加evictionPoolEntry
#define REDIS_EVICTION_POOL_SIZE 16
struct evictionPoolEntry {
unsigned long long idle;
sds key;
};
typedef struct redisDb {
dict *dict;
dict *expires;
dict *blocking_keys;
dict *ready_keys;
dict *watched_keys;
struct evictionPoolEntry *eviction_pool;
int id;
long long avg_ttl;
} redisDb;
2. 初始化时建立淘汰池
void initServer(void) {
...
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
...
server.db[j].eviction_pool = evictionPoolAlloc();
...
}
...
}
struct evictionPoolEntry *evictionPoolAlloc(void) {
struct evictionPoolEntry *ep;
int j;
ep = zmalloc(sizeof(*ep)*REDIS_EVICTION_POOL_SIZE);
for (j = 0; j < REDIS_EVICTION_POOL_SIZE; j++) {
ep[j].idle = 0;
ep[j].key = NULL;
}
return ep;
}
3. 更新淘汰池
3.1 筛选samples个key
和随机有一点不同,找到一个有数据的槽时,取当前槽下的所有节点,并且移动到下一个节点继续,当连续空的槽个数大于等于5个并且大于需要筛选的个数时,才进行下一次的随机
unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {
unsigned int j;
unsigned int tables;
unsigned int stored = 0, maxsizemask;
unsigned int maxsteps;
if (dictSize(d) < count) count = dictSize(d);
maxsteps = count*10;
for (j = 0; j < count; j++) {
if (dictIsRehashing(d))
_dictRehashStep(d);
else
break;
}
tables = dictIsRehashing(d) ? 2 : 1;
maxsizemask = d->ht[0].sizemask;
if (tables > 1 && maxsizemask < d->ht[1].sizemask)
maxsizemask = d->ht[1].sizemask;
unsigned int i = random() & maxsizemask;
unsigned int emptylen = 0;
while(stored < count && maxsteps--) {
for (j = 0; j < tables; j++) {
if (tables == 2 && j == 0 && i < (unsigned int) d->rehashidx) {
if (i >= d->ht[1].size) i = d->rehashidx;
continue;
}
if (i >= d->ht[j].size) continue;
dictEntry *he = d->ht[j].table[i];
if (he == NULL) {
emptylen++;
if (emptylen >= 5 && emptylen > count) {
i = random() & maxsizemask;
emptylen = 0;
}
} else {
emptylen = 0;
while (he) {
*des = he;
des++;
he = he->next;
stored++;
if (stored == count) return stored;
}
}
}
i = (i+1) & maxsizemask;
}
return stored;
}
比如需要筛选5个元素,并且整个数据如下结构(忽略了hash正在重建的过程)
...
unsigned int i = random() & maxsizemask;
...
比如i=1
- 判断当前槽下是否有元素
当前i=1的槽下有元素,则将当前下所有的节点(在sample个数以内)
- 取key1, key6,key3, stored=3
- 依然不够5个,继续
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 2 无元素 , emptylen=1
- 继续
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 3 无元素, emptylen=2
- 继续
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 4 无元素, emptylen=3
- 继续
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 5 有元素, emptylen=0
- 取 key8, stored=4
- 还不够5个继续
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 6 无元素, emptylen=1
- 继续
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 7 无元素, emptylen=2
- 继续
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 8 无元素, emptylen=3
- 继续
- 位置向下移一个位置
- 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 10 无元素, emptylen=5
- 重写随机位置, i = 4, emptylen=0
i = random() & maxsizemask; - 位置向下移一个位置
i = (i+1) & maxsizemask; - i = 5, 有元素
- 取key8, stored=5, 取够了,退出
最终取到的元素有key1,key6,key3,key8,key8 此函数:
不保证不重复 不保证个数一定是sample个,while循环还有maxsteps控制(count*10) - 比dictGetRandomKey函数更快
3.2 将筛选的key加入淘汰池中
淘汰池中按照lru值的大小升序排序,每次淘汰最后一个元素则可。
#define EVICTION_SAMPLES_ARRAY_SIZE 16
void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
dictEntry **samples;
if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
samples = _samples;
} else {
samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
}
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
sds key;
robj *o;
dictEntry *de;
de = samples[j];
key = dictGetKey(de);
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
idle = estimateObjectIdleTime(o);
k = 0;
while (k < REDIS_EVICTION_POOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[REDIS_EVICTION_POOL_SIZE-1].key != NULL) {
continue;
} else if (k < REDIS_EVICTION_POOL_SIZE && pool[k].key == NULL) {
} else {
if (pool[REDIS_EVICTION_POOL_SIZE-1].key == NULL) {
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));
} else {
k--;
sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
}
}
pool[k].key = sdsdup(key);
pool[k].idle = idle;
}
if (samples != _samples) zfree(samples);
}
筛选出来的key的lru值和当前淘汰池中的lru值有三种关系
-
比最小值还小 不加入淘汰池, 比如key1的lru值为50,比最小的100还小 -
最大的值,并且淘汰池未满(即插入的是一个空位置) 直接插入,比如key6计算的lru为900 -
插入在已有数据的中间位置,并且淘汰池未满 将插入位置后的所有数据往后移动,以腾出插入位置, 比如key3的lru为250 -
插入在已有数据的中间位置,并且淘汰池已经满了 将淘汰池中的第一个元素删除,将插入位置以前的数据向前移动,以腾出插入位置 比如key8的lru值为450,并且此时的16个位置都满了
3.3 从淘汰池中获取一个淘汰元素
从后向前遍历,后面的lru值越大,未访问的时间就越长
for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
de = dictFind(dict,pool[k].key);
sdsfree(pool[k].key);
memmove(pool+k,pool+k+1,
sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));
pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL;
pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0;
if (de) {
bestkey = dictGetKey(de);
break;
} else {
continue;
}
}
在从淘汰池中取一个元素后,将会判断此key是否存在,因为整个淘汰池在整个redis运行过程中都存在,可能其中某些值已经被删除了
3.4 删除淘汰元素
if (bestkey) {
long long delta;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj);
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
dbDelete(db,keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
latencyRemoveNestedEvent(latency,eviction_latency);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
server.stat_evictedkeys++;
notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
keys_freed++;
if (slaves) flushSlavesOutputBuffers();
}
在删除操作中也比原先多了很多工作
- 计算了删除过程中执行的时间
- 增加执行时间监控,如果配置了阈值,并且超过了,则会发送一个事件
- 调用 notifyKeyspaceEvent
- 给slaves发送数据(如果有的话)
|