1. Memcached Overview
memcached is a high-performance distributed in-memory cache server. On Linux it can be installed with yum, which is convenient, and Linux is recommended for production because memcached relies on the libevent library and delivers its best performance there. Its "distribution" is not a server-side feature: the servers themselves have no notion of a cluster, and distribution is achieved entirely by the client's hashing algorithm. Because memcached is a purely in-memory store, reads and writes are comparatively fast.
Although MemCache is called a distributed cache, MemCache itself has no distributed capability at all: MemCache nodes never talk to each other (in contrast to, say, JBoss Cache, where a server that updates cached data notifies the other machines in the cluster to update or invalidate their copies). The so-called distribution is implemented entirely by the client program.
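To make the client-side nature of this distribution concrete, below is a minimal sketch — purely illustrative, not memcached or any real client library's code; the server list, fnv1a and pick_server are made up for the example — of how a client might choose a node by hashing the key. Real clients usually use consistent hashing so that adding or removing a node remaps as few keys as possible.

```c
#include <stdint.h>
#include <stddef.h>

/* Hypothetical server list held by the client; memcached servers never see it. */
static const char *servers[] = {
    "10.0.0.1:11211",
    "10.0.0.2:11211",
    "10.0.0.3:11211",
};
#define NSERVERS (sizeof(servers) / sizeof(servers[0]))

/* Simple FNV-1a hash; real clients may use CRC32, MD5 or a ketama ring. */
static uint32_t fnv1a(const char *key, size_t len) {
    uint32_t h = 2166136261u;
    for (size_t i = 0; i < len; i++) {
        h ^= (unsigned char)key[i];
        h *= 16777619u;
    }
    return h;
}

/* The "distributed" part: the client, not the server, decides where a key lives. */
static const char *pick_server(const char *key, size_t len) {
    return servers[fnv1a(key, len) % NSERVERS];
}
```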
| Command | Description |
| --- | --- |
| get | Returns the value for the given key |
| add | Adds a key; if it does not exist the store succeeds with STORED, otherwise it fails with NOT_STORED |
| set | Unconditionally sets a key: creates it if absent, overwrites it if present; replies STORED on success |
| replace | Replaces the data for an existing key; fails if the key does not exist |
| stats | Returns general MemCache statistics |
| stats items | Returns the number of items in each slab and the age of the oldest item |
| stats slabs | Returns information about every slab created while MemCache has been running |
| version | Returns the current MemCache version |
| flush_all | Invalidates all key/value pairs; the items are not deleted, so MemCache still holds the memory |
| quit | Closes the connection |
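As a quick illustration of these commands, the exchange below is a hand-typed telnet session against a local server (host, port, key and value are made up for the example; in the set line, 0 is the flags field, 300 the TTL in seconds, and 5 the byte length of the data block that follows):

```
$ telnet 127.0.0.1 11211
set user:1001 0 300 5
hello
STORED
get user:1001
VALUE user:1001 0 5
hello
END
quit
```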
2. Memcached Memory Management
MemCache keeps its data in memory, which implies several things:
1) Data access is faster than in a traditional relational database: to guarantee durability, databases such as Oracle and MySQL keep their data on disk, and disk I/O is slow.
2) Because the data lives only in memory, it is lost whenever MemCache restarts.
3) Since the data lives in memory, capacity is bounded by the machine's word size: a 32-bit machine can use at most 2 GB of address space per process, while a 64-bit machine has effectively no such limit.
MemCache uses a fixed-size, pre-partitioned memory allocation scheme.
Four concepts are involved here — slab_class, slab, page and chunk — and they relate to each other as follows:
1) MemCache divides its memory into a set of slabs.
2) Each slab contains a number of pages; a page is 1 MB by default, so a slab that occupies 100 MB of memory consists of 100 pages.
3) Each page is divided into a set of chunks; chunks are where data is actually stored, and all chunks within the same slab have the same size.
4) Slabs whose chunks have the same size are grouped together into a slab_class.
MemCache's memory allocation scheme is called the allocator. The number of slab classes is limited — a few, a dozen or a few dozen, depending on the startup parameters. Where a value is stored is decided by its size: a value always goes into the slab class whose chunk size is the closest fit. For example, if slab[1] has 80-byte chunks, slab[2] has 100-byte chunks and slab[3] has 128-byte chunks (chunk sizes of adjacent slab classes grow roughly by a factor of 1.25, which can be changed with -f at startup), then an 88-byte value is placed into slab class 2. When a slab class stores data, it requests memory in page units, so when the first item is inserted, a 1 MB page is assigned to that slab class regardless of the item's size. The slab class then splits the page into chunks of its chunk size, producing an array of chunks, and one chunk from that array is used to store the data.
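The chunk-size progression and the "closest fit" lookup can be sketched as follows. This is a simplified illustration with made-up constants (CHUNK_MIN, MAX_CLASSES and the helper names are assumptions for the example), not the actual memcached slabs.c code:

```c
#include <stdio.h>

#define MAX_CLASSES   64             /* illustrative cap on slab classes        */
#define CHUNK_MIN     80             /* size of the smallest chunk, in bytes    */
#define CHUNK_MAX     (1024 * 1024)  /* a chunk can never exceed one 1 MB page  */
#define GROWTH_FACTOR 1.25           /* corresponds to memcached's -f option    */

static size_t chunk_size[MAX_CLASSES];
static int nclasses;

/* Build the chunk-size table the way the allocator does at startup:
 * each class is ~1.25x larger than the previous one, up to the page size. */
static void build_classes(void) {
    double sz = CHUNK_MIN;
    while (nclasses < MAX_CLASSES && sz <= CHUNK_MAX) {
        chunk_size[nclasses++] = (size_t)sz;
        sz *= GROWTH_FACTOR;
    }
}

/* Pick the slab class with the smallest chunk that still fits the value. */
static int class_for(size_t value_len) {
    for (int i = 0; i < nclasses; i++)
        if (value_len <= chunk_size[i])
            return i;
    return -1;  /* larger than a page: cannot be stored */
}

int main(void) {
    build_classes();
    int c = class_for(88);
    printf("an 88-byte value goes to class %d (chunk %zu bytes)\n",
           c, chunk_size[c]);
    return 0;
}
```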
What happens when a slab class has no free chunk left? If MemCache was started without -M (the flag that disables LRU eviction; with -M, running out of memory produces an Out Of Memory error), MemCache evicts the data in the least recently used chunk of that slab class and stores the new data there. Three points summarize MemCache's allocation and reclamation behavior:
1) Chunks waste some memory: an 88-byte value stored in a 100-byte chunk (the next size up) wastes 12 bytes, but this avoids the cost of managing memory fragmentation.
2) MemCache's LRU is not global; it is applied per slab class.
3) This also explains why the size of a value stored in MemCache is limited: when a new item arrives, the slab class requests memory one page at a time, and a page is at most 1 MB, so a value cannot exceed 1 MB.
2.1 MemCache Features and Limitations
| No. | Limitation |
| --- | --- |
| 1 | There is no limit on the number of items MemCache can hold, as long as memory is available |
| 2 | A single MemCache process can use at most 2 GB of memory on a 32-bit machine; on a 64-bit machine there is no such limit |
| 3 | A key can be at most 250 bytes; longer keys cannot be stored |
| 4 | A single item can be at most 1 MB; larger data is not stored |
| 5 | The MemCache server is not secure: anyone who knows a node's address can telnet to it directly and, for example, invalidate every existing key/value pair with flush_all |
| 6 | It is not possible to iterate over all items in MemCache, because that operation is comparatively slow and blocks other operations |
| 7 | MemCache's performance comes from its two-stage hashing: the first stage runs on the client, which hashes the key to pick a node; the second stage runs on the server, which uses an internal hash table to locate the actual item and return it to the client. From an implementation standpoint, MemCache is a non-blocking, event-based server program |
2.2 Slab Memory Management
2.2.1 item: the data storage node
typedef struct _stritem {
    /* Protected by LRU locks */
    //next item, used by the LRU list and the freelist
    struct _stritem *next;
    //previous item, used by the LRU list and the freelist
    struct _stritem *prev;
    /* Rest are protected by an item lock */
    //next item in the same hash-table bucket
    struct _stritem *h_next;    /* hash chain next */
    //time of the most recent access
    rel_time_t      time;       /* least recent access */
    //expiration time of the cached item
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    //number of references to this item; used to tell whether other threads
    //are still operating on it; the node can only be freed when refcount == 1
    unsigned short  refcount;
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    //records which slabclass_t this item belongs to
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
slab and chunk
A slab is a block of memory, 1 MB by default. memcached splits a slab into chunks; these small memory blocks are what items are actually stored in.
slabclass
Items come in different sizes, and an item is stored in a chunk: if the chunk is too small, the item does not fit; if the chunk is too large, memory is wasted. memcached therefore cuts slabs into chunks of different sizes so that items of different sizes can be stored efficiently. A slab whose memory has been divided into chunks of one particular size is represented in memcached by the slabclass structure:
typedef struct {
unsigned int size; /* sizes of items */
unsigned int perslab; /* how many items per slab */
void *slots; /* list of item ptrs */
unsigned int sl_curr; /* total free items in list */
unsigned int slabs; /* how many slabs were allocated for this class */
void **slab_list; /* array of slab pointers */
unsigned int list_size; /* size of prev array */
} slabclass_t;
1) When the slabclass array is initialized, each slabclass_t is given one 1 MB slab, which is cut into N small blocks whose size is determined by the size field of that slabclass_t.
2) Each slabclass_t stores only data within a certain size range, and each slabclass's chunk size is larger than that of the previous one.
3) The slabclass array in memcached has a default size of 64, and the default growth factor between chunk sizes is 1.25.
For example, if slabclass[1] cuts 100-byte chunks and slabclass[2] cuts 125-byte chunks, then storing a 110-byte value means taking a free node from slabclass[2]'s free list.
2.2.2 slabclass Initialization
slabs_init
  |---> slabs_preallocate
          |---> do_slabs_newslab
                  |---> grow_slab_list
                  |---> split_slab_page_into_freelist
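The last step of this call chain is what turns a freshly allocated 1 MB page into free item nodes. The following is a simplified sketch of that logic, paraphrased from memory of slabs.c rather than copied from it (field names follow the slabclass_t structure shown above, and do_slabs_free is the function shown later in 2.2.4):

```c
/* Simplified sketch: walk the new page in chunk-size steps and hand each
 * chunk to do_slabs_free(), which pushes it onto the class's free list. */
static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    unsigned int x;
    for (x = 0; x < p->perslab; x++) {   /* perslab = page size / chunk size */
        do_slabs_free(ptr, 0, id);       /* the chunk becomes a free item    */
        ptr += p->size;                  /* advance by one chunk             */
    }
}
```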
2.2.3 Item Allocation Flow
| No. | Description |
| --- | --- |
| 1 | Find the appropriate slabclass according to the requested size |
| 2 | If the slabclass's free list has a free item node, allocate it directly and use it to store the content |
| 3 | If the free list has no free item, allocate a new slab (1 MB by default), split it, put the pieces on the free list, and then take a node from the free list |
void *slabs_alloc(size_t size, unsigned int id, unsigned int flags) {
    void *ret;

    pthread_mutex_lock(&slabs_lock);
    ret = do_slabs_alloc(size, id, flags);
    pthread_mutex_unlock(&slabs_lock);
    return ret;
}

/*@null@*/
static void *do_slabs_alloc(const size_t size, unsigned int id,
        unsigned int flags) {
    slabclass_t *p;
    void *ret = NULL;
    item *it = NULL;

    if (id < POWER_SMALLEST || id > power_largest) {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);
        return NULL;
    }
    p = &slabclass[id];
    assert(p->sl_curr == 0 || (((item *)p->slots)->it_flags & ITEM_SLABBED));
    assert(size <= p->size);
    /* no free chunk left: a new slab has to be allocated */
    if (p->sl_curr == 0 && flags != SLABS_ALLOC_NO_NEWPAGE) {
        do_slabs_newslab(id);
    }

    if (p->sl_curr != 0) {
        /* return off our freelist */
        it = (item *)p->slots;
        p->slots = it->next;
        if (it->next) it->next->prev = 0;
        /* Kill flag and initialize refcount here for lock safety in slab
         * mover's freeness detection. */
        it->it_flags &= ~ITEM_SLABBED;
        it->refcount = 1;
        p->sl_curr--;
        ret = (void *)it;
    } else {
        ret = NULL;
    }

    if (ret) {
        MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);
    } else {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
    }
    return ret;
}
2.2.4 Releasing an item
Releasing an item does not free its memory; the item node is simply returned to the slabclass's free list.
void slabs_free(void *ptr, size_t size, unsigned int id) {
    pthread_mutex_lock(&slabs_lock);
    do_slabs_free(ptr, size, id);
    pthread_mutex_unlock(&slabs_lock);
}

static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
    slabclass_t *p;
    item *it;

    assert(id >= POWER_SMALLEST && id <= power_largest);
    if (id < POWER_SMALLEST || id > power_largest)
        return;

    MEMCACHED_SLABS_FREE(size, id, ptr);

    p = &slabclass[id];
    it = (item *)ptr;
    if ((it->it_flags & ITEM_CHUNKED) == 0) {
        it->it_flags = ITEM_SLABBED;
        it->slabs_clsid = id;
        it->prev = 0;
        it->next = p->slots;
        if (it->next) it->next->prev = it;
        p->slots = it;
        p->sl_curr++;
    } else {
        do_slabs_free_chunked(it, size);
    }
    return;
}
3 Memcached Network Model
| No. | Description |
| --- | --- |
| 1 | Memcached is built on top of the Libevent event library |
| 2 | The network model has two parts: a main thread and worker threads. The main thread accepts client connections; the worker threads take over those connections and handle the actual request logic |
| 3 | The main thread and the worker threads communicate through pipes. When the main thread accepts a client connection, it picks a worker thread round-robin and writes to that worker's pipe; when the worker sees data on the pipe, it runs the code that takes over the client connection |
| 4 | Each worker thread also runs Libevent's event loop, so when a client sends data, the read handler is triggered |
The infrastructure behind the "main thread accepts, then dispatches to sub-threads" design: the main thread creates several sub-threads (also called worker threads), each of which runs its own event loop — each has its own epoll instance and calls epoll_wait to wait for events. Each worker thread is connected to the main thread by a pipe and listens on the read end of its own pipe. When the main thread wants to talk to a particular worker, it simply writes into that worker's pipe.
How this model works: the main thread listens on the process's public TCP port. When a client connects, the main thread accepts the connection, picks one of the worker threads, and passes the client fd to it through the corresponding pipe. From then on, the worker thread that received the fd handles all communication with that client.
1. memcached uses libevent for event monitoring.
2. memcached does not actually write the fd into the pipe, only a single byte. Each worker thread maintains a CQ (connection queue): the main thread packs the fd and some metadata into a CQ_ITEM, pushes the CQ_ITEM onto the worker's CQ, and then writes to the pipe to tell the worker "wake up and handle a new connection request" (a standalone sketch of this pattern follows below).
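The pattern just described — one byte on a pipe as a wake-up signal, with the real payload in a lock-protected queue — can be sketched independently of the memcached sources. This is an assumption-laden toy using the libevent 2.x API (all names such as worker_t, on_notify and dispatch_to_worker are invented for the example); the actual memcached code appears later in this section:

```c
#include <event2/event.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>

/* Toy dispatch queue: in memcached this role is played by CQ / CQ_ITEM. */
typedef struct {
    int fds[256];
    int head, tail;
    pthread_mutex_t lock;
} queue_t;

typedef struct {
    struct event_base *base;   /* this worker's own event loop         */
    int notify_recv_fd;        /* read end of the notification pipe    */
    int notify_send_fd;        /* write end, kept by the main thread   */
    queue_t q;                 /* payload queue protected by q.lock    */
} worker_t;

/* Worker side: runs when the main thread writes one byte into the pipe. */
static void on_notify(evutil_socket_t fd, short which, void *arg) {
    worker_t *w = arg;
    char byte;
    read(fd, &byte, 1);                          /* drain the wake-up byte */
    pthread_mutex_lock(&w->q.lock);
    int client_fd = w->q.fds[w->q.head++ % 256]; /* pop the real payload   */
    pthread_mutex_unlock(&w->q.lock);
    printf("worker took over fd %d\n", client_fd);
    /* memcached would now conn_new() this fd into the worker's base */
}

/* Worker thread body: register the pipe event, then loop forever. */
static void *worker_loop(void *arg) {
    worker_t *w = arg;
    struct event *ev = event_new(w->base, w->notify_recv_fd,
                                 EV_READ | EV_PERSIST, on_notify, w);
    event_add(ev, NULL);
    event_base_dispatch(w->base);
    return NULL;
}

/* Main-thread side: enqueue the fd, then write one byte to wake the worker.
 * Setup (pipe(), event_base_new(), pthread_create(worker_loop)) is omitted. */
static void dispatch_to_worker(worker_t *w, int client_fd) {
    pthread_mutex_lock(&w->q.lock);
    w->q.fds[w->q.tail++ % 256] = client_fd;
    pthread_mutex_unlock(&w->q.lock);
    write(w->notify_send_fd, "c", 1);
}
```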
3.1 CQ_ITEM
Each memcached worker thread has a CQ queue. After the main thread accepts a new client, it wraps the client's information in a CQ_ITEM and pushes it onto the CQ of the selected worker thread.
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    enum conn_queue_item_modes mode;
    conn *c;
    void    *ssl;
    io_pending_t *io; // IO when used for deferred IO handling.
    STAILQ_ENTRY(conn_queue_item) i_next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    STAILQ_HEAD(conn_ev_head, conn_queue_item) head;
    pthread_mutex_t lock;
    cache_t *cache; /* freelisted objects */
};
Note that struct conn_queue (the CQ structure) contains a pthread_mutex_t member named lock, which means the main thread must take a lock when it pushes a CQ_ITEM onto a worker's CQ. Below is the code that initializes a CQ queue and pushes/pops a CQ_ITEM.
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}
3.2 Thread Model
3.2.1 Main-Thread Initialization
Memcached's main-thread initialization is fairly simple: it starts the listening master thread and the worker threads. The workers are started through memcached_thread_init, which is analyzed in the worker-thread initialization section; here we focus on the listening master thread. Starting the master thread is essentially socket server initialization combined with libevent initialization, as follows:
1) server_sockets: iterates over all listening sockets and binds each one.
2) server_socket: puts a single socket into the listen state.
3) conn_new: registers the socket with libevent.
4) event_handler: the callback for the listening socket.
5) Finally, event_base_loop puts libevent into its event loop.
int main (int argc, char **argv) {
    //check that the libevent version is recent enough (1.3 or later)
    if (!sanitycheck()) {
        return EX_OSERR;
    }
    //set the key memcached settings to their default values
    settings_init();
    ...//parse the memcached startup arguments
    //main_base is a global variable of type struct event_base
    main_base = event_init();//create an event_base for the main thread
    conn_init();//skip for now, covered later
    //create settings.num_threads worker threads, give each worker a CQ queue
    //and its own event_base; the workers then enter their event loops
    thread_init(settings.num_threads, main_base);
    //register a timer (timeout) event that refreshes current_time once per second;
    //it is added to the global main_base, so the main thread keeps current_time
    //up to date (current_time is an important global variable)
    clock_handler(0, 0, 0);

    /* create the listening socket, bind it, and init */
    if (settings.socketpath == NULL) {
        FILE *portnumber_file = NULL;
        //create the socket that listens for clients
        if (settings.port && server_sockets(settings.port, tcp_transport,//tcp_transport is an enum
                                            portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }
        ...
    }

    if (event_base_loop(main_base, 0) != 0) {//main thread enters the event loop
        retval = EXIT_FAILURE;
    }
    return retval;
}
After parsing the arguments, main iterates over all listening sockets and binds each one by calling server_socket(p, the_port, transport, portnumber_file).
static int server_sockets(int port, enum network_transport transport,
                          FILE *portnumber_file) {
    if (settings.inter == NULL) {
        return server_socket(settings.inter, port, transport, portnumber_file);
    } else {
        // tokenize them and bind to each one of them..
        char *b;
        int ret = 0;
        char *list = strdup(settings.inter);

        for (char *p = strtok_r(list, ";,", &b);
             p != NULL;
             p = strtok_r(NULL, ";,", &b)) {
            ret |= server_socket(p, the_port, transport, portnumber_file);
        }
        free(list);
        return ret;
    }
}
For a single listening socket, the initialization does three things: set the relevant socket options, bind the socket, and associate the socket with libevent via conn_new.
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    for (next = ai; next; next = next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            continue;
        }
        //todo: set the socket options (code omitted here)
        //bind the socket (error handling omitted)
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {}
        //only the TCP path is shown; the UDP implementation is omitted
        if (IS_UDP(transport)) {
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}
conn_new performs the libevent setup, namely event_set and event_base_set. The key point is that event_set registers the callback event_handler, which runs when a connection event arrives.
conn *conn_new(const int sfd, enum conn_states init_state,
               const int event_flags,
               const int read_buffer_size, enum network_transport transport,
               struct event_base *base, void *ssl) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];
    /* ... */
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }
    return c;
}
The core of the event_handler callback is drive_machine, the state-transition hub of the whole of Memcached: every operation is driven through drive_machine.
void event_handler(const evutil_socket_t fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }

    drive_machine(c);

    /* wait for next event */
    return;
}
3.2.2 Worker-Thread Initialization
memcached_thread_init initializes the worker threads. Its three core steps are:
1) Create the pipe used for master/worker communication: pipe(fds).
2) setup_thread: configure the worker thread's libevent state.
3) create_worker: start the worker thread so it begins its processing loop.
void memcached_thread_init(int nthreads, void *arg) {
    int         i;
    int         power;

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

    for (i = 0; i < nthreads; i++) {
#ifdef HAVE_EVENTFD
        threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK);
        if (threads[i].notify_event_fd == -1) {
            perror("failed creating eventfd for worker thread");
            exit(1);
        }
#else
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];
#endif
#ifdef EXTSTORE
        threads[i].storage = arg;
#endif
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}
setup_thread initializes the worker thread's libevent state. The points of interest are:
1) the callback function thread_libevent_process;
2) initializing the master/worker communication queue: cq_init(me->new_conn_queue).
static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init();
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
me->new_conn_queue = malloc(sizeof(struct conn_queue));
if (me->new_conn_queue == NULL) {
perror("Failed to allocate memory for connection queue");
exit(EXIT_FAILURE);
}
cq_init(me->new_conn_queue);
}
create_worker simply starts the worker thread.
create_worker(worker_libevent, &threads[i]) is passed the function worker_libevent.
pthread_create starts worker_libevent, and inside worker_libevent the call to event_base_loop finally puts libevent to work.
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */
    me->l = logger_create();
    me->lru_bump_buf = item_lru_bump_buf_create();
    if (me->l == NULL || me->lru_bump_buf == NULL) {
        abort();
    }

    if (settings.drop_privileges) {
        drop_worker_privileges();
    }

    register_thread_initialized();

    event_base_loop(me->base, 0);

    // same mechanism used to watch for all threads exiting.
    register_thread_initialized();

    event_base_free(me->base);
    return NULL;
}
3.2.3 Master/Worker Communication
After the master thread accepts a connection, drive_machine is triggered with the master connection in the conn_listening state; dispatch_conn_new then hands the connection from the master to a worker.
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;
#ifdef HAVE_ACCEPT4
    static int  use_accept4 = 1;
#else
    static int  use_accept4 = 0;
#endif

    assert(c != NULL);

    while (!stop) {
        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            // a series of socket-related initialization steps omitted here
            if (settings.maxconns_fast && /* ... connection-limit handling omitted ... */
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                  DATA_BUFFER_SIZE, c->transport);
            }
            stop = true;
            break;
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport, void *ssl) {
    CQ_ITEM *item = NULL;
    LIBEVENT_THREAD *thread;

    if (!settings.num_napi_ids)
        thread = select_thread_round_robin();
    else
        thread = select_thread_by_napi_id(sfd);

    item = cqi_new(thread->ev_queue);
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;
    item->ssl = ssl;

    MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
    notify_worker(thread, item);
}

static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
    cq_push(t->ev_queue, item);
#ifdef HAVE_EVENTFD
    uint64_t u = 1;
    if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
        perror("failed writing to worker eventfd");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#else
    char buf[1] = "c";
    if (write(t->notify_send_fd, buf, 1) != 1) {
        perror("Failed writing to notify pipe");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#endif
}
thread_libevent_process is the worker-thread callback that runs when the master dispatches a new connection. It calls conn_new to handle the incoming connection, and conn_new registers the new connection's socket with the worker thread's libevent instance.
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    conn *c;
    uint64_t ev_count = 0; // max number of events to loop through this run.

#ifdef HAVE_EVENTFD
    if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#else
    char buf[MAX_PIPE_EVENTS];

    ev_count = read(fd, buf, MAX_PIPE_EVENTS);
    if (ev_count == 0) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#endif

    for (int x = 0; x < ev_count; x++) {
        item = cq_pop(me->ev_queue);
        if (item == NULL) {
            return;
        }

        switch (item->mode) {
            case queue_new_conn:
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                             item->read_buffer_size, item->transport,
                             me->base, item->ssl);
3.3 Command Parsing
Intuitively, the memcached command protocol can be divided into get-type, update-type and other commands; at the processing level it is further split into get, update, delete, arithmetic, touch, stats and other command families. The corresponding handlers are process_get_command, process_update_command, process_arithmetic_command, process_touch_command and so on, each dealing with its own part of the protocol.
When a worker thread is notified through the pipe by the main thread, it pops a new connection from the connection queue, creates a conn structure, registers a read event for that conn, and keeps listening for IO on the connection. When a command later arrives, the worker reads the client's command, parses and executes it, and returns a response. The worker's main logic also lives in the state machine, the function named drive_machine. The overall flow is as follows:
| No. | Description |
| --- | --- |
| 1 | Once a client has established a TCP connection to Memcached, Memcached relies on Libevent events to detect readable data from the client |
| 2 | When a command packet arrives, the conn_read case in drive_machine is triggered; before entering that state the connection goes through conn_new_cmd -> conn_waiting -> conn_read |
| 3 | memcached reads the client packet with try_read_network. If the read fails, the state becomes conn_closing and the connection is closed; if nothing is read, it becomes conn_waiting, which keeps waiting for client events and exits the drive_machine loop; if data was read, control passes to conn_parse_cmd and the data is stored in the c->rbuf buffer |
| 4 | conn_parse_cmd parses commands. It uses try_read_command to read the command data in c->rbuf, with \n delimiting command lines. If no \n is found in c->rbuf, it returns to conn_waiting to wait for the rest of the command packet; otherwise process_command handles the specific command (during parsing the command line is split using \0 characters) |
| 5 | process_command executes the specific command. The key helper is tokenize_command, which splits the command into tokens (a key is at most 250 bytes long). Taking get as an example, the flow ends up in process_get_command; the process_*_command family implements the individual command logic |
| 6 | In process_get_command, once the data has been fetched, the state moves to conn_mwrite; if fetching fails, the connection is closed |
| 7 | In conn_mwrite the response is sent to the client via transmit. If the write fails, the connection is closed or the drive_machine loop is exited; if it succeeds, the state goes back to conn_new_cmd |
| 8 | conn_new_cmd handles whatever is left in c->rbuf. reset_cmd_handler checks c->rbytes for unprocessed bytes: if there are any, control goes back to conn_parse_cmd (step 4) to parse the remaining commands; if not, the state becomes conn_waiting to wait for new events. Before switching state it always calls conn_shrink once |
| 9 | conn_shrink checks whether the command buffer c->rbuf and the output buffer are full, whether the buffers need to grow, and whether memory blocks need to be moved. The initial size of the command receive buffer is 2048 bytes, the maximum 8192 |
1) After memcached starts, the main thread listens and waits for new connections. When a new connection arrives, the main thread (in the conn_listening state) accepts it and dispatches it to a worker thread.
2) The worker thread listens on its pipe. When it receives the message the main thread wrote to the pipe, the connection in the worker enters the conn_new_cmd state, the conn structure is created and reset, and the connection then moves to conn_waiting, registers a read event, and waits for network IO.
3) When data arrives, the connection enters conn_read and reads the network data.
4) After a successful read, it enters conn_parse_cmd and parses the command according to the Mc protocol.
5) For read commands, once the value has been fetched the connection enters conn_mwrite.
6) For update commands, the connection enters conn_nread to read the value; once the value has been read the key is updated, the connection enters conn_write, and the result is written to the buffer. Then, as with reads, the connection enters conn_mwrite.
7) In conn_mwrite the response is sent to the client. When the response has been sent, the connection goes back to conn_new_cmd, is reset, and is ready for the next command cycle.
8) If anything goes wrong while reading, parsing, processing or responding, the connection enters conn_closing and is closed.
3.3.1 State-Machine Overview
| State | Description |
| --- | --- |
| conn_new_cmd | After the main thread dispatches a new connection with dispatch_conn_new, the worker thread creates the conn object with conn_new_cmd as its initial state. A connection also returns to conn_new_cmd whenever it has finished handling a command and is ready for the next one. In this state the worker calls reset_cmd_handler, which resets the conn's cmd and substate fields and, if necessary, shrinks the connection's buffers. While serving commands, a write command needs a large read buffer for the incoming key/value and a read command needs a large write buffer for the value being sent back; after many large-value operations these buffers can hold a lot of memory, so a threshold is used and buffers above it are shrunk. This matters a great deal in backend and middleware development: a production service easily reaches tens of thousands of connections, and if each one holds tens of KB or more, connections alone can consume hundreds of MB or even several GB. After the main conn_new_cmd logic, the connection moves to conn_parse_cmd if the read buffer has data, otherwise to conn_waiting to wait for network data |
| conn_waiting | The handling here is trivial: register a read event via update_event, then set the state to conn_read |
| conn_read | The connection enters conn_read when the worker thread detects incoming data. try_read_network reads from the socket: if the read fails, the state becomes conn_closing and the connection is closed; if nothing is read, the connection returns to conn_waiting to keep waiting for client data; if data is read, it is stored in the conn's rbuf and the state becomes conn_parse_cmd, ready to parse the command |
| conn_parse_cmd | This state parses the command. The worker first calls try_read_command on the read buffer, using \n to delimit the command line. If the first command line is longer than 1024 bytes the connection is closed, which means the key plus all other command fields must total less than 1024 bytes; the key itself also has a default maximum length, key_max_length, of 250 bytes. After the first line passes the length check, process_command handles it. Note: conn_parse_cmd only calls process_command once it has read a \n and therefore has a complete command line; otherwise it jumps back to conn_waiting to wait for more of the command packet. Within process_command, a get-type command jumps to conn_mwrite once the key's value has been fetched, ready to write the response; an update-type command still needs to read the value data, so the connection jumps to conn_nread. Any failure during conn_parse_cmd jumps to conn_closing and closes the connection |
| conn_write | The handling of conn_write is simple: go straight to conn_mwrite. Alternatively, when the conn's iovused is 0 or for the UDP protocol, the response is first written into the conn's message buffer and then the state becomes conn_mwrite |
| conn_mwrite | In conn_mwrite the worker writes the data to the client via transmit. If the write fails, the connection jumps to conn_closing, closes, and leaves the state machine; if it succeeds, it jumps to conn_new_cmd, ready to fetch the next command |
| conn_closing | The final state, mentioned many times above: any error in any state leads here and the connection is closed |
3.3.2 State-Machine Source Walkthrough
3.3.2.1 conn_new_cmd
Inside conn_new_cmd, reset_cmd_handler sets the state back to conn_parse_cmd and command parsing starts again: a new iteration of the big loop.
        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other
               connections */
            --nreqs;
            if (nreqs >= 0) {
                reset_cmd_handler(c);
            } else if (c->resp_head) {
                // flush response pipe on yield.
                conn_set_state(c, conn_mwrite);
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {
                    /* We have already read in data into the input buffer,
                       so libevent will most likely not signal read events
                       on the socket (unless more data is available. As a
                       hack we should just put in a request to write data,
                       because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);
                        break;
                    }
                }
                stop = true;
            }

static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if (c->item != NULL) {
        // TODO: Any other way to get here?
        // SASL auth was mistakenly using it. Nothing else should?
        if (c->item_malloced) {
            free(c->item);
            c->item_malloced = false;
        } else {
            item_remove(c->item);
        }
        c->item = NULL;
    }

    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
    } else if (c->resp_head) {
        conn_set_state(c, conn_mwrite);
    } else {
        conn_set_state(c, conn_waiting);
    }
}
When the client sends its first command, epoll triggers the callback twice (epoll is used in level-triggered mode here, so if the data is not read, a second notification follows).
3.3.2.2 conn_waiting
        case conn_waiting:
            rbuf_release(c);
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;
            break;
3.3.2.3 conn_read
memcached reads the client packet with try_read_network. If the read fails, the state becomes conn_closing and the client connection is closed; if nothing is read, the state becomes conn_waiting, the connection keeps waiting for client events, and the drive_machine loop is exited; if data is read successfully, control passes to conn_parse_cmd and the data is stored in c->rbuf.
        case conn_read:
            if (!IS_UDP(c->transport)) {
                // Assign a read buffer if necessary.
                if (!rbuf_alloc(c)) {
                    // TODO: Some way to allow for temporary failures.
                    conn_set_state(c, conn_closing);
                    break;
                }
                res = try_read_network(c);
            } else {
                // UDP connections always have a static buffer.
                res = try_read_udp(c);
            }

            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break;
3.3.2.4 conn_parse_cmd
This state reads commands out of rbuf. Because TCP data can arrive merged or split across packets, a command can only be parsed once a complete line is available, i.e. once a \n has been matched. During parsing, each \n marks the end of a complete command; the command is then processed, a response is sent to the client, and parsing continues.
        case conn_parse_cmd:
            c->noreply = false;
            if (c->try_read_command(c) == 0) {
                /* we need more data! */
                if (c->resp_head) {
                    // Buffered responses waiting, flush in the meantime.
                    conn_set_state(c, conn_mwrite);
                } else {
                    conn_set_state(c, conn_waiting);
                }
            }
            break;
int try_read_command_ascii(conn *c) {
    char *el, *cont;

    if (c->rbytes == 0)
        return 0;

    el = memchr(c->rcurr, '\n', c->rbytes);
    if (!el) {
        if (c->rbytes > 2048) {
            char *ptr = c->rcurr;
            while (*ptr == ' ') { /* ignore leading whitespaces */
                ++ptr;
            }

            if (ptr - c->rcurr > 100 ||
                (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
                conn_set_state(c, conn_closing);
                return 1;
            }

            if (!c->rbuf_malloced) {
                if (!rbuf_switch_to_malloc(c)) {
                    conn_set_state(c, conn_closing);
                    return 1;
                }
            }
        }
        return 0;
    }
    cont = el + 1;
    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
        el--;
    }
    *el = '\0';

    assert(cont <= (c->rcurr + c->rbytes));

    c->last_cmd_time = current_time;
    process_command_ascii(c, c->rcurr);

    c->rbytes -= (cont - c->rcurr);
    c->rcurr = cont;

    assert(c->rcurr <= (c->rbuf + c->rsize));

    return 1;
}
One detail of tokenize_command worth noting is the final token: if fewer than max_tokens tokens were parsed, the final token's value is empty (NULL); if max_tokens was reached, the final token points to the remaining, unparsed part of the string.
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    assert(command != NULL && tokens != NULL && max_tokens > 1);
    size_t len = strlen(command);
    unsigned int i = 0;

    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
                if (ntokens == max_tokens - 1) {
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;
        }
        e++;
    }

    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value =  *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
void process_command_ascii(conn *c, char *command) {
    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

    assert(c != NULL);

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d %s\n", c->sfd, command);

    // Prep the response object for this query.
    if (!resp_start(c)) {
        conn_set_state(c, conn_closing);
        return;
    }

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    // All commands need a minimum of two tokens: cmd and NULL finalizer
    // There are also no valid commands shorter than two bytes.
    if (ntokens < 2 || tokens[COMMAND_TOKEN].length < 2) {
        out_string(c, "ERROR");
        return;
    }

    // Meta commands are all 2-char in length.
    char first = tokens[COMMAND_TOKEN].value[0];
    if (first == 'm' && tokens[COMMAND_TOKEN].length == 2) {
        switch (tokens[COMMAND_TOKEN].value[1]) {
        /* ... meta command handling omitted ... */
    } else if (first == 'g') {
        // Various get commands are very common.
        WANT_TOKENS_MIN(ntokens, 3);
        if (strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) {
            process_get_command(c, tokens, ntokens, false, false);
        } else if (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0) {
            process_get_command(c, tokens, ntokens, true, false);
        } else if (strcmp(tokens[COMMAND_TOKEN].value, "gat") == 0) {
            process_get_command(c, tokens, ntokens, false, true);
        } else if (strcmp(tokens[COMMAND_TOKEN].value, "gats") == 0) {
            process_get_command(c, tokens, ntokens, true, true);
        } else {
            out_string(c, "ERROR");
        }
    } else if (first == 's') {
        if (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) {
            WANT_TOKENS_OR(ntokens, 6, 7);
            process_update_command(c, tokens, ntokens, comm, false);
        } else if (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0) {
            process_stat(c, tokens, ntokens);
        } else if (strcmp(tokens[COMMAND_TOKEN].value, "shutdown") == 0) {
            process_shutdown_command(c, tokens, ntokens);
        } else if (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {
            process_slabs_command(c, tokens, ntokens);
        } else {
            out_string(c, "ERROR");
        }
    /* ... remaining command families omitted ... */
process_command_ascii dispatches on the command stored in tokens; here we follow the set command.
After process_update_command allocates an item, it does not immediately insert the item into the LRU list and the hash table. Instead the conn's item member points at the allocated item and its ritem member points at the item's data area (which makes writing the data convenient). Finally the conn's state is changed to conn_nread.
static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    /* ... local variable declarations omitted ... */
    item *it;

    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);

    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
           && safe_strtol(tokens[3].value, &exptime_int)
           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));

    // does cas value exist?
    if (handle_cas) {
        if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
    }

    if (vlen < 0 || vlen > (INT_MAX - 2)) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }
    vlen += 2;

    it = item_alloc(key, nkey, flags, exptime, vlen);
    /* ... allocation-failure handling omitted ... */

    ITEM_set_cas(it, req_cas_id);

    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = it->nbytes;
    c->cmd = comm;
    conn_set_state(c, conn_nread);
}
3.3.2.5 conn_nread
The process_update_command handling in the conn_parse_cmd state does not write the item's data into the item structure; it only moves the state machine to conn_nread and returns to drive_machine.
Although process_update_command leaves that work unfinished, it records the important values in the conn's members so the item's data area can be filled in later: rlbytes is how many bytes are still needed to fill the item (the length of the data yet to be read), rbytes is how many bytes are available in the read buffer, and ritem points to where the data should be written.
        case conn_nread:
            if (c->rlbytes == 0) {
                complete_nread(c);
                break;
            }

            /* Check if rbytes < 0, to prevent crash */
            if (c->rlbytes < 0) {
                if (settings.verbose) {
                    fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);
                }
                conn_set_state(c, conn_closing);
                break;
            }

            if (c->item_malloced || ((((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) ) {
                /* first check if we have leftovers in the conn_read buffer */
                if (c->rbytes > 0) {
                    int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                    memmove(c->ritem, c->rcurr, tocopy);
                    c->ritem += tocopy;
                    c->rlbytes -= tocopy;
                    c->rcurr += tocopy;
                    c->rbytes -= tocopy;
                    if (c->rlbytes == 0) {
                        break;
                    }
                }

                /*  now try reading from the socket */
                res = c->read(c, c->ritem, c->rlbytes);
                if (res > 0) {
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.bytes_read += res;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    if (c->rcurr == c->ritem) {
                        c->rcurr += res;
                    }
                    c->ritem += res;
                    c->rlbytes -= res;
                    break;
                }
            } else {
                res = read_into_chunked_item(c);
                if (res > 0)
                    break;
            }

            if (res == 0) { /* end of stream */
                c->close_reason = NORMAL_CLOSE;
                conn_set_state(c, conn_closing);
                break;
            }

            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }

            /* Memory allocation failure */
            if (res == -2) {
                out_of_memory(c, "SERVER_ERROR Out of memory during read");
                c->sbytes = c->rlbytes;
                conn_set_state(c, conn_swallow);
                // Ensure this flag gets cleared. It gets killed on conn_new()
                // so any conn_closing is fine, calling complete_nread is
                // fine. This swallow semms to be the only other case.
                c->set_stale = false;
                c->mset_res = false;
                break;
            }

            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0) {
                fprintf(stderr, "Failed to read, and not due to blocking:\n"
                        "errno: %d %s \n"
                        "rcurr=%p ritem=%p rbuf=%p rlbytes=%d rsize=%d\n",
                        errno, strerror(errno),
                        (void *)c->rcurr, (void *)c->ritem, (void *)c->rbuf,
                        (int)c->rlbytes, (int)c->rsize);
            }
            conn_set_state(c, conn_closing);
            break;
When rlbytes reaches 0, all the required data has been read and complete_nread is called.
void complete_nread_ascii(conn *c) {
    assert(c != NULL);

    item *it = c->item;
    int comm = c->cmd;
    enum store_item_type ret;
    bool is_valid = false;

    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    if ((it->it_flags & ITEM_CHUNKED) == 0) {
        if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
            is_valid = true;
        }
    }

    if (!is_valid) {
        // metaset mode always returns errors.
        if (c->mset_res) {
            c->noreply = false;
        }
        out_string(c, "CLIENT_ERROR bad data chunk");
    } else {
        ret = store_item(it, comm, c);
        /* ... stats and error handling omitted ... */
        if (c->mset_res) {
            _finalize_mset(c, ret);
        } else {
            switch (ret) {
            case STORED:
                out_string(c, "STORED");
                break;
            case EXISTS:
                out_string(c, "EXISTS");
                break;
            case NOT_FOUND:
                out_string(c, "NOT_FOUND");
                break;
            case NOT_STORED:
                out_string(c, "NOT_STORED");
                break;
            default:
                out_string(c, "SERVER_ERROR Unhandled storage type.");
            }
        }
    }

    c->set_stale = false; /* force flag to be off just in case */
    c->mset_res = false;
    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}
Depending on the result, a different message is returned to the client and the state machine moves to conn_new_cmd.
void out_string(conn *c, const char *str) {
    size_t len;
    assert(c != NULL);
    mc_resp *resp = c->resp;

    // if response was original filled with something, but we're now writing
    // out an error or similar, have to reset the object first.
    // TODO: since this is often redundant with allocation, how many callers
    // are actually requiring it be reset? Can we fast test by just looking at
    // tosend and reset if nonzero?
    resp_reset(resp);
    if (c->noreply) {
        // TODO: just invalidate the response since nothing's been attempted
        // to send yet?
        resp->skip = true;
        if (settings.verbose > 1)
            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
        conn_set_state(c, conn_new_cmd);
        return;
    }

    if (settings.verbose > 1)
        fprintf(stderr, ">%d %s\n", c->sfd, str);

    // Fill response object with static string.
    len = strlen(str);
    if ((len + 2) > WRITE_BUFFER_SIZE) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(resp->wbuf, str, len);
    memcpy(resp->wbuf + len, "\r\n", 2);
    resp_add_iov(resp, resp->wbuf, len + 2);

    conn_set_state(c, conn_new_cmd);
    return;
}
Back in conn_new_cmd, resp_head is found to be non-empty and the state moves to conn_mwrite.
3.3.2.6 conn_mwrite
        case conn_write:
        case conn_mwrite:
            for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
                if (q->stack_ctx != NULL) {
                    io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
                    qcb->submit_cb(q);
                    c->io_queues_submitted++;
                }
            }
            if (c->io_queues_submitted != 0) {
                conn_set_state(c, conn_io_queue);
                event_del(&c->event);
                stop = true;
                break;
            }

            switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    // Free up IO wraps and any half-uploaded items.
                    conn_release_items(c);
                    conn_set_state(c, conn_new_cmd);
                    if (c->close_after_write) {
                        conn_set_state(c, conn_closing);
                    }
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;
When transmission finishes, TRANSMIT_COMPLETE is returned and the state machine moves to conn_new_cmd.
4 Memcached Data Storage
| No. | Description |
| --- | --- |
| 1 | When Memcached starts, it initializes a hash table whose default length is 65536 |
| 2 | Each element of this hash table is called a bucket, and every bucket is a singly linked list of item structures |
| 3 | Memcached hashes the key into a uint32_t value named hv |
| 4 | A bitwise AND between hv and the bucket mask, hv & hashmask(hashpower), determines which bucket the key falls into |
| 5 | The item is then linked into that bucket's list, using the h_next pointer in the item structure |
4.1 Memcached Storage Structures
assoc_init initializes the hash table by allocating an array of hashsize(hashpower) pointers; by default this is an array of 2^16 entries (see the note on hashsize/hashmask after the code below).
void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    STATS_LOCK();
    stats_state.hash_power_level = hashpower;
    stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();
}
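For reference, hashsize and hashmask are simple macros; the definitions below are reproduced from memory of assoc.c, so treat the exact types as an approximation. hashsize(n) is 2^n and hashmask(n) is hashsize(n) - 1, which makes `hv & hashmask(hashpower)` equivalent to `hv % hashsize(hashpower)` when the table size is a power of two.

```c
/* Approximate definitions, showing how the bucket index is computed: */
#define hashsize(n) ((uint32_t)1 << (n))   /* number of buckets: 2^n         */
#define hashmask(n) (hashsize(n) - 1)      /* low n bits select the bucket   */

/* With hashpower = 16 (the default), a key hashed to hv lands in bucket: */
/*     primary_hashtable[hv & hashmask(hashpower)]                        */
```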
The item structure in which Memcached stores data has two parts: the first part defines the item's attributes, the second part is the item's data.
typedef struct _stritem {
    /* Protected by LRU locks */
    struct _stritem *next;
    struct _stritem *prev;
    /* Rest are protected by an item lock */
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint16_t        it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
4.2 Lookup
1) Use the key's hash value hv to find the bucket, taking into account whether the table is currently being expanded: primary_hashtable[hv & hashmask(hashpower)].
2) Walk the bucket's singly linked list, comparing keys until the matching item is found.
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;
    uint64_t oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket];
    } else {
        it = primary_hashtable[hv & hashmask(hashpower)];
    }

    item *ret = NULL;
    int depth = 0;
    while (it) {
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}
4.3 Insertion
1) Use the key's hash value hv to find the bucket.
2) Insert the item at the head of that bucket's singly linked list.
int assoc_insert(item *it, const uint32_t hv) {
    uint64_t oldbucket;

//    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
    return 1;
}
4.4 Deletion
1) Use the key's hash value hv to find the bucket.
2) Walk that bucket's singly linked list and unlink the matching item.
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
    item **before = _hashitem_before(key, nkey, hv);

    if (*before) {
        item *nxt;
        /* The DTrace probe cannot be triggered as the last instruction
         * due to possible tail-optimization by the compiler
         */
        MEMCACHED_ASSOC_DELETE(key, nkey);
        nxt = (*before)->h_next;
        (*before)->h_next = 0;   /* probably pointless, but whatever. */
        *before = nxt;
        return;
    }
    /* Note:  we never actually get here.  the callers don't delete things
       they can't find. */
    assert(*before != 0);
}
4.5 Hash-Table Expansion
1) Expansion is driven by a separate thread that checks whether the table needs to grow; the trigger condition is curr_items > (hashsize(hashpower) * 3) / 2, i.e. the item count exceeds 1.5 times the bucket count (see the sketch after this list).
2) Once expansion is needed, pthread_cond_signal(&maintenance_cond) signals the maintenance thread to start.
3) The table doubles in size: primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *)).
4) Migration is incremental: each step moves only the items of a single bucket.
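A condensed sketch of the trigger check — a paraphrase of the logic described above, not a verbatim copy of assoc.c (the helper name is invented):

```c
/* Called after an insert: if the load factor exceeds 1.5, wake the
 * maintenance thread so it can begin an incremental expansion. */
static void assoc_start_expand_if_needed(uint64_t curr_items) {
    if (curr_items > (hashsize(hashpower) * 3) / 2) {
        pthread_cond_signal(&maintenance_cond);   /* expansion thread takes over */
    }
}
```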
5 LRU Memory Reclamation
A traditional LRU algorithm basically works like this:
1) Keep an LRU linked list; every newly added element goes to the head of the list.
2) Whenever an element is accessed, unlink it from its current position and move it back to the head.
3) When elements must be evicted, search from the tail of the list for elements that can be evicted.
This algorithm has the following problems:
1) An element that is accessed just once is moved to the head of the LRU list, so even if it is actually evictable it takes a long time to be evicted.
2) Because of this, a tail-to-head search for evictable elements may keep hitting elements that are rarely accessed but whose TTL has not yet expired, so the scan may have to walk a long way up the list before finding something to evict. Since this search is done under a lock, holding the lock that long hurts the system's concurrency.
5.1 The Improved Segmented LRU
The segmented LRU splits the LRU list into four classes according to activity:
1) HOT_LRU: the LRU list for hot data.
2) WARM_LRU: the LRU list for warm data (less active than hot data).
3) COLD_LRU: the LRU list for cold data.
4) TEMP_LRU: the LRU list for temporary data (disabled by default).
Note that the hot (settings.hot_lru_pct) and warm (settings.warm_lru_pct) lists are limited to a percentage of total memory, while the cold list is not.
#define HOT_LRU 0
#define WARM_LRU 64
#define COLD_LRU 128
#define TEMP_LRU 192
The heads and tails arrays hold the LRU lists:
#define POWER_LARGEST 256 /* actual cap is 255 */
#define LARGEST_ID POWER_LARGEST
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
As discussed in the slabclass section, the allocation size first determines an index into the slabclass array. When data needs to be evicted from an LRU list, one more computation, slabid | lru id (effectively slabid + lru id), selects which of the LRU lists above to search:
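In other words, the slab class id and the LRU class id are packed into a single index into heads/tails. A rough illustration using the constants defined above (not copied from items.c):

```c
/* Rough illustration: pick the tail of the COLD list of slab class 5.
 * Because HOT_LRU/WARM_LRU/COLD_LRU are 0/64/128 and slab class ids stay
 * below 64, OR-ing them together is the same as adding them. */
unsigned int slab_clsid = 5;
item *cold_tail = tails[slab_clsid | COLD_LRU];   /* == tails[5 + 128] */
```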
Because a tail-to-head search for evictable elements may pass many elements that cannot be evicted, slowing eviction down, these graded LRU lists help the program quickly identify which elements can be evicted. The rules for moving elements between the three graded LRU lists are:
| Queue | Rule |
| --- | --- |
| HOT_LRU | Data in the HOT LRU queue is never moved back toward the front of the HOT queue; it only moves to colder queues. If an element becomes active, it moves to the WARM queue; if it stays inactive, it moves straight to the COLD queue |
| WARM_LRU | If an element in the WARM queue becomes active, it moves to the head of the WARM queue; otherwise it moves toward the COLD queue |
| COLD_LRU | Elements in the COLD queue are the least active, so when eviction is needed the COLD LRU queue is searched first. When an element in the COLD queue becomes active again, it is not moved to the head of the COLD queue; it moves back to the WARM queue |
| Note | No operation ever moves an element from WARM or COLD back into HOT; moving elements out of the HOT queue is a one-way operation |
The biggest problem with the original LRU algorithm is that a single access immediately moves an element to the front of the LRU list, which delays its eviction.
The improved algorithm adds a mechanism: an element is marked active only after it has been accessed twice.
The code introduces two flag bits, set according to the following rules:
1) ITEM_FETCHED: set on the first access.
2) ITEM_ACTIVE: set on the second access (i.e. when it->it_flags & ITEM_FETCHED is already true).
3) INACTIVE: the inactive state.
4) ITEM_ACTIVE is cleared when, while scanning an LRU list from its tail, the element turns out to be the last element of the list; such an element is considered inactive and the ITEM_ACTIVE flag can be cleared.
This effectively avoids the problem of an element becoming active after a single access; "active" now means "accessed at least twice".
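The two-access rule can be expressed in a few lines. The following is a schematic paraphrase of the bump logic, not the exact code of memcached's do_item_bump/do_item_update (the function name is invented for the sketch):

```c
/* Schematic: called on every successful get of an item. */
static void mark_item_access(item *it) {
    if ((it->it_flags & ITEM_FETCHED) == 0) {
        /* first access: just remember that we have seen it once */
        it->it_flags |= ITEM_FETCHED;
    } else {
        /* second (or later) access: now it counts as active, which
         * lets it be promoted toward the WARM/HOT queues */
        it->it_flags |= ITEM_ACTIVE;
    }
}
```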
5.2 Memcached Memory Reclamation
| Mechanism | Description |
| --- | --- |
| Lazy deletion | memcached does not normally remove expired or invalidated entries on its own; an item is checked for expiry only when a get request touches it |
| flush command | The flush command marks every item as invalid |
| Check on creation | When creating an item, Memcached scans from the tail of the LRU list for an expired item; only if none is found does it allocate a new one |
| LRU crawler | Disabled by default; the LRU crawler is a separate thread that cleans up expired items |
| LRU eviction | When there is no memory left for a new element, memcached evicts an item starting from the tail of the LRU list, whether or not the item is still within its TTL. Items are inserted into the LRU list in order, so eviction from the tail removes the oldest items first |
5.2.1 Lazy Deletion
Lazy deletion simply checks whether the data has expired at get time. The check also cooperates with the flush_all command: the expiry time set by flush_all, settings.oldest_live, is consulted during the check.
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
item *it = assoc_find(key, nkey, hv);
if (it != NULL) {
refcount_incr(it);
/* Optimization for slab reassignment. prevents popular items from
* jamming in busy wait. Can only do this here to satisfy lock order
* of item_lock, slabs_lock. */
/* This was made unsafe by removal of the cache_lock:
* slab_rebalance_signal and slab_rebal.* are modified in a separate
* thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
* NULL (0), but slab_end is still equal to some value, this would end
* up unlinking every item fetched.
* This is either an acceptable loss, or if slab_rebalance_signal is
* true, slab_start/slab_end should be put behind the slabs_lock.
* Which would cause a huge potential slowdown.
* Could also use a specific lock for slab_rebal.* and
* slab_rebalance_signal (shorter lock?)
*/
/*if (slab_rebalance_signal &&
((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
}*/
}
int was_found = 0;
if (settings.verbose > 2) {
int ii;
if (it == NULL) {
fprintf(stderr, "> NOT FOUND ");
} else {
fprintf(stderr, "> FOUND KEY ");
}
for (ii = 0; ii < nkey; ++ii) {
fprintf(stderr, "%c", key[ii]);
}
}
if (it != NULL) {
was_found = 1;
if (item_is_flushed(it)) {
do_item_unlink(it, hv);
STORAGE_delete(c->thread->storage, it);
do_item_remove(it);
it = NULL;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_flushed++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (settings.verbose > 2) {
fprintf(stderr, " -nuked by flush");
}
was_found = 2;
} else if (it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it, hv);
STORAGE_delete(c->thread->storage, it);
do_item_remove(it);
it = NULL;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_expired++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (settings.verbose > 2) {
fprintf(stderr, " -nuked by expire");
}
was_found = 3;
} else {
if (do_update) {
do_item_bump(c, it, hv);
}
DEBUG_REFCNT(it, '+');
}
}
if (settings.verbose > 2)
fprintf(stderr, "\n");
/* For now this is in addition to the above verbose logging. */
LOGGER_LOG(c->thread->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key,
nkey, (it) ? it->nbytes : 0, (it) ? ITEM_clsid(it) : 0, c->sfd);
return it;
}
5.2.2 Deletion via flush_all
int item_is_flushed(item *it) {
    rel_time_t oldest_live = settings.oldest_live;
    uint64_t cas = ITEM_get_cas(it);
    uint64_t oldest_cas = settings.oldest_cas;

    if (oldest_live == 0 || oldest_live > current_time)
        return 0;

    if ((it->time <= oldest_live)
            || (oldest_cas != 0 && cas != 0 && cas < oldest_cas)) {
        return 1;
    }
    return 0;
}
5.2.3 Expiry Check When Creating an Item
| No. | Description |
| --- | --- |
| 1 | do_item_alloc starts the memory allocation for a new item |
| 2 | do_item_alloc_pull runs the allocation logic, retrying at most 10 times |
| 3 | Inside do_item_alloc_pull, the code first tries slabs_alloc; if that fails, it calls lru_pull_tail to release an item from the LRU queue and turn it into a usable item (sketched below) |
| 4 | lru_pull_tail does the actual work of releasing items from the LRU queue, including reclaiming expired items |
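The retry loop described in the table can be paraphrased roughly as follows. This is a simplified sketch of the control flow, not the actual items.c code, and the exact lru_pull_tail arguments are an assumption:

```c
/* Simplified sketch: try to get a free chunk; if the slab class is full,
 * reclaim something from the tail of its LRU lists and try again. */
static item *do_item_alloc_pull_sketch(const size_t ntotal, const unsigned int id) {
    item *it = NULL;
    for (int tries = 10; tries > 0 && it == NULL; tries--) {
        it = slabs_alloc(ntotal, id, 0);       /* fast path: take a free chunk */
        if (it == NULL) {
            /* slow path: expire/evict from the COLD LRU of this slab class,
             * which frees a chunk for the next iteration */
            lru_pull_tail(id, COLD_LRU, 0, LRU_PULL_EVICT, 0, NULL);
        }
    }
    return it;
}
```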
5.2.4 Periodic Cleanup by the LRU Crawler Thread
To be analyzed in a follow-up.
6 Issues with Memcached in Practice