Redis 核心原理
Redis 数据结构
1. 动态字符串 SDS
我们都知道Redis中保存的key是字符串,value往往是字符串或者字符串的集合。可见字符串是Redis中最常用的一种数据结构
Redis没有直接使用C语言中的字符串,因为C语言字符串中存在很多问题
Redis 构建了一种新的字符串结构,称为 简单动态字符串,简称SDS
? ·
本质是一个结构体,源码如下:
struct __attribute__((__packed__)) sdshdr8 {
uint8_t len;
uint8_t alloc;
unsigned char flags;
char buf[];
};
内存连续
SDS之所以叫做动态字符串,是因为他具备动态扩容能力。
- 如果新字符串小于1M,则新空间为扩容后字符串长度的两倍+1
- 如果新字符串大于1M,则新空间为扩容后字符串长度+1M+1,称为内存预分配
优点:
- 获取字符串长度的时间复杂度为O(1)
- 支持动态扩容
- 减少内存分配次数
- 二进制安全
2. IntSet
IntSet 是 Redis中set 集合的一种实现方式,一句整数数组来实现,并且具备长度可变,有序等特征
typedef struct intset{
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
为了方便查找,Redis 会将intset 中所有的整数按照升序一次保存在contents数组中。
IntSet的升级
当插入的数字超过 encoding 的范围时,会自动升级编码方式到合适的大小
- 升级编码
- 倒序依次将数组汇总的元素拷贝到扩容后的正确位置(倒序扩容)
- 将待添加的元素放入数组末尾
- 最终修改 intset 的头信息
IntSet 插入方法源码
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
if (valenc > intrev32ifbe(is->encoding)) {
return intsetUpgradeAndAdd(is,value);
} else {
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
IntSet 扩容源码:
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0;
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
IntSet 可以看作是特殊的整数数组,具备一些特点:
- Redis 会确保IntSet中的元素唯一,有序
- 具备类型升级机制,可以节省内存空间
- 底层采用二分查找的方法
3. Dict(字典)
我们知道Redis 是一个键值型的数据库,我们可以根据键实现快速的增删改查。而键与值的映射关系是通过 Dict 来实现的
Dict 由三个部分组成,分别是:哈希表、哈希节点、字典
源码:
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
} dictType;
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx;
int16_t pauserehash;
} dict;
当我们向Dirct 添加键值对时,Redis 首先根据 key 计算出 hash 值,然后利用 h & sizemask 来计算元素应该存储到数组中的哪个索引位置。
Dict 的扩容:
Dict 中的 HashTable 就是数组结合单向链表的实现,当集合中元素较多时,必然导致哈希冲突增多,链表过长,则查询效率会大大降低
Dict在每次更新键值对时会检查负载因子,满足以下两种情况时会触发哈希表扩容
- 哈希表的LoadFactory >= 1 ,并且服务器没有执行 bgsave或bgrewriteaof等后台进程
- 哈希表的 LoadFactory > 5
扩容源码:
static int _dictExpandIfNeeded(dict *d)
{
if (dictIsRehashing(d)) return DICT_OK;
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
return dictExpand(d, d->ht[0].used + 1);
}
return DICT_OK;
}
Dict 的收缩:
Dict处理扩容以外,每次删除元素时,也会对负载因子做检查,当LoadFactor < 0.1 时,会做哈希表的收缩
相关源码:
int hashTypeDelete(robj *o, sds field) {
int deleted = 0;
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr;
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
fptr = ziplistFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
zl = ziplistDelete(zl,&fptr);
zl = ziplistDelete(zl,&fptr);
o->ptr = zl;
deleted = 1;
}
}
} else if (o->encoding == OBJ_ENCODING_HT) {
if (dictDelete((dict*)o->ptr, field) == C_OK) {
deleted = 1;
if (htNeedsResize(o->ptr)) dictResize(o->ptr);
}
} else {
serverPanic("Unknown hash encoding");
}
return deleted;
}
int htNeedsResize(dict *dict) {
long long size, used;
size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
int dictResize(dict *d)
{
unsigned long minimal;
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal);
}
扩容或者创建 hash 表的源码:
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n;
unsigned long realsize = _dictNextPower(size);
if (realsize < size || realsize * sizeof(dictEntry*) < realsize)
return DICT_ERR;
if (realsize == d->ht[0].size) return DICT_ERR;
n.size = realsize;
n.sizemask = realsize-1;
if (malloc_failed) {
n.table = ztrycalloc(realsize*sizeof(dictEntry*));
*malloc_failed = n.table == NULL;
if (*malloc_failed)
return DICT_ERR;
} else
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
Dict 中的 rehash:
不管是扩容还是收缩,必定会创建一个新的哈希表,导致哈希表的size 和sizemask 发生变化,而key的查询与 sizemask 有关。因此必须对哈希表中每个key重新计算索引,插入新的哈希表,这个过程称为 rehash。过程如下:
- 计算新的 realeSize,值取决于当前是扩容还是收缩
- 如果是扩容,则新的size 为第一个大于等于 dict.ht[0].used + 1 的 2^n
- 如果是收缩,这为第一个大于等于 dict.ht[0].used 的 2^n (不小于4)
- 按照新的 realeSize 来分配内存空间,创建dictht,并赋值给 dict.ht[1]
- 表示 dict.rehashidx = 0 表示 rehash已经开始了
- 每次执行增删改查操作时,都检查一下 rehashidx 是否大于1,如果是则将dict.ht[0].table[rehashidx] 中的entry 链表 rehash到 dict.ht[1] 中去,并 rehashidx ++ 。直到完成所有迁移
- 将dict.ht[1] 赋值给 dict.ht[0],将dict.ht[1] 初始化为空的哈希表释放原来的内存
- rehashidx = -1时表示rehash 完成
- 在rehash 过程中,新增操作直接到 ht[1] 中去,查询、修改、删除会依次到两个哈希表中去查询。
4. Ziplist
ZipList 是 一种特殊的“双端链表”,由一系列特殊编码的连接内存块组成。可以在任意一端任意进行压入/弹出操作,并且该操作的时间为O(1)
ZipListEntry 定义:
typedef struct zlentry {
unsigned int prevrawlensize;
unsigned int prevrawlen;
unsigned int lensize;
unsigned int len;
unsigned int headersize;
unsigned char encoding;
unsigned char *p;
} zlentry;
Encoding编码:
ZiplistEntry 中的 encoding 编码分为字符串和整数两种
- 字符串:如果 encoding 是以 “00”、“01”、“10” 开头,则证明 content 是字符串
- 如果encoding 是以 11 开始,则证明content 是整数,而且encoding 固定只占用一个字节
ZipList 的连锁更新问题:
ZipList 的每个Entry 都 包含previouts_entry_length 来记录上一个节点的大小,长度是1个或5个字节
- 如果前一个字节长度小于 254 字节,则采用一个字节来保存这个长度值
- 如果前一个字节长度大于 254 字节,则采用五个字节来保存。
新增和删除都有可能发送连锁更新。
ZipList 特征:
- 压缩列表的可以看作一种连续内存空间的双向链表
- 列表的接地那之间不是通过指针连接,而是记录上一节点和本节点长度来寻址,占用内存较低
- 如果列表数据过多,导致链表过长,可能影响查询性能
- 增或删较大数据时可能发生连续更新问题
5. QuickList
ZipList 存在的问题:
问题1:ZipList 虽然节省内容,但是申请内存必须是连续空间,如果内存占用较多,申请内存效率很低,怎么办?
- 为了解决这个问题,我们必须限制 ZipList 的长度和 Entry 的大小
问题2:但是我们要存储大量数据,超出ZipList 最佳的上限怎么办?
问题3:数据拆分后比较分散,不便于管理,多个 ZipList 如何建立联系
- QuickList 数据结构,双向链表,链表中每一个节点都是 ZipList
list-max-ziplist-size 配置
QuickList 结构:
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count;
unsigned long len;
int fill : QL_FILL_BITS;
unsigned int compress : QL_COMP_BITS;
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
QuickListNode 结构:
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz;
unsigned int count : 16;
unsigned int encoding : 2;
unsigned int container : 2;
unsigned int recompress : 1;
unsigned int attempted_compress : 1;
unsigned int extra : 10;
} quicklistNode;
QuickList 特点:
- 是一个节点为 ZipList 的双端链表
- 节点采用ZipList,解决了传统链表的内存占用问题
- 控制了ZipList大小,解决联系内存空间申请效率问题
- 中间节点能压缩,进一步节省内存
6. SkipList
SkipList(跳表)首先是链表,单与传统链表相比有几点差异
- 元素按照升序排列存储
- 节点可能包含多个指针,指针跨度不同
zskiplist结构:
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
zskiplistNode 结构:
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
特点:
-
跳跃表是一个双向链表,每个节点都包含score和ele值 -
节点按照score值排序,score值一样则按照ele字典排序 -
每个节点都可以包含多层指针,层数是1到32之间的随机数 -
不同层指针到下一个节点的跨度不同,层级越高,跨度越大 -
增删改查效率与红黑树基本一致,实现却更简单
7. RedisObject
Redis中的任意数据类型的键和值都会被封装为一个RedisObject,也叫做Redis对象,源码如下:
Redis的编码方式:
Redis中会根据存储的数据类型不同,选择不同的编码方式,共包含11种不同类型:
编号 | 编码方式 | 说明 |
---|
0 | OBJ_ENCODING_RAW | raw编码动态字符串 | 1 | OBJ_ENCODING_INT | long类型的整数的字符串 | 2 | OBJ_ENCODING_HT | hash表(字典dict) | 3 | OBJ_ENCODING_ZIPMAP | 已废弃 | 4 | OBJ_ENCODING_LINKEDLIST | 双端链表 | 5 | OBJ_ENCODING_ZIPLIST | 压缩列表 | 6 | OBJ_ENCODING_INTSET | 整数集合 | 7 | OBJ_ENCODING_SKIPLIST | 跳表 | 8 | OBJ_ENCODING_EMBSTR | embstr的动态字符串 | 9 | OBJ_ENCODING_QUICKLIST | 快速列表 | 10 | OBJ_ENCODING_STREAM | Stream流 |
五种数据结构:
Redis中会根据存储的数据类型不同,选择不同的编码方式。每种数据类型的使用的编码方式如下:
数据类型 | 编码方式 |
---|
OBJ_STRING | int、embstr、raw | OBJ_LIST | LinkedList和ZipList(3.2以前)、QuickList(3.2以后) | OBJ_SET | intset、HT | OBJ_ZSET | ZipList、HT、SkipList | OBJ_HASH | ZipList、HT |
五种数据类型
1. String
String 是 Redis 中最常见的数据存储类型
-
其基本编码方式是 RAW,基于简单动态字符串(SDS)实现,存储上限为 512mb -
如果存储的SDS长度小于44(44 字节时分配空间刚好为 64 字节复合alloc 算法)字节,则会采用EMBSTR编码,此时object head与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。 -
如果存储的字符串是整数值,并且大小在LONG_MAX范围内,则会采用INT编码:直接将数据保存在RedisObject的ptr指针位置(刚好8字节),不再需要SDS了。
总结:
2. List
Redis的List类型可以从首、尾操作列表中的元素:
-
LinkedList :普通链表,可以从双端访问,内存占用较高,内存碎片较多 -
ZipList :压缩列表,可以从双端访问,内存占用低,存储上限低 -
QuickList:LinkedList + ZipList,可以从双端访问,内存占用较低,包含多个ZipList,存储上限高
Redis的List结构类似一个双端链表,可以从首、尾操作列表中的元素:
- 在3.2版本之前,Redis采用ZipList和LinkedList来实现List,当元素数量小于512并且元素大小小于64字节时采用ZipList编码,超过则采用LinkedList编码。
- 在3.2版本之后,Redis统一采用QuickList来实现List:
void pushGenericCommand(client *c, int where, int xx) {
int j;
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
if (checkType(c,lobj,OBJ_LIST)) return;
if (!lobj) {
if (xx) {
addReply(c, shared.czero);
return;
}
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}
}
robj *createQuicklistObject(void) {
quicklist *l = quicklistCreate();
robj *o = createObject(OBJ_LIST,l);
o->encoding = OBJ_ENCODING_QUICKLIST;
return o;
}
3. Set
Set 是 Redis 中的单列集合,满足一下特点:
- 不保证有序性
- 保证元素唯一(可以判断元素是否存在)
- 求交集、并集、差集
创建 Set 源码:
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
}
robj *createIntsetObject(void) {
intset *is = intsetNew();
robj *o = createObject(OBJ_SET,is);
o->encoding = OBJ_ENCODING_INTSET;
return o;
}
robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL);
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT;
return o;
}
添加数据源码:
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
size_t max_entries = server.set_max_intset_entries;
if (max_entries >= 1<<30) max_entries = 1<<30;
if (intsetLen(subject->ptr) > max_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
setTypeConvert(subject,OBJ_ENCODING_HT);
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
流程:
4. ZSet
ZSet 也就是 SortedSet ,其中每一个元素都需要指定一个score 值和 member 值
- 可以根据 score 值排序
- member 必须唯一,后添加的 member 会覆盖前面的 score 值
- 可以根据 member 查询分数
因此,zset底层数据结构必须满足键值存储、键必须唯一、可排序这几个需求。之前学习的哪种编码结构可以满足?
源码:
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}
缺点很明显:占用空间大,于是:
当元素数量不多时,HT和SkipList的优势不明显,而且更耗内存。因此zset还会采用ZipList结构来节省内存,不过需要同时满足两个条件:
-
元素数量小于zset_max_ziplist_entries,默认值128 -
每个元素都小于zset_max_ziplist_value字节,默认值64
源码:
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
return 1;
} else if (!xx) {
if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries
|| sdslen(ele) > server.zset_max_ziplist_value
|| !ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
{
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
} else {
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (newscore) *newscore = score;
*out_flags |= ZADD_OUT_ADDED;
return 1;
}
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
}
if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
} else {
serverPanic("Unknown sorted set encoding");
}
return 0;
}
5. Hash
Hash结构与Redis中的Zset非常类似:
区别如下:
因此,Hash底层采用的编码与Zset也基本一致,只需要把排序有关的SkipList去掉即可:
Redis 网络模型
用户空间和内核空间
任何Linux发行版,其系统内核都是Linux。我们的应用都需要通过Linux内核与硬件交互。为了避免用户应用导致冲突甚至内核崩溃,用户应用与内核是分离的:
Linux系统为了提高IO效率,会在用户空间和内核空间都加入缓冲区:
阻塞IO
顾名思义,阻塞IO就是两个阶段都必须阻塞等待:
阶段一:
-
用户进程尝试读取数据(比如网卡数据) -
此时数据尚未到达,内核需要等待数据 -
此时用户进程也处于阻塞状态
阶段二:
-
数据到达并拷贝到内核缓冲区,代表已就绪 -
将内核数据拷贝到用户缓冲区 -
拷贝过程中,用户进程依然阻塞等待 -
拷贝完成,用户进程解除阻塞,处理数据
可以看到,阻塞IO模型中,用户进程在两个阶段都是阻塞状态。
非阻塞IO
顾名思义,非阻塞IO的recvfrom操作会立即返回结果而不是阻塞用户进程。
阶段一:
-
用户进程尝试读取数据(比如网卡数据) -
此时数据尚未到达,内核需要等待数据 -
返回异常给用户进程 -
用户进程拿到error后,再次尝试读取 -
循环往复,直到数据就绪
阶段二:
-
将内核数据拷贝到用户缓冲区 -
拷贝过程中,用户进程依然阻塞等待 -
拷贝完成,用户进程解除阻塞,处理数据
可以看到,非阻塞IO模型中,用户进程在第一个阶段是非阻塞,第二个阶段是阻塞状态。虽然是非阻塞,但性能并没有得到提高。而且忙等机制会导致CPU空转,CPU使用率暴增。
IO多路复用
无论是阻塞IO还是非阻塞IO,用户应用在一阶段都需要调用recvfrom来获取数据,差别在于无数据时的处理方案:
而在单线程情况下,只能依次处理IO事件,如果正在处理的IO事件恰好未就绪(数据不可读或不可写),线程就会被阻塞,所有IO事件都必须等待,性能自然会很差。
就比如服务员给顾客点餐,分两步:
-
顾客思考要吃什么(等待数据就绪) -
顾客想好了,开始点餐(读取数据)
要提高效率有几种办法?
- 方案一:增加更多服务员(多线程)
- 方案二:不排队,谁想好了吃什么(数据就绪了),服务员就给谁点餐(用户应用就去读取数据)
那么问题来了:用户进程如何知道内核中数据是否就绪呢?
文件描述符(File Descriptor):简称FD,是一个从0 开始的无符号整数,用来关联Linux中的一个文件。在Linux中,一切皆文件,例如常规文件、视频、硬件设备等,当然也包括网络套接字(Socket)。
IO多路复用:是利用单个线程来同时监听多个FD,并在某个FD可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源。
阶段一:
-
用户进程调用select,指定要监听的FD集合 -
内核监听FD对应的多个socket -
任意一个或多个socket数据就绪则返回readable -
此过程中用户进程阻塞
阶段二:
-
用户进程找到就绪的socket -
依次调用recvfrom读取数据 -
内核将数据拷贝到用户空间 -
用户进程处理数据
IO多路复用是利用单个线程来同时监听多个FD,并在某个FD可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源。不过监听FD的方式、通知的方式又有多种实现,常见的有:
差异:
IO多路复用 - select
typedef long int __fd_mask;
typedef struct {
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
} fd_set;
int select(
int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout
);
按照比特位存储要监听的 fd
select模式存在的问题:
-
需要将整个fd_set从用户空间拷贝到内核空间,select结束还要再次拷贝回用户空间 -
select无法得知具体是哪个fd就绪,需要遍历整个fd_set -
fd_set监听的fd数量不能超过1024
IO多路复用 - poll
#define POLLIN
#define POLLOUT
#define POLLERR
#define POLLNVAL
struct pollfd {
int fd;
short int events;
short int revents;
};
int poll(
struct pollfd *fds,
nfds_t nfds,
int timeout
);
poll模式对select模式做了简单改进,但性能提升不明显,部分关键代码如下:
IO流程:
-
创建pollfd数组,向其中添加关注的fd信息,数组大小自定义 -
调用poll函数,将pollfd数组拷贝到内核空间,转链表存储,无上限 -
内核遍历fd,判断是否就绪 -
数据就绪或超时后,拷贝pollfd数组到用户空间,返回就绪fd数量n -
用户进程判断n是否大于0 -
大于0则遍历pollfd数组,找到就绪的fd
与select对比:
IO多路复用 - epoll
epoll模式是对select和poll的改进,它提供了三个函数:
struct eventpoll {
struct rb_root rbr;
struct list_head rdlist;
};
int epoll_create(int size);
int epoll_ctl(
int epfd,
int op,
int fd,
struct epoll_event *event
);
int epoll_wait(
int epfd,
struct epoll_event *events,
int maxevents,
int timeout
);
总结:
select模式存在的三个问题:
-
能监听的FD最大不超过1024 -
每次select都需要把所有要监听的FD都拷贝到内核空间 -
每次都要遍历所有FD来判断就绪状态
poll模式的问题:
- poll利用链表解决了select中监听FD上限的问题,但依然要遍历所有FD,如果监听较多,性能会下降
epoll模式中如何解决这些问题的?
-
基于epoll实例中的红黑树保存要监听的FD,理论上无上限,而且增删改查效率都非常高 -
每个FD只需要执行一次epoll_ctl添加到红黑树,以后每次epol_wait无需传递任何参数,无需重复拷贝FD到内核空间 -
利用ep_poll_callback机制来监听FD状态,无需遍历所有FD,因此性能不会随监听的FD数量增多而下降
事件通知机制:
当FD有数据可读时,我们调用epoll_wait(或者select、poll)可以得到通知。但是事件通知的模式有两种:
举个栗子:
①假设一个客户端socket对应的FD已经注册到了epoll实例中
②客户端socket发送了2kb的数据
③服务端调用epoll_wait,得到通知说FD就绪
④服务端从FD读取了1kb数据
⑤回到步骤3(再次调用epoll_wait,形成循环)
结果:
结论:
select 和 poll 仅支持LT模式,epoll可以自由选择LT和ET两种模式
IO多路复用-web服务流程
信号驱动 IO
信号驱动IO是与内核建立SIGIO的信号关联并设置回调,当内核有FD就绪时,会发出SIGIO信号通知用户,期间用户应用可以执行其它业务,无需阻塞等待。
阶段一:
①用户进程调用sigaction,注册信号处理函数
②内核返回成功,开始监听FD
③用户进程不阻塞等待,可以执行其它业务
④当内核数据就绪后,回调用户进程的SIGIO处理函数
阶段二:
①收到SIGIO回调信号
②调用recvfrom,读取
③内核将数据拷贝到用户空间
④用户进程处理数据
当有大量IO操作时,信号较多,SIGIO处理函数不能及时处理可能导致信号队列溢出,而且内核空间与用户空间的频繁信号交互性能也较低。
异步IO
异步IO的整个过程都是非阻塞的,用户进程调用完异步API后就可以去做其它事情,内核等待数据就绪并拷贝到用户空间后才会递交信号,通知用户进程。
阶段一:
①用户进程调用aio_read,创建信号回调函数
②内核等待数据就绪
③用户进程无需阻塞,可以做任何事情
阶段二:
①内核数据就绪
②内核数据拷贝到用户缓冲区
③拷贝完成,内核递交信号触发aio_read中的回调函数
④用户进程处理数据
可以看到,异步IO模型中,用户进程在两个阶段都是非阻塞状态
还是存在问题的,在并发量过高的情况下,用户请求通知内核完成数据拷贝,内核积累的IO读写的任务会越来越多,这样很有可能导致系统崩溃的情况
同步和异步
IO操作是同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作),也就是阶段二是同步还是异步:
Redis 网络模型
Redis 到底是单线程还是多线程?
- 如果仅仅是Redis的核心业务部分(命令处理),答案是单线程
- 如果聊的是整个Redis,那么答案就是多线程
在Redis版本迭代过程中,在两个重要的时间节点上引入了多线程的支持
- Redis 4.0:引入多线程异步处理一些耗时较长的任务,例如异步删除命令 unlink
- Redis 6.0:在核心网络模型中引入 多线程,进一步提高对于多核CPU的利用率
为什么Redis 要选择单线程?
- 抛开持久化不谈,Redis是纯内存操作,执行速度非常快,它的性能瓶颈是网络延迟而不是执行速度,因此多线程并不会带来巨大的性能提升。
- 多线程会导致过多的上下文切换,带来不必要的开销
- 引入多线程会面临线程安全问题,必然要引入线程锁这样的安全手段,实现复杂度增高,而且性能也会大打折扣
Redis通过IO多路复用来提高网络性能,并且支持各种不同的多路复用实现,并且将这些实现进行封装, 提供了统一的高性能事件库API库 AE:
来看下Redis单线程网络模型的整个流程:
readQueryFromClient 源码:
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
long int qblen = sdslen(c->querybuf);
connRead(c->conn, c->querybuf+qblen, readlen);
processInputBuffer(c);
processCommand(c);
}
int processCommand(client *c) {
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
c->cmd->proc(c);
addReply(c,shared.pong);
}
void addReply(client *c, robj *obj) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
listAddNodeHead(server.clients_pending_write,c);
}
Redis 6.0版本中引入了多线程,目的是为了提高IO读写效率。因此在解析客户端命令、写响应结果时采用了多线程。核心的命令执行、IO多路复用模块依然是由主线程执行。
Redis 通信协议
RESP 协议
Redis是一个CS架构的软件,通信一般分两步(不包括pipeline和PubSub):
-
客户端(client)向服务端(server)发送一条命令 -
服务端解析并执行命令,返回响应结果给客户端
因此客户端发送命令的格式、服务端响应结果的格式必须有一个规范,这个规范就是通信协议。
而在Redis中采用的是RESP(Redis Serialization Protocol)协议:
数据类型:
在RESP中,通过首字节的字符来区分不同数据类型,常用的数据类型包括5种:
-
单行字符串:首字节是 ‘+’ ,后面跟上单行字符串,以CRLF( “\r\n” )结尾。例如返回"OK": “+OK\r\n” -
错误(Errors):首字节是 ‘-’ ,与单行字符串格式一样,只是字符串是异常信息,例如:“-Error message\r\n” -
数值:首字节是 ‘:’ ,后面跟上数字格式的字符串,以CRLF结尾。例如:“:10\r\n” -
多行字符串:首字节是 ‘$’ ,表示二进制安全的字符串,最大支持512MB:
-
数组:首字节是 ‘*****’,后面跟上数组元素个数,再跟上元素,元素数据类型不限:
模拟Redis 客户端
public class RedisDemo {
static Socket s; static PrintWriter writer; static BufferedReader reader;
public static void main(String[] args) throws IOException {
String host = "192.168.150.101";
int port = 6379;
s = new Socket(host, port);
reader = new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
writer =new PrintWriter(s.getOutputStream());
sendRequest();
Object obj = handleResponse();
System.out.println(obj);
if (reader != null) reader.close();
if (writer != null) writer.close();
if (s != null) s.close();
}
private static Object handleResponse() {
try {
char prefix = (char) reader.read();
switch (prefix) {
case '+':
return reader.readLine();
case '-':
throw new RuntimeException(reader.readLine());
case ':':
return Integer.valueOf(reader.readLine());
case '$':
int length = Integer.parseInt(reader.readLine());
if(length == 0 || length == -1) return "";
return reader.readLine();
case '*':
return readBulkString();
default:
return null;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static void sendRequest() {
writer.println("*3");
writer.println("$3");
writer.println("set");
writer.println("$4");
writer.println("name");
writer.println("$6");
writer.println("虎哥");
writer.flush();
}
private static List<Object> readBulkString() throws IOException {
int size = Integer.parseInt(reader.readLine());
if(size == 0 || size == -1){
return null;
}
List<Object> rs = new ArrayList<>(size);
for (int i = size; i > 0; i--) {
try {
rs.add(handleResponse());
} catch (Exception e) {
rs.add(e);
}
}
return rs;
}
}
Redis内存回收
Redis之所以性能强,最主要的原因就是基于内存存储。然而单节点的Redis其内存大小不宜过大,会影响持久化或主从同步性能。
我们可以通过修改配置文件来设置Redis的最大内存:
当内存使用达到上限时,就无法存储更多数据了。为了解决这个问题,Redis提供了一些策略实现内存回收:
内存过期策略:
这里有两个问题需要我们思考:
-
Redis是如何知道一个key是否过期呢? 可以利用亮哥哥Dict 分别记录 key-value 对以及 key-ttl 对 Redis本身是一个典型的key-value内存存储数据库,因此所有的key、value都保存在之前学习过的Dict结构中。不过在其database结构体中,有两个Dict:一个用来记录key-value;另一个用来记录key-TTL。 -
是不是TTL 到起后就立刻删除? 答:惰性删除、周期删除 惰性删除:顾明思议并不是在TTL到期后就立刻删除,而是在访问一个key的时候,检查该key的存活时间,如果已经过期才执行删除。 周期删除:顾明思议是通过一个定时任务,周期性的抽样部分过期的key,然后执行删除。执行周期有两种:
SLOW模式规则: ①执行频率受server.hz影响,默认为10,即每秒执行10次,每个执行周期100ms。 ②执行清理耗时不超过一次执行周期的25%.默认slow模式耗时不超过25ms ③逐个遍历db,逐个遍历db中的bucket,抽取20个key判断是否过期 ④如果没达到时间上限(25ms)并且过期key比例大于10%,再进行一次抽样,否则结束 FAST模式规则(过期key比例小于10%不执行 ): ①执行频率受beforeSleep()调用频率影响,但两次FAST模式间隔不低于2ms ②执行清理耗时不超过1ms ③逐个遍历db,逐个遍历db中的bucket,抽取20个key判断是否过期 ④如果没达到时间上限(1ms)并且过期key比例大于10%,再进行一次抽样,否则结束
内存淘汰策略:
内存淘汰:就是当Redis内存使用达到设置的上限时,主动挑选部分****key删除以释放更多内存的流程。Redis会在处理客户端命令的方法processCommand()中尝试做内存淘汰:
int processCommand(client *c) {
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = (performEvictions() == EVICT_FAIL);
if (out_of_memory && reject_cmd_on_oom) {
rejectCommand(c, shared.oomerr);
return C_OK;
}
}
}
Redis支持8种不同策略来选择要删除的key:
-
noeviction: 不淘汰任何key,但是内存满时不允许写入新数据,默认就是这种策略。 -
volatile-ttl: 对设置了TTL的key,比较key的剩余TTL值,TTL越小越先被淘汰 -
allkeys-random:对全体key ,随机进行淘汰。也就是直接从db->dict中随机挑选 -
volatile-random:对设置了TTL的key ,随机进行淘汰。也就是从db->expires中随机挑选。 -
allkeys-lru: 对全体key,基于LRU算法进行淘汰 -
volatile-lru: 对设置了TTL的key,基于LRU算法进行淘汰 -
allkeys-lfu: 对全体key,基于LFU算法进行淘汰 -
volatile-lfu: 对设置了TTL的key,基于LFI算法进行淘汰
比较容易混淆的有两个:
淘汰策略流程:
|