IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> memcached 源码分析 -> 正文阅读

[大数据]memcached 源码分析

1.Memcached概述

memcached是一个高性能的分布式内存缓存服务器,memcached在Linux上可以通过yum命令安装,这样方便很多,在生产环境下建议用Linux系统,memcached使用libevent这个库在Linux系统上才能发挥它的高性能。它的分布式其实在服务端是不具有分布式的特征的,是依靠客户端的分布式算法进行了分布式,memcached是一个纯内存型的数据库,这样在读写速度上相对来说比较快。

?

MemCache虽然被称为分布式缓存,但是MemCache本身完全不具备分布式的功能,MemCache集群之间不会相互通信(与之形成对比的,比如JBoss Cache,某台服务器有缓存数据更新时,会通知集群中其他机器更新缓存或清除缓存数据),所谓的分布式,完全依赖于客户端程序的实现,就像上面这张图的流程一样。

命 令

作 用

get

返回Key对应的Value值

add

添加一个Key值,没有则添加成功并提示STORED,有则失败并提示NOT_STORED

set

无条件地设置一个Key值,没有就增加,有就覆盖,操作成功提示STORED

replace

按照相应的Key值替换数据,如果Key值不存在则会操作失败

stats

返回MemCache通用统计信息

stats items

返回各个slab中item的数目和最老的item的年龄

stats slabs

返回MemCache运行期间创建的每个slab的信息

version

返回当前MemCache版本号

flush_all

清空所有键值,但不会删除items,所以此时MemCache依旧占用内存

quit

关闭连接

2.memcached内存管理

MemCache的数据存放在内存中,存放在内存中认为意味着几点:

1)访问数据的速度比传统的关系型数据库要快,因为Oracle、MySQL这些传统的关系型数据库为了保持数据的持久性,数据存放在硬盘中,IO操作速度慢

2)MemCache的数据存放在内存中同时意味着只要MemCache重启了,数据就会消失

3)既然MemCache的数据存放在内存中,那么势必受到机器位数的限制,32位机器最多只能使用2GB的内存空间,64位机器可以认为没有上限

MemCache采用的内存分配方式是固定空间分配

?

这里面涉及了slab_class、slab、page、chunk四个概念,它们之间的关系是:

1)MemCache将内存空间分为一组slab

2)每个slab下又有若干个page,每个page默认是1M,如果一个slab占用100M内存的话,那么这个slab下应该有100个page

3)每个page里面包含一组chunk,chunk是真正存放数据的地方,同一个slab里面的chunk的大小是固定的

4)有相同大小chunk的slab被组织在一起,称为slab_class

MemCache内存分配的方式称为allocator,slab的数量是有限的,几个、十几个或者几十个,这个和启动参数的配置相关。MemCache中的value过来存放的地方是由value的大小决定的,value总是会被存放到与chunk大小最接近的一个slab中,比如slab[1]的chunk大小为80字节、slab[2]的chunk大小为100字节、slab[3]的chunk大小为128字节(相邻slab内的chunk基本以1.25为比例进行增长,MemCache启动时可以用-f指定这个比例),那么过来一个88字节的value,这个value将被放到2号slab中。放slab的时候,首先slab要申请内存,申请内存是以page为单位的,所以在放入第一个数据的时候,无论大小为多少,都会有1M大小的page被分配给该slab。申请到page后,slab会将这个page的内存按chunk的大小进行切分,这样就变成了一个chunk数组,最后从这个chunk数组中选择一个用于存储数据。

如果这个slab中没有chunk可以分配了怎么办,如果MemCache启动没有追加-M(禁止LRU,这种情况下内存不够会报Out Of Memory错误),那么MemCache会把这个slab中最近最少使用的chunk中的数据清理掉,然后放上最新的数据。针对MemCache的内存分配及回收算法,总结三点:

1)MemCache的内存分配chunk里面会有内存浪费,88字节的value分配在128字节(紧接着大的用)的chunk中,就损失了30字节,但是这也避免了管理内存碎片的问题

MemCache的LRU算法不是针对全局的,是针对slab的

2)应该可以理解为什么MemCache存放的value大小是限制的,因为一个新数据过来,3)slab会先以page为单位申请一块内存,申请的内存最多就只有1M,所以value大小自然不能大于1M了

2.1MemCache的特性和限制

  1. 序号
  1. 限制描述
  1. 1
  1. MemCache中可以保存的item数据量是没有限制的,只要内存足够
  1. 2
  1. MemCache单进程在32位机中最大使用内存为2G,64位机则没有限制
  1. 3
  1. Key最大为250个字节,超过该长度无法存储
  1. 4
  1. 单个item最大数据是1MB,超过1MB的数据不予存储
  1. 5
  1. MemCache服务端是不安全的,比如已知某个MemCache节点,可以直接telnet过去,并通过flush_all让已经存在的键值对立即失效
  1. 6
  1. 不能够遍历MemCache中所有的item,因为这个操作的速度相对缓慢且会阻塞其他的操作
  1. 7
  1. MemCache的高性能源自于两阶段哈希结构:第一阶段在客户端,通过Hash算法根据Key值算出一个节点;第二阶段在服务端,通过一个内部的Hash算法,查找真正的item并返回给客户端。从实现的角度看,MemCache是一个非阻塞的、基于事件的服务器程序

2.2 slab内存管理

2.2.1 item 数据存储节点

typedef struct _stritem {

??? /* Protected by LRU locks */

??? //一个item的地址, 主要用于LRU链和freelist链

??? struct _stritem *next;

??? //下一个item的地址,主要用于LRU链和freelist链

??? struct _stritem *prev;



??? /* Rest are protected by an item lock */

??? //用于记录哈希表槽中下一个item节点的地址

??? struct _stritem *h_next;??? /* hash chain next */

??? //最近访问时间

??? rel_time_t????? time;?????? /* least recent access */

??? //缓存过期时间

??? rel_time_t????? exptime;??? /* expire time */

??? int???????????? nbytes;???? /* size of data */

??? //当前item被引用的次数,用于判断item是否被其它的线程在操作中

??? //refcount == 1的情况下该节点才可以被删除

??? unsigned short? refcount;

??? uint8_t???????? nsuffix;??? /* length of flags-and-length string */

??? uint8_t???????? it_flags;?? /* ITEM_* above */

??? //记录该item节点位于哪个slabclass_t中

??? uint8_t???????? slabs_clsid;/* which slab class we're in */

??? uint8_t?? ??????nkey;?????? /* key length, w/terminating null and padding */

??? /* this odd type prevents type-punning issues when we do

???? * the little shuffle to save space when not using CAS. */

??? union {

??????? uint64_t cas;

??????? char end;

??? } data[];

??? /* if it_flags & ITEM_CAS we have 8 bytes CAS */

??? /* then null-terminated key */

??? /* then " flags length\r\n" (no terminating null) */

??? /* then data with terminating \r\n (no terminating null; it's binary!) */

} item;

slab与chunk

slab是一块内存空间,默认大小为1M,memcached会把一个slab分割成一个个chunk, 这些被切割的小的内存块,主要用来存储item

slabclass

每个item的大小都可能不一样,item存储于chunk,如果chunk大小不够,则不足以分配给item使用,如果chunk过大,则太过于浪费内存空间。因此memcached采取的做法是,将slab切割成不同大小的chunk,这样就满足了不同大小item的存储。被划分不同大小chunk的slab的内存在memcached就是用slabclass这个结构体来表现的

typedef struct {
    unsigned int size;      /* sizes of items */
    unsigned int perslab;   /* how many items per slab */

    void *slots;           /* list of item ptrs */
    unsigned int sl_curr;   /* total free items in list */

    unsigned int slabs;     /* how many slabs were allocated for this class */

    void **slab_list;       /* array of slab pointers */
    unsigned int list_size; /* size of prev array */
} slabclass_t;

?

1)slabclass数组初始化的时候,每个slabclass_t都会分配一个1M大小的slab,slab会被切分为N个小的内存块,这个小的内存块的大小取决于slabclass_t结构上的size的大小

2)每个slabclass_t都只存储一定大小范围的数据,并且下一个slabclass切割的chunk块大于前一个slabclass切割的chunk块大小

3)memcached中slabclass数组默认大小为64,slabclass切割块大小的增长因子默认是1.25

例如:slabclass[1]切割的chunk块大小为100字节,slabclass[2]为125,如果需要存储一个110字节的缓存,那么就需要到slabclass[2] 的空闲链表中获取一个空闲节点进行存储。

2.2.2 slabclass的初始化

slabs_init

?????? |---> slabs_preallocate

?????????????????????????? |---> do_slabs_newslab

?????????????????????????????????????????? |--->grow_slab_list

?????????????????????????????????????????? |--->split_slab_page_into_freelist

????????

2.2.3 item节点的分配流程

序号

描述

1

根据大小,找到合适的slabclass

2

slabclass空闲列表中是否有空闲的item节点,如果有直接分配item,用于存储内容

3

空闲列表没有空闲的item可以分配,会重新开辟一个slab(默认大小为1M)的内存块,然后切割slab并放入到空闲列表中,然后从空闲列表中获取节点

void *slabs_alloc(size_t size, unsigned int id,

??????? unsigned int flags) {

??? void *ret;

??? pthread_mutex_lock(&slabs_lock);

??? ret = do_slabs_alloc(size, id, flags);

??? pthread_mutex_unlock(&slabs_lock);

??? return ret;

}

/*@null@*/

static void *do_slabs_alloc(const size_t size, unsigned int id,

??????? unsigned int flags) {

??? slabclass_t *p;

??? void *ret = NULL;

??? item *it = NULL;

??? if (id < POWER_SMALLEST || id > power_largest) {

??????? MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);

??????? return NULL;

??? }

??? p = &slabclass[id];

??? assert(p->sl_curr == 0 || (((item *)p->slots)->it_flags & ITEM_SLABBED));

??? assert(size <= p->size);

??? /* 没有空闲slab时,需要重新分配*/

??? if (p->sl_curr == 0 && flags != SLABS_ALLOC_NO_NEWPAGE) {

??????? do_slabs_newslab(id);

??? }

??? if (p->sl_curr != 0) {

??????? /* return off our freelist */

??????? it = (item *)p->slots;

??????? p->slots = it->next;

??????? if (it->next) it->next->prev = 0;

??????? /* Kill flag and initialize refcount here for lock safety in slab

???????? * mover's freeness detection. */

??????? it->it_flags &= ~ITEM_SLABBED;

??????? it->refcount = 1;

??????? p->sl_curr--;

??????? ret = (void *)it;

??? } else {

??????? ret = NULL;

??? }

??? if (ret) {

??????? MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);

??? } else {

??????? MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);

??? }

??? return ret;

}

2.2.4 item节点的释放

释放一个item节点,并不会free内存空间,而是将item节点归还到slabclass的空闲列表中

void slabs_free(void *ptr, size_t size, unsigned int id) {

??? pthread_mutex_lock(&slabs_lock);

??? do_slabs_free(ptr, size, id);

??? pthread_mutex_unlock(&slabs_lock);

}

static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {

??? slabclass_t *p;

??? item *it;

??? assert(id >= POWER_SMALLEST && id <= power_largest);

??? if (id < POWER_SMALLEST || id > power_largest)

??????? return;

??? MEMCACHED_SLABS_FREE(size, id, ptr);

??? p = &slabclass[id];

??? it = (item *)ptr;

??? if ((it->it_flags & ITEM_CHUNKED) == 0) {

??????? it->it_flags = ITEM_SLABBED;

??????? it->slabs_clsid = id;

??????? it->prev = 0;

??????? it->next = p->slots;

??????? if (it->next) it->next->prev = it;

??????? p->slots = it;

??????? p->sl_curr++;

??? } else {

??????? do_slabs_free_chunked(it, size);

??? }

??? return;

}

3 Memcached网络模型

1

Memcached主要是基于Libevent 网络事件库进行开发的

2

Memcached的网络模型分为两部分:主线程和工作线程。主线程主要用来接收客户端的连接信息;工作线程主要用来接管客户端连接,处理具体的业务逻辑

3

主线程和工作线程之间主要是通过pipe管道来进行通信。当主线程接收到客户端的连接的时候,会通过轮询的方式选择一个工作线程,然后向该工作线程的管道pipe写数据。工作线程监听到管道中有数据写入的时候,就会触发代码逻辑去接管客户端的连接

4

每个工作线程也是基于Libevent的事件机制,当客户端有数据写入的时候,就会触发读取的操作

"主线程统一accept/dispatch子线程"的基础设施:主线程创建多个子线程(这些子线程也称为worker线程),每一个线程都维持自己的事件循环,即每个线程都有自己的epoll,并且都会调用epoll_wait函数进入事件监听状态。每一个worker线程(子线程)和主线程之间都用一条管道相互通信。每一个子线程都监听自己对应那条管道的读端。当主线程想和某一个worker线程进行通信,直接往对应的那条管道写入数据即可。

"主线程统一accept/dispatch子线程"模型的工作流程:主线程负责监听进程对外的TCP监听端口。当客户端申请连接connect到进程的时候,主线程负责接收accept客户端的连接请求。然后主线程选择其中一个worker线程,把客户端fd通过对应的管道传给worker线程。worker线程得到客户端的fd后负责和这个客户端进行一切的通信。

?

1. memcached使用libevent作为进行事件监听;

2.memcached往管道里面写的内容不是fd,而是一个简单的字符。每一个worker线程都维护一个CQ队列,主线程把fd和一些信息写入一个CQ_ITEM里面,然后主线程往worker线程的CQ队列里面push这个CQ_ITEM。接着主线程使用管道通知worker线程:“唤醒work线程处理新的链接请求”。

https://img-blog.csdn.net/20150114093937432

3.1 CQ_ITEM

memcached每一个worker线程都有一个CQ队列,主线程accept到新客户端后,就把新客户端的信息封装成一个CQ_ITEM,然后push到选定线程的CQ队列中

typedef struct conn_queue_item CQ_ITEM;

struct conn_queue_item {

??? int?????????????? sfd;

??? enum conn_states? init_state;

??? int?????????????? event_flags;

??? int?????????????? read_buffer_size;

??? enum network_transport???? transport;

??? enum conn_queue_item_modes mode;

??? conn *c;

??? void??? *ssl;

??? io_pending_t *io; // IO when used for deferred IO handling.

??? STAILQ_ENTRY(conn_queue_item) i_next;

};

/* A connection queue. */

typedef struct conn_queue CQ;

struct conn_queue {

??? STAILQ_HEAD(conn_ev_head, conn_queue_item) head;

??? pthread_mutex_t lock;

??? cache_t *cache; /* freelisted objects */

};

?可以看到结构体conn_queue(即CQ队列结构体)有一个pthread_mutex_t类型变量lock,这说明主线程往某个worker线程的CQ队列里面push一个CQ_ITEM的时候必然要加锁的。下面是初始化CQ队列,以及push、pop一个CQ_ITEM的代码

static void cq_init(CQ *cq) {

??? pthread_mutex_init(&cq->lock, NULL);

??? cq->head = NULL;

??? cq->tail = NULL;

}

static CQ_ITEM *cq_pop(CQ *cq) {

??? CQ_ITEM *item;

??? pthread_mutex_lock(&cq->lock);

??? item = cq->head;

??? if (NULL != item) {

??????? cq->head = item->next;

??????? if (NULL == cq->head)

??????????? cq->tail = NULL;

??? }

??? pthread_mutex_unlock(&cq->lock);

??? return item;

}

/*

?* Adds an item to a connection queue.

?*/

static void cq_push(CQ *cq, CQ_ITEM *item) {

??? item->next = NULL;

??? pthread_mutex_lock(&cq->lock);

??? if (NULL == cq->tail)

??????? cq->head = item;

??? else

??????? cq->tail->next = item;

??? cq->tail = item;

??? pthread_mutex_unlock(&cq->lock);

}

?

3.2 线程模型

?

?

3.2.1 主线程初始化逻辑

Memcached主线程的初始化逻辑比较简单,主要作用是启动监听的master线程和工作的worker线程。,其中启动worker线程通过memcached_thread_init函数进行实现,这部分逻辑分析在worker线程初始化当中进行分析,这里主要分析监听的master线程。整个master线程的启动过程就是socket的server端初始化结合libevent的初始化。整个过程如下:

1)server_sockets,该方法主要是遍历所有listen的socket列表并逐个进行绑定。

2)server_socket,该方法主要是操作单个socket到listen状态。

3)conn_new,将socket注册到libevent当中。

4)event_handler,监听socket的回调函数。

5)最后event_base_loop让整个libevent进行循环工作状态

int main (int argc, char **argv) {

?????? //检查libevent的版本是否足够新.1.3即可

??? if (!sanitycheck()) {

??????? return EX_OSERR;

??? }

?????? //对memcached的关键设置取默认值

??? settings_init();

?????? ...//解析memcached启动参数

?????? //main_base是一个struct event_base类型的全局变量

??? main_base = event_init();//为主线程创建一个event_base

??? conn_init();//先不管,后面会说到

?????? //创建settings.num_threads个worker线程,并且为每个worker线程创建一个CQ队列

?????? //并为这些worker申请各自的event_base,worker线程然后进入事件循环中??????

??? thread_init(settings.num_threads, main_base);

?????? //设置一个定时event(也叫超时event),定时(频率为一秒)更新current_time变量

?????? //这个超时event是add到全局变量main_base里面的,所以主线程负责更新current_time(这是一个很重要的全局变量)

??? clock_handler(0, 0, 0);

??? /* create the listening socket, bind it, and init */

??? if (settings.socketpath == NULL) {

??????? FILE *portnumber_file = NULL;

????????????? //创建监听客户端的socket

??????? if (settings.port && server_sockets(settings.port, tcp_transport,//tcp_transport是枚举类型

?????????????????????????????????????????? portnumber_file)) {

??????????? vperror("failed to listen on TCP port %d", settings.port);

??????????? exit(EX_OSERR);

??????? }

???????????? ...

??? }

??? if (event_base_loop(main_base, 0) != 0) {//主线程进入事件循环

??????? retval = EXIT_FAILURE;

??? }

??? return retval;

}

解析参数并把遍历所有的监听socket进行绑定。执行方法server_socket(p, the_port, transport, portnumber_file)

static int server_sockets(int port, enum network_transport transport,

????????????????????????? FILE *portnumber_file) {

??? if (settings.inter == NULL) {

??????? return server_socket(settings.inter, port, transport, portnumber_file);

??? } else {

??????? // tokenize them and bind to each one of them..

??????? char *b;

??????? int ret = 0;

??????? char *list = strdup(settings.inter);

??????? for (char *p = strtok_r(list, ";,", &b);

??????????? ret |= server_socket(p, the_port, transport, portnumber_file);

??????? }

??????? free(list);

??????? return ret;

??? }

}

?针对单个listen的socket的初始化过程,这里主要做的事情是socket的相关初始化过程,主要是指设置socket相关的一些参数;进行socket的bind操作;通过方法conn_new关联socket和libevent当中

static int server_socket(const char *interface,

???????????????????????? int port,

???????????????????????? enum network_transport transport,

???????????????????????? FILE *portnumber_file) {

??? int sfd;

??? struct linger ling = {0, 0};

??? struct addrinfo *ai;

??? struct addrinfo *next;

??? struct addrinfo hints = { .ai_flags = AI_PASSIVE,

????????????????????????????? .ai_family = AF_UNSPEC };

??? char port_buf[NI_MAXSERV];

??? int error;

??? int success = 0;

??? int flags =1;

??? for (next= ai; next; next= next->ai_next) {

??????? conn *listen_conn_add;

??????? if ((sfd = new_socket(next)) == -1) {

??????????? continue;

??????? }

??????? //todo 设置socket相关的属性,这里省略相关代码

??????? // 绑定socket,省略相关代码

??????? if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {}

??????? // 暂时只关心TCP协议的,忽略UDP协议实现

??????? if (IS_UDP(transport)) {

??????? } else {

??????????? if (!(listen_conn_add = conn_new(sfd, conn_listening,

???????????????????????????????????????????? EV_READ | EV_PERSIST, 1,

???????????????????????????????????????????? transport, main_base))) {

??????????????? fprintf(stderr, "failed to create listening connection\n");

?????????????? ?exit(EXIT_FAILURE);

??????????? }

??????????? listen_conn_add->next = listen_conn;

??????????? listen_conn = listen_conn_add;

??????? }

??? }

??? freeaddrinfo(ai);

??? /* Return zero iff we detected no errors in starting up connections */

??? return success == 0;

}

conn_new内部就是执行libevent相关的配置,包括event_set和event_base_set,这里需要关注的是event_set当中绑定了回调函数event_handler,用于连接到来后执行的逻辑

conn *conn_new(const int sfd, enum conn_states init_state,

??????????????? const int event_flags,

??????????????? const int read_buffer_size, enum network_transport transport,

??????????????? struct event_base *base, void *ssl) {

??? conn *c;

??? assert(sfd >= 0 && sfd < max_fds);

c = conns[sfd];

。。。。。。。。。。。

? ??event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

??? event_base_set(base, &c->event);

??? c->ev_flags = event_flags;

??? if (event_add(&c->event, 0) == -1) {

??????? perror("event_add");

??????? return NULL;

??? }

??? return c;

}

回调函数event_handler的核心在于drive_machine,这个函数是整个Memcached的状态转移中心,所有的操作都通过drive_machine进行驱动来实现的

void event_handler(const evutil_socket_t fd, const short which, void *arg) {

??? conn *c;

??? c = (conn *)arg;

??? assert(c != NULL);

??? c->which = which;

??? /* sanity */

??? if (fd != c->sfd) {

??????? if (settings.verbose > 0)

??????????? fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");

??????? conn_close(c);

??????? return;

??? }

??? drive_machine(c);

??? /* wait for next event */

??? return;

}

3.2.2 work线程初始化

memcached_thread_init主要用于工作线程worker的初始化,核心的三个操作主要是:

1)初始化master线程和worker线程通信的pipe管道,pipe(fds)。

2)setup_thread,主要用于设置工作线程libevent相关的参数。

3)create_worker,主要是启动工作线程开始循环处理工作

void memcached_thread_init(int nthreads, void *arg) {

??? int???????? i;

??? int???????? power;

??? threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

? ??for (i = 0; i < nthreads; i++) {

#ifdef HAVE_EVENTFD

??????? threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK);

??????? if (threads[i].notify_event_fd == -1) {

??????????? perror("failed creating eventfd for worker thread");

??????????? exit(1);

??? ????}

#else

??????? int fds[2];

??????? if (pipe(fds)) {

??????????? perror("Can't create notify pipe");

??????????? exit(1);

??????? }

??????? threads[i].notify_receive_fd = fds[0];

??????? threads[i].notify_send_fd = fds[1];

#endif

#ifdef EXTSTORE

???? ???threads[i].storage = arg;

#endif

??????? setup_thread(&threads[i]);

??????? /* Reserve three fds for the libevent base, and two for the pipe */

??????? stats_state.reserved_fds += 5;

??? }

??? /* Create threads after we've done all the libevent setup. */

??? for (i = 0; i < nthreads; i++) {

??????? create_worker(worker_libevent, &threads[i]);

??? }

??? /* Wait for all the threads to set themselves up before returning. */

??? pthread_mutex_lock(&init_lock);

??? wait_for_thread_registration(nthreads);

??? pthread_mutex_unlock(&init_lock);

}

setup_thread内部主要是初始化工作线程worker的libevent相关参数,这里我们重点关注包括:

1)回调函数thread_libevent_process。

2)初始化master线程和worker线程通信的队cq_init(me->new_conn_queue)

static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);
}

create_worker主要是启动工作线程worker使其开始工作就可以了。

create_worker(worker_libevent, &threads[i])传入函数是worker_libevent

通过pthread_create方法触发worker_libevent的工作,在worker_libevent方法内部通过event_base_loop最终使得libevent开始工作

static void create_worker(void *(*func)(void *), void *arg) {

??? pthread_attr_t? attr;

??? int???????????? ret;

??? pthread_attr_init(&attr);

??? if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {

??????? fprintf(stderr, "Can't create thread: %s\n",

??????????????? strerror(ret));

??????? exit(1);

??? }

}

static void *worker_libevent(void *arg) {

??? LIBEVENT_THREAD *me = arg;

??? /* Any per-thread setup can happen here; memcached_thread_init() will block until

???? * all threads have finished initializing.

???? */

??? me->l = logger_create();

??? me->lru_bump_buf = item_lru_bump_buf_create();

??? if (me->l == NULL || me->lru_bump_buf == NULL) {

??????? abort();

??? }

??? if (settings.drop_privileges) {

??????? drop_worker_privileges();

??? }

?? ?register_thread_initialized();

? ??event_base_loop(me->base, 0);

??? // same mechanism used to watch for all threads exiting.

?? ?register_thread_initialized();

??? event_base_free(me->base);

??? return NULL;

}

3.2.3 主从线程通信

在master线程接受连接以后会触发drive_machine方法,其中master的状态为conn_listening,最终我们通过dispatch_conn_new方法实现master到worker的分发操作

static void drive_machine(conn *c) {

??? bool stop = false;

??? int sfd;

??? socklen_t addrlen;

??? struct sockaddr_storage addr;

??? int nreqs = settings.reqs_per_event;

??? int res;

??? const char *str;

#ifdef HAVE_ACCEPT4

??? static int? use_accept4 = 1;

#else

??? static int? use_accept4 = 0;

#endif

??? assert(c != NULL);

??? while (!stop) {

??????? switch(c->state) {

??????? case conn_listening:

??????????? addrlen = sizeof(addr);

??????????? sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);

?????????? // 中间省略一系列的socket相关的初始化工作???????????

??????????? if (settings.maxconns_fast &&

??????????? } else {

??????????????? dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,

???????????????????????????????????? DATA_BUFFER_SIZE, c->transport);

????????? ??}

??????????? stop = true;

??????????? break;

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,

?????????????????????? int read_buffer_size, enum network_transport transport, void *ssl) {

??? CQ_ITEM *item = NULL;

??? LIBEVENT_THREAD *thread;

??? if (!settings.num_napi_ids)

??????? thread = select_thread_round_robin();

??? else

??????? thread = select_thread_by_napi_id(sfd);

??? item = cqi_new(thread->ev_queue);

??? if (item == NULL) {

??????? close(sfd);

??????? /* given that malloc failed this may also fail, but let's try */

??????? fprintf(stderr, "Failed to allocate memory for connection object\n");

??????? return;

??? }

??? item->sfd = sfd;

??? item->init_state = init_state;

??? item->event_flags = event_flags;

??? item->read_buffer_size = read_buffer_size;

??? item->transport = transport;

??? item->mode = queue_new_conn;

??? item->ssl = ssl;

??? MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);

??? notify_worker(thread, item);

}

static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {

??? cq_push(t->ev_queue, item);

#ifdef HAVE_EVENTFD

??? uint64_t u = 1;

??? if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {

??????? perror("failed writing to worker eventfd");

??????? /* TODO: This is a fatal problem. Can it ever happen temporarily? */

??? }

#else

??? char buf[1] = "c";

??? if (write(t->notify_send_fd, buf, 1) != 1) {

??????? perror("Failed writing to notify pipe");

??????? /* TODO: This is a fatal problem. Can it ever happen temporarily? */

??? }

#endif

}

thread_libevent_process是worker线程接受master分发新来连接时候的回调函数,内部通过conn_new来处理新连接的到来,conn_new的内部操作就是把新连接的socket注册到worker线程的libevent当中。

static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {

??? LIBEVENT_THREAD *me = arg;

??? CQ_ITEM *item;

??? conn *c;

??? uint64_t ev_count = 0; // max number of events to loop through this run.

#ifdef HAVE_EVENTFD

??? if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {

??????? if (settings.verbose > 0)

??????????? fprintf(stderr, "Can't read from libevent pipe\n");

??????? return;

??? }

#else

??? char buf[MAX_PIPE_EVENTS];

??? ev_count = read(fd, buf, MAX_PIPE_EVENTS);

??? if (ev_count == 0) {

??????? if (settings.verbose > 0)

??????????? fprintf(stderr, "Can't read from libevent pipe\n");

??????? return;

??? }

#endif

??? for (int x = 0; x < ev_count; x++) {

??????? item = cq_pop(me->ev_queue);

??????? if (item == NULL) {

??????????? return;

??????? }

??????? switch (item->mode) {

??????????? case queue_new_conn:

?????? ?????????c = conn_new(item->sfd, item->init_state, item->event_flags,

?????????????????????????????????? item->read_buffer_size, item->transport,

?????????????????????????????????? me->base, item->ssl);

3.3 命令解析

memcached 的命令协议从直观逻辑上可以分为获取类型、变更类型、其他类型。但从实际处理层面区分,则可以细分为 get 类型、update 类型、delete 类型、算术类型、touch 类型、stats 类型,以及其他类型。对应的处理函数为,process_get_command, process_update_command, process_arithmetic_command, process_touch_command 等。每个处理函数能够处理不同的协议

?

?

工作线程监听到主线程的管道通知后,会从连接队列弹出一个新连接,然后就会创建一个 conn 结构体,注册该 conn 读事件,然后继续监听该连接上的 IO 事件。后续这个连接有命令进来时,工作线程会读取 client 发来的命令,进行解析并处理,最后返回响应。工作线程的主要处理逻辑也是在状态机中,一个名叫 drive_machine 的函数。整个处理流程如下:

序号

描述

1

当客户端和Memcached建立TCP连接后,Memcached会基于Libevent的event事件来监听客户端是否有可以读取的数据

2

当客户端有命令数据报文上报的时候,就会触发drive_machine方法中的conn_read这个Case,在进入这个状态之前经过conn_new_cmd->conn_waiting->conn_read的流程

3

memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中

4

conn_parse_cmd主要的工作就是用来解析命令。主要通过try_read_command这个方法来读取c->rbuf中的命令数据,通过\n来分隔数据报文的命令。如果c->buf内存块中的数据匹配不到\n,则返回继续等待客户端的命令数据报文到来conn_waiting;否则就会转交给process_command方法,来处理具体的命令(命令解析会通过\0符号来分隔)

5

process_command主要用来处理具体的命令。其中tokenize_command这个方法非常重要,将命令拆解成多个元素(KEY的最大长度250)。例如我们以get命令为例,最终会跳转到process_get_command这个命令 process_*_command这一系列就是处理具体的命令逻辑的

6

我们进入process_get_command,当获取数据处理完毕之后,会转交到conn_mwrite这个状态。如果获取数据失败,则关闭连接

7

进入conn_mwrite后,主要是通过transmit方法来向客户端提交数据。如果写数据失败,则关闭连接或退出drive_machine循环;如果写入成功,则又转交到conn_new_cmd这个状态

8

conn_new_cmd这个状态主要是处理c->rbuf中剩余的命令。主要看一下reset_cmd_handler这个方法,这个方法会去判断c->rbytes中是否还有剩余的报文没处理,如果未处理,则转交到conn_parse_cmd(第四步)继续解析剩余命令;如果已经处理了,则转交到conn_waiting,等待新的事件到来。在转交之前,每次都会执行一次conn_shrink方法

9

conn_shrink方法主要用来处理命令报文容器c->rbuf和输出内容的容器是否数据满了?是否需要扩大buffer的大小,是否需要移动内存块。接受命令报文的初始化内存块大小2048,最大8192

?1)memcached启动后,主线程监听并准备接受新连接接入。当有新连接接入时,主线程进入 conn_listening 状态,accept 新连接,并将新连接调度给工作线程。

2)Worker 线程监听管道,当收到主线程通过管道发送的消息后,工作线程中的连接进入 conn_new_cmd 状态,创建 conn 结构体,并做一些初始化重置操作,然后进入 conn_waiting 状态,注册读事件,并等待网络 IO。

3)有数据到来时,连接进入 conn_read 状态,读取网络数据。

4)读取成功后,就进入 conn_parse_cmd 状态,然后根据 Mc 协议解析指令。

5)对于读取指令,获取到 value 结果后,进入 conn_mwrite 状态。

6)对于变更指令,则进入 conn_nread,进行 value 的读取,读取到 value 后,对 key 进行变更,当变更完毕后,进入 conn_write,然后将结果写入缓冲。然后和读取指令一样,也进入 conn_mwrite 状态。

7)进入到 conn_mwrite 状态后,将结果响应发送给 client。发送响应完毕后,再次进入到 conn_new_cmd 状态,进行连接重置,准备下一次命令处理循环。

8)在读取、解析、处理、响应过程,遇到任何异常就进入 conn_closing,关闭连接

3.3.1 状态机解析

状态机

说明

conn_new_cmd

主线程通过调用 dispatch_conn_new,把新连接调度给工作线程后,worker 线程创建 conn 对象,这个连接初始状态就是 conn_new_cmd。除了通过新建连接进入 conn_new_cmd 状态之外,如果连接命令处理完毕,准备接受新指令时,也会将连接的状态设置为 conn_new_cmd 状态。

进入 conn_new_cmd 后,工作线程会调用 reset_cmd_handler 函数,重置 conn 的 cmd 和 substate 字段,并在必要时对连接 buf 进行收缩。因为连接在处理 client 来的命令时,对于写指令,需要分配较大的读 buf 来存待更新的 key value,而对于读指令,则需要分配较大的写 buf 来缓冲待发送给 client 的 value 结果。持续运行中,随着大 size value 的相关操作,这些缓冲会占用很多内存,所以需要设置一个阀值,超过阀值后就进行缓冲内存收缩,避免连接占用太多内存。在后端服务以及中间件开发中,这个操作很重要,因为线上服务的连接很容易达到万级别,如果一个连接占用几十 KB 以上的内存,后端系统仅连接就会占用数百 MB 甚至数 GB 以上的内存空间。

工作线程处理完 conn_new_cmd 状态的主要逻辑后,如果读缓冲区有数据可以读取,则进入 conn_parse_cmd 状态,否则就会进入到 conn_waiting 状态,等待网络数据进来。

conn_waiting

连接进入 conn_waiting 状态后,处理逻辑很简单,直接通过 update_event 函数注册读事件即可,之后会将连接状态更新为 conn_read。

conn_read

当工作线程监听到网络数据进来,连接就进入 conn_read 状态。对 conn_read 的处理,是通过 try_read_network 从 socket 中读取网络数据。如果读取失败,则进入 conn_closing 状态,关闭连接。如果没有读取到任何数据,则会返回 conn_waiting,继续等待 client 端的数据到来。如果读取数据成功,则会将读取的数据存入 conn 的 rbuf 缓冲,并进入 conn_parse_cmd 状态,准备解析 cmd

conn_parse_cmd

conn_parse_cmd 状态的处理逻辑就是解析命令。工作线程首先通过 try_read_command 读取连接的读缓冲,并通过 \n 来分隔数据报文的命令。如果命令首行长度大于 1024,关闭连接,这就意味着 key 长度加上其他各项命令字段的总长度要小于 1024 字节。当然对于 key,Mc 有个默认的最大长度,key_max_length,默认设置为 250 字节。校验完毕首行报文的长度,接下来会在 process_command 函数中对首行指令进行处理。

备注:

conn_parse_cmd 的状态处理,只有读取到 \n,有了完整的命令首行协议,才会进入 process_command,否则会跳转到 conn_waiting,继续等待客户端的命令数据报文。在 process_command 处理中,如果是获取类命令,在获取到 key 对应的 value 后,则跳转到 conn_mwrite,准备写响应给连接缓冲。而对于 update 变更类型的指令,则需要继续读取 value 数据,此时连接会跳转到 conn_nread 状态。在 conn_parse_cmd 处理过程中,如果遇到任何失败,都会跳转到 conn_closing 关闭连接

conn_write

连接 conn_write 状态处理逻辑很简单,直接进入 conn_mwrite 状态。或者当 conn 的 iovused 为 0 或对于 udp 协议,将响应写入 conn 消息缓冲后,再进入 conn_mwrite 状态。

conn_mwrite

进入 conn_mwrite 状态后,工作线程将通过 transmit 来向客户端写数据。如果写数据失败,跳转到 conn_closing,关闭连接退出状态机。如果写数据成功,则跳转到 conn_new_cmd,准备下一次新指令的获取

conn_closing

最后一个 conn_closing 状态,前面提到过很多次,在任何状态的处理过程中,如果出现异常,就会进入到这个状态,关闭连接。

?

3.3.2 状态机源码分析

3.3.2.1 conn_new_cmd

conn_new_cmd内部通过reset_cmd_handler将状态设置为conn_parse_cmd,重新进入命令解析过程。重新进行一个大循环

??????? case conn_new_cmd:

??????????? /* Only process nreqs at a time to avoid starving other

?????????????? connections */

??????????? --nreqs;

??????????? if (nreqs >= 0) {

??????????????? reset_cmd_handler(c);

??????????? } else if (c->resp_head) {

??????????????? // flush response pipe on yield.

??????????????? conn_set_state(c, conn_mwrite);

??????????? } else {

??????????????? pthread_mutex_lock(&c->thread->stats.mutex);

??????????????? c->thread->stats.conn_yields++;

??????????????? pthread_mutex_unlock(&c->thread->stats.mutex);

??????????????? if (c->rbytes > 0) {

????????? ??????????/* We have already read in data into the input buffer,

?????????????????????? so libevent will most likely not signal read events

?????????????????????? on the socket (unless more data is available. As a

?????????????????????? hack we should just put in a request to write data,

?????????????????????? because that should be possible ;-)

??????????????????? */

??????????????????? if (!update_event(c, EV_WRITE | EV_PERSIST)) {

??????????????????????? if (settings.verbose > 0)

??????????????????????? ????fprintf(stderr, "Couldn't update event\n");

??????????????????????? conn_set_state(c, conn_closing);

??????????????????????? break;

??????????????????? }

??????????????? }

??????????????? stop = true;

??????????? }

static void reset_cmd_handler(conn *c) {

??? c->cmd = -1;

??? c->substate = bin_no_state;

??? if (c->item != NULL) {

??????? // TODO: Any other way to get here?

??????? // SASL auth was mistakenly using it. Nothing else should?

??????? if (c->item_malloced) {

??????????? free(c->item);

???? ???????c->item_malloced = false;

??????? } else {

??????????? item_remove(c->item);

??????? }

??????? c->item = NULL;

??? }

??? if (c->rbytes > 0) {

??????? conn_set_state(c, conn_parse_cmd);

??? } else if (c->resp_head) {

??????? conn_set_state(c, conn_mwrite);

??? } else {

??????? conn_set_state(c, conn_waiting);

??? }

}

?

客户端输入的第一条命令时,epoll触发了两次函数(epoll为水平触发,没有read的话会有第二次触发)

3.3.2.2 conn_waiting

??????? case conn_waiting:

??????????? rbuf_release(c);

??????????? if (!update_event(c, EV_READ | EV_PERSIST)) {

??????????????? if (settings.verbose > 0)

??????????????????? fprintf(stderr, "Couldn't update event\n");

??????????????? conn_set_state(c, conn_closing);

??????????????? break;

??????????? }

??????????? conn_set_state(c, conn_read);

??????????? stop = true;

??????????? break;

3.3.2.3 conn_read

memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中

??????? case conn_read:

??????????? if (!IS_UDP(c->transport)) {

??????????????? // Assign a read buffer if necessary.

??????????????? if (!rbuf_alloc(c)) {

??????????????????? // TODO: Some way to allow for temporary failures.

??????????????????? conn_set_state(c, conn_closing);

??????????????????? break;

??????????????? }

??????????????? res = try_read_network(c);

??????????? } else {

??????????????? // UDP connections always have a static buffer.

??????????????? res = try_read_udp(c);

??????????? }

??????????? switch (res) {

??????????? case READ_NO_DATA_RECEIVED:

??????????????? conn_set_state(c, conn_waiting);

??????????????? break;

??????????? case READ_DATA_RECEIVED:

??????????????? conn_set_state(c, conn_parse_cmd);

??????????????? break;

??????????? case READ_ERROR:

? ??????????????conn_set_state(c, conn_closing);

??????????????? break;

??????????? case READ_MEMORY_ERROR: /* Failed to allocate more memory */

??????????????? /* State already set by try_read_network */

??????????????? break;

??????????? }

??????????? break;

3.3.2.4 conn_parse_cmd

这个方法主要是用来读取rbuf中的命令的。因为数据报文会有粘包和拆包的特性,所以只有等到命令行完整了才能进行解析。所有只有匹配到了\n符号,才能匹配一个完整的命令。在整个解析过程中,每次解析到\n符号就说明一个完整的命令了,然后就进入处理这个命令的过程,进行处理后返回客户端后再次解析。

??????? case conn_parse_cmd:

??????????? c->noreply = false;

??????????? if (c->try_read_command(c) == 0) {

??????????????? /* we need more data! */

??????????????? if (c->resp_head) {

??????????????????? // Buffered responses waiting, flush in the meantime.

??????????????????? conn_set_state(c, conn_mwrite);

??????????????? } else {

???? ???????????????conn_set_state(c, conn_waiting);

??????????????? }

??????????? }

??????????? break;

int try_read_command_ascii(conn *c) {

??? char *el, *cont;

??? if (c->rbytes == 0)

??????? return 0;

??? el = memchr(c->rcurr, '\n', c->rbytes);

??? if (!el) {

??????? if (c->rbytes > 2048) {

??????????? char *ptr = c->rcurr;

??????????? while (*ptr == ' ') { /* ignore leading whitespaces */

??????????????? ++ptr;

??????????? }

??????????? if (ptr - c->rcurr > 100 ||

??????????????? (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {

??????????????? conn_set_state(c, conn_closing);

??????????????? return 1;

??????????? }

??????????? if (!c->rbuf_malloced) {

??????????????? if (!rbuf_switch_to_malloc(c)) {

??????????????????? conn_set_state(c, conn_closing);

??????????????????? return 1;

??????????????? }

??????????? }

??????? }

??????? return 0;

??? }

??? cont = el + 1;

??? if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {

??????? el--;

??? }

??? *el = '\0';

??? assert(cont <= (c->rcurr + c->rbytes));

??? c->last_cmd_time = current_time;

??? process_command_ascii(c, c->rcurr);

??? c->rbytes -= (cont - c->rcurr);

??? c->rcurr = cont;

??? assert(c->rcurr <= (c->rbuf + c->rsize));

??? return 1;

}

tokenize_command需要分析的下一个细节就是关于最后一个元素的问题,如果解析的命令个数没有达到max_tokens,最后一个元素内容为空,如果达到了max_tokens,最后一个元素时剩余的未解析字符串

static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {

??? char *s, *e;

??? size_t ntokens = 0;

??? assert(command != NULL && tokens != NULL && max_tokens > 1);

??? size_t len = strlen(command);

??? unsigned int i = 0;

??? s = e = command;

??? for (i = 0; i < len; i++) {

??????? if (*e == ' ') {

??????????? if (s != e) {

??????????????? tokens[ntokens].value = s;

??????????????? tokens[ntokens].length = e - s;

??????????????? ntokens++;

??????????????? *e = '\0';

??????????????? if (ntokens == max_tokens - 1) {

??????????????????? e++;

??????????????????? s = e; /* so we don't add an extra token */

??????????????????? break;

??????????????? }

??????????? }

??????????? s = e + 1;

??????? }

??????? e++;

??? }

??? if (s != e) {

??????? tokens[ntokens].value = s;

??????? tokens[ntokens].length = e - s;

??????? ntokens++;

??? }

??? /*

???? * If we scanned the whole string, the terminal value pointer is null,

???? * otherwise it is the first unprocessed character.

???? */

??? tokens[ntokens].value =? *e == '\0' ? NULL : e;

??? tokens[ntokens].length = 0;

??? ntokens++;

??? return ntokens;

}

void process_command_ascii(conn *c, char *command) {

??? token_t tokens[MAX_TOKENS];

??? size_t ntokens;

??? int comm;

??? assert(c != NULL);

??? MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

??? if (settings.verbose > 1)

??????? fprintf(stderr, "<%d %s\n", c->sfd, command);

??? // Prep the response object for this query.

??? if (!resp_start(c)) {

??????? conn_set_state(c, conn_closing);

??????? return;

??? }

??? ntokens = tokenize_command(command, tokens, MAX_TOKENS);

??? // All commands need a minimum of two tokens: cmd and NULL finalizer

??? // There are also no valid commands shorter than two bytes.

??? if (ntokens < 2 || tokens[COMMAND_TOKEN].length < 2) {

??????? out_string(c, "ERROR");

??????? return;

??? }

??? // Meta commands are all 2-char in length.

??? char first = tokens[COMMAND_TOKEN].value[0];

??? if (first == 'm' && tokens[COMMAND_TOKEN].length == 2) {

??????? switch (tokens[COMMAND_TOKEN].value[1]) {

????????? …………………………………………..

??? } else if (first == 'g') {

??????? // Various get commands are very common.

??????? WANT_TOKENS_MIN(ntokens, 3);

??????? if (strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) {

??????????? process_get_command(c, tokens, ntokens, false, false);

??????? } else if (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0) {

??????????? process_get_command(c, tokens, ntokens, true, false);

??????? } else if (strcmp(tokens[COMMAND_TOKEN].value, "gat") == 0) {

??????????? process_get_command(c, tokens, ntokens, false, true);

??????? } else if (strcmp(tokens[COMMAND_TOKEN].value, "gats") == 0) {

??????????? process_get_command(c, tokens, ntokens, true, true);

??????? } else {

??????????? out_string(c, "ERROR");

??????? }

??? } else if (first == 's') {

??????? if (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) {

??????????? WANT_TOKENS_OR(ntokens, 6, 7);

??????????? process_update_command(c, tokens, ntokens, comm, false);

??????? } else if (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0) {

??????????? process_stat(c, tokens, ntokens);

??????? } else if (strcmp(tokens[COMMAND_TOKEN].value, "shutdown") == 0) {

??????????? process_shutdown_command(c, tokens, ntokens);

??????? } else if (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {

??????????? process_slabs_command(c, tokens, ntokens);

??????? } else {

??????????? out_string(c, "ERROR");

??????? }

………………………………………..

process_command_ascii 根据tokens里存储的命令字段进行不同的操作,按照set 操作进行分析。

process_update_command函数申请分配一个item后,并没有直接把这个item插入到LRU队列和哈希表中,而不过用conn结构体的item成员指向这个申请得到的item,而且用ritem成员指向item结构体的数据域(这为了方便写入数据)。最后conn的状态改动为conn_nread

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {

? …………………………………………………….

??? item *it;

??? assert(c != NULL);

??? set_noreply_maybe(c, tokens, ntokens);

??? if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {

??????? out_string(c, "CLIENT_ERROR bad command line format");

??????? return;

??? }

??? key = tokens[KEY_TOKEN].value;

??? nkey = tokens[KEY_TOKEN].length;

??? if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)

?????????? && safe_strtol(tokens[3].value, &exptime_int)

?????????? && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {

??????? out_string(c, "CLIENT_ERROR bad command line format");

??????? return;

??? }

??? exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));

??? // does cas value exist?

??? if (handle_cas) {

??????? if (!safe_strtoull(tokens[5].value, &req_cas_id)) {

??????????? out_string(c, "CLIENT_ERROR bad command line format");

??????????? return;

??????? }

??? }

??? if (vlen < 0 || vlen > (INT_MAX - 2)) {

??????? out_string(c, "CLIENT_ERROR bad command line format");

??????? return;

??? }

??? vlen += 2;

it = item_alloc(key, nkey, flags, exptime, vlen);

…………………………………………………

? ??ITEM_set_cas(it, req_cas_id);

??? c->item = it;

??? c->ritem = ITEM_data(it);

??? c->rlbytes = it->nbytes;

??? c->cmd = comm;

??? conn_set_state(c, conn_nread);

}

3.3.2.5 conn_nread

conn_parse_cmd状态机中的process_update_command命令处理过程是没有把item的数据写入到item结构体中。只是把状态机迁移到了conn_nread退出到drive_machine函数中。

尽管process_update_command留下了尾巴,但它也用conn的成员变量记录了一些重要值,用于填充item的数据域。rlbytes表示须要用多少字节填充item(需要填充的数据的长度),rbytes表示读缓冲区还有多少字节能够使用,ritem指向数据填充地点。

??????? case conn_nread:

??????????? if (c->rlbytes == 0) {

??????????????? complete_nread(c);

??????????????? break;

??????????? }

??????????? /* Check if rbytes < 0, to prevent crash */

??????????? if (c->rlbytes < 0) {

??????????????? if (settings.verbose) {

??????????????????? fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);

??????????????? }

??????????????? conn_set_state(c, conn_closing);

??????????????? break;

??????????? }

??????????? if (c->item_malloced || ((((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) ) {

??????????????? /* first check if we have leftovers in the conn_read buffer */

??????????????? if (c->rbytes > 0) {

??????????????????? int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;

??????????????????? memmove(c->ritem, c->rcurr, tocopy);

??????????????????? c->ritem += tocopy;

??????????????????? c->rlbytes -= tocopy;

??????????????????? c->rcurr += tocopy;

??????????????????? c->rbytes -= tocopy;

??????????????????? if (c->rlbytes == 0) {

??????????????????????? break;

??????????????????? }

??????????????? }

??????????????? /*? now try reading from the socket */

??????????????? res = c->read(c, c->ritem, c->rlbytes);

??????????????? if (res > 0) {

??????????????????? pthread_mutex_lock(&c->thread->stats.mutex);

??????????????????? c->thread->stats.bytes_read += res;

??????????????????? pthread_mutex_unlock(&c->thread->stats.mutex);

??????????????????? if (c->rcurr == c->ritem) {

??????????????????? ????c->rcurr += res;

??????????????????? }

??????????????????? c->ritem += res;

??????????????????? c->rlbytes -= res;

??????????????????? break;

??????????????? }

??????????? } else {

??????????????? res = read_into_chunked_item(c);

??????????????? if (res > 0)

??????????????????? break;

??????????? }

??????????? if (res == 0) { /* end of stream */

??????????????? c->close_reason = NORMAL_CLOSE;

??????????????? conn_set_state(c, conn_closing);

??????????????? break;

??????????? }

??????????? if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {

??????????????? if (!update_event(c, EV_READ | EV_PERSIST)) {

??????????????????? if (settings.verbose > 0)

??????????????????????? fprintf(stderr, "Couldn't update event\n");

??????????????????? conn_set_state(c, conn_closing);

??????????????????? break;

??????????????? }

??????????????? stop = true;

??????????????? break;

??????????? }

??????????? /* Memory allocation failure */

??????????? if (res == -2) {

??????????????? out_of_memory(c, "SERVER_ERROR Out of memory during read");

??????????????? c->sbytes = c->rlbytes;

??????????????? conn_set_state(c, conn_swallow);

??????????????? // Ensure this flag gets cleared. It gets killed on conn_new()

??????????? ????// so any conn_closing is fine, calling complete_nread is

??????????????? // fine. This swallow semms to be the only other case.

??????????????? c->set_stale = false;

??????????????? c->mset_res = false;

??????????????? break;

??????????? }

?????????? ?/* otherwise we have a real error, on which we close the connection */

??????????? if (settings.verbose > 0) {

??????????????? fprintf(stderr, "Failed to read, and not due to blocking:\n"

??????????????????????? "errno: %d %s \n"

??????????????????????? "rcurr=%p ritem=%p rbuf=%p rlbytes=%d rsize=%d\n",

??????????????????????? errno, strerror(errno),

??????????????????????? (void *)c->rcurr, (void *)c->ritem, (void *)c->rbuf,

??????????????????????? (int)c->rlbytes, (int)c->rsize);

??????????? }

????????? ??conn_set_state(c, conn_closing);

??????????? break;

当rlbytes值减少到0后,代表需要的数据值全部读取出来了,会进行complete_nread处理,

void complete_nread_ascii(conn *c) {

??? assert(c != NULL);

??? item *it = c->item;

??? int comm = c->cmd;

??? enum store_item_type ret;

??? bool is_valid = false;

??? pthread_mutex_lock(&c->thread->stats.mutex);

??? c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;

??? pthread_mutex_unlock(&c->thread->stats.mutex);

??? if ((it->it_flags & ITEM_CHUNKED) == 0) {

??????? if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {

??????????? is_valid = true;

??????? }

??? }

??? if (!is_valid) {

??????? // metaset mode always returns errors.

??????? if (c->mset_res) {

??????????? c->noreply = false;

??????? }

??????? out_string(c, "CLIENT_ERROR bad data chunk");

??? } else {

???? ?ret = store_item(it, comm, c);

………………………………………….

????? if (c->mset_res) {

????????? _finalize_mset(c, ret);

????? } else {

????????? switch (ret) {

????????? case STORED:

????????????? out_string(c, "STORED");

????????????? break;

????????? case EXISTS:

????????????? out_string(c, "EXISTS");

????????????? break;

????????? case NOT_FOUND:

????????????? out_string(c, "NOT_FOUND");

????????? ????break;

????????? case NOT_STORED:

????????????? out_string(c, "NOT_STORED");

????????????? break;

????????? default:

????????????? out_string(c, "SERVER_ERROR Unhandled storage type.");

????????? }

????? }

??? }

??? c->set_stale = false; /* force flag to be off just in case */

??? c->mset_res = false;

??? item_remove(c->item);?????? /* release the c->item reference */

??? c->item = 0;

}

根据处理结果给client返回不同信息,并把状态机迁移到conn_new_cmd

void out_string(conn *c, const char *str) {

??? size_t len;

??? assert(c != NULL);

??? mc_resp *resp = c->resp;

??? // if response was original filled with something, but we're now writing

??? // out an error or similar, have to reset the object first.

??? // TODO: since this is often redundant with allocation, how many callers

?? ?// are actually requiring it be reset? Can we fast test by just looking at

??? // tosend and reset if nonzero?

??? resp_reset(resp);

??? if (c->noreply) {

??????? // TODO: just invalidate the response since nothing's been attempted

??????? // to send yet?

??????? resp->skip = true;

??????? if (settings.verbose > 1)

??????????? fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);

??????? conn_set_state(c, conn_new_cmd);

??????? return;

??? }

??? if (settings.verbose > 1)

??????? fprintf(stderr, ">%d %s\n", c->sfd, str);

??? // Fill response object with static string.

??? len = strlen(str);

??? if ((len + 2) > WRITE_BUFFER_SIZE) {

??????? /* ought to be always enough. just fail for simplicity */

??????? str = "SERVER_ERROR output line too long";

??????? len = strlen(str);

??? }

??? memcpy(resp->wbuf, str, len);

??? memcpy(resp->wbuf + len, "\r\n", 2);

??? resp_add_iov(resp, resp->wbuf, len + 2);

??? conn_set_state(c, conn_new_cmd);

??? return;

}

在conn_new_cmd状态下,判断resp_head不为空,迁移状态到conn_mwrite

3.3.2.6 conn_mwrite

??????? case conn_write:

??????? case conn_mwrite:

? ????????????for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {

??????????????? if (q->stack_ctx != NULL) {

??????????????????? io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);

??????????????????? qcb->submit_cb(q);

??????????????????? c->io_queues_submitted++;

??????????????? }

??????????? }

??????????? if (c->io_queues_submitted != 0) {

??????????????? conn_set_state(c, conn_io_queue);

????? ??????????event_del(&c->event);

?? ?????????????stop = true;

??????????????? break;

??????????? }

??????????? switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {

??????????? case TRANSMIT_COMPLETE:

??????????????? if (c->state == conn_mwrite) {

??????????????????? // Free up IO wraps and any half-uploaded items.

??????????????????? conn_release_items(c);

??????????????????? conn_set_state(c, conn_new_cmd);

??????????????????? if (c->close_after_write) {

??????????????????????? conn_set_state(c, conn_closing);

????????????????? ??}

??????????????? } else {

??????????????????? if (settings.verbose > 0)

??????????????????????? fprintf(stderr, "Unexpected state %d\n", c->state);

??????????????????? conn_set_state(c, conn_closing);

??????????????? }

??????????????? break;

发送完出返回TRANSMIT_COMPLETE,迁移状态机到conn_new_cmd。

4 memcached数据存储

序号

描述

1

Memcached在启动的时候,会默认初始化一个HashTable,这个table的默认长度为65536

2

我们将这个HashTable中的每一个元素称为桶,每个桶就是一个item结构的单向链表

3

Memcached会将key值hash成一个变量名称为hv的uint32_t类型的值

4

通过hv与桶的个数之间的按位与计算,hv & hashmask(hashpower),就可以得到当前的key会落在哪个桶上面

5

然后会将item挂到这个桶的链表上面。链表主要是通过item结构中的h_next实现

4.1 Memcached存储结构分析

assoc_init负责初始化hashtable数据结构,通过初始化hashsize(hashpower)大小的数组指针,默认应该是2*16次方大小的数组

void assoc_init(const int hashtable_init) {

??? if (hashtable_init) {

??????? hashpower = hashtable_init;

??? }

??? primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));

??? if (! primary_hashtable) {

??????? fprintf(stderr, "Failed to init hashtable.\n");

??????? exit(EXIT_FAILURE);

??? }

??? STATS_LOCK();

??? stats_state.hash_power_level = hashpower;

? ??stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);

??? STATS_UNLOCK();

}

?

Memcached存储数据结构item定义,item的结构分两部分, 第一部分定义 item 结构的属性,第二部分是 item 的数据

typedef struct _stritem {

??? /* Protected by LRU locks */

??? struct _stritem *next;

??? struct _stritem *prev;

??? /* Rest are protected by an item lock */

??? struct _stritem *h_next;??? /* hash chain next */

??? rel_time_t????? time;?????? /* least recent access */

??? rel_time_t????? exptime;??? /* expire time */

??? int???????????? nbytes;???? /* size of data */

??? unsigned short? refcount;

??? uint16_t??????? it_flags;?? /* ITEM_* above */

??? uint8_t???????? slabs_clsid;/* which slab class we're in */

??? uint8_t???????? nkey;?????? /* key length, w/terminating null and padding */

??? /* this odd type prevents type-punning issues when we do

???? * the little shuffle to save space when not using CAS. */

??? union {

??????? uint64_t cas;

??????? char end;

??? } data[];

??? /* if it_flags & ITEM_CAS we have 8 bytes CAS */

??? /* then null-terminated key */

??? /* then " flags length\r\n" (no terminating null) */

??? /* then data with terminating \r\n (no terminating null; it's binary!) */

} item;

4.2 数据查找过程

1)首先通过key的hash值hv找到对应的桶,区分是否在扩容。 primary_hashtable[hv & hashmask(hashpower)];

2)然后遍历桶的单链表,比较key值并找到对应item。

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {

??? item *it;

??? uint64_t oldbucket;

??? if (expanding &&

??????? (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)

??? {

??????? it = old_hashtable[oldbucket];

??? } else {

??????? it = primary_hashtable[hv & hashmask(hashpower)];

? ??}

??? item *ret = NULL;

??? int depth = 0;

??? while (it) {

??????? if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {

??????????? ret = it;

??????????? break;

??????? }

??????? it = it->h_next;

??????? ++depth;

??? }

??? MEMCACHED_ASSOC_FIND(key, nkey, depth);

??? return ret;

}

4.3 数据插入过程

1)首先通过key的hash值hv找到对应的桶。

2)然后将item放到对应桶的单链表的头部

int assoc_insert(item *it, const uint32_t hv) {

??? uint64_t oldbucket;

//??? assert(assoc_find(ITEM_key(it), it->nkey) == 0);? /* shouldn't have duplicately named things defined */

??? if (expanding &&

??????? (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)

??? {

??????? it->h_next = old_hashtable[oldbucket];

??????? old_hashtable[oldbucket] = it;

??? } else {

??????? it->h_next = primary_hashtable[hv & hashmask(hashpower)];

??????? primary_hashtable[hv & hashmask(hashpower)] = it;

??? }

??? MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);

??? return 1;

}

4.4数据删除过程

1)首先通过key的hash值hv找到对应的桶。

2)找到桶对应的链表,遍历单链表,删除对应的Item。

void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {

??? item **before = _hashitem_before(key, nkey, hv);

??? if (*before) {

??????? item *nxt;

??????? /* The DTrace probe cannot be triggered as the last instruction

???????? * due to possible tail-optimization by the compiler

???????? */

??????? MEMCACHED_ASSOC_DELETE(key, nkey);

??????? nxt = (*before)->h_next;

??????? (*before)->h_next = 0;?? /* probably pointless, but whatever. */

??????? *before = nxt;

??????? return;

??? }

??? /* Note:? we never actually get here.? the callers don't delete things

?????? they can't find. */

??? assert(*before != 0);

}

4.5 数据扩容过程

1)数据扩容过程是由一个单独线程在检测是否需要扩容,扩容的前提条件是curr_items > (hashsize(hashpower) * 3) / 2,也就是说数据量是原来的1.5倍

2)检测需要扩容后通过信号通知pthread_cond_signal(&maintenance_cond)开始执行扩容

3)以2倍的扩容速度进行扩容,primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *))

4)迁移过程是一个逐步迁移过程,每次都只迁移一个桶里面的Item数据

5 LRU内存回收

以往的LRU算法,基本做法都是这样的:

  1. 创建一个LRU链表,每次新加入的元素都放在链表头。
  2. 如果元素被访问了一次,同样从当前链表中摘除放到链表头。

3)需要淘汰元素时,从链表尾开始找可以淘汰的元素出来淘汰。

这个算法有如下几个问题:

1)元素被访问一次就会被放到LRU链表的头部,这样即便这个元素可以被淘汰,也会需要很久才会淘汰掉这个元素。

2)由于上面的原因,从链表尾部开始找可以淘汰的元素时,实际可能访问到的是一些虽然不常被访问,但是还没到淘汰时间(即有效时间TTL还未过期)的数据,这样会一直沿着链表往前找很久才能找到适合淘汰的元素。由于这个查找被淘汰元素的过程是需要加锁保护的,加锁时间一长影响了系统的并发。

5.1 改进的分段LRU算法(Segmented LRU)

分段LRU算法中将LRU链表根据活跃度分成了4类:

  1. HOT_LRU:存储热数据的LRU链表。
  2. WARM_LRU:存储温数据(即活跃度不如热数据)的LRU链表。

3COLD_LRU:存储冷数据的LRU链表。

4TEMP_LRU:存储临时数据(默认不开启)

需要说明的是:热(参数settings.hot_lru_pct)和暖(参数settings.warm_lru_pct)数据的占总体内存的比例有限制,而冷数据则无限。

#define HOT_LRU 0
#define WARM_LRU 64
#define COLD_LRU 128
#define TEMP_LRU 192

同时,使用了headstails两个数组用来保存LRU链表:

#define POWER_LARGEST  256 /* actual cap is 255 */

#define LARGEST_ID POWER_LARGEST

static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];

上面分析slabclass的时候提到过,首先会根据被分配内存大小计算出来一个slabclass数组的索引。在需要从LRU链表中淘汰数据时,由于LRU链表分为了上面三类,那么就还需要再进行一次slabid | lru id计算(其实就是slabid + lru id),到对应的LRU链表中查找元素:

?

由于从链表尾部往前查找可以淘汰的元素,中间可能会经历很多不能被淘汰的元素,影响了淘汰的速度,因此前面的分级LRU链表就能帮助程序快速识别出哪些元素可以被淘汰。三个分级的LRU链表之间的转换规则如下:

HOT_LRU

HOT LRU队列中的数据绝不会到HOT_LRU队列的前面,只会往更冷的队列中放。规则是:如果元素变得活跃,就放到WARM队列中;否则如果不活跃,就直接放到COLD队列中

WARM_LRU

如果WARM队列的元素变的活跃,就会移动到WARM队列头;否则往COLD队列移动

COLD_LRU

COLD队列中的元素,都是不太活跃的了,所以当需要淘汰元素时都会首先到COLD LRU队列中找可以淘汰的数据。当一个在COLD队列的元素重新变成活跃元素时,并不会移动到COLD队列的头部,而是直接移动回去WARM队列

PS任何操作都不能将一个元素从WARMCOLD队列中移动回去HOT队列了,也就是从HOT队列中移动元素出去的操作是单向操作

?

原有LRU算法最大的问题是:只要元素被访问过一次,就马上会被移动到LRU链表的前面,影响了后面对这个元素的淘汰。

改进的算法中,加入了一个机制:只有当元素被访问两次以后,才会标记成活跃元素。

代码中引入了两个标志位,其置位的规则如下:

1ITEM_FETCHED:第一次被访问时置位该标志位。

2ITEM_ACTIVE:第二次被访问时(即it->it_flags & ITEM_FETCHEDtrue的情况下)置位该标志位。

3INACTIVE:不活跃状态。

4ITEM_ACTIVE标志位清除的规则,从链表尾遍历到某一个LRU链表时,该元素是链表的最后一个元素,则认为是不活跃的元素,即可以清除ITEM_ACTIVE标志位;

这样,有效避免了只访问一次就变成活跃元素的问题,所以元素变成活跃就意指至少被访问两次以”。

5.2 memcached 内存回收

惰性删除

memcached一般不会主动去清除已经过期或者失效的缓存,当get请求一个item才会去检查item是否失效

flush命令

flush命令会将所有的item设置为失效

创建的时候检查

Memcached会在创建ITEM的时候去LRU的链表尾部开始检查,是否有失效的ITEM,如果没有的话就重新创建

LRU爬虫

memcached默认是关闭LRU爬虫的。LRU爬虫是一个单独的线程,会去清理失效的ITEM

LRU淘汰

当缓存没有内存可以分配给新的元素的时候,memcached会从LRU链表的尾部开始淘汰一个ITEM,不管这个ITEM是否还在有效期都将会面临淘汰。LRU链表插入缓存ITEM的时候有先后顺序,所以淘汰一个ITEM也是从尾部进行 也就是先淘汰最早的ITEM。

5.2.1惰性删除

惰性删除删除其实就是在get数据的时候进行比较判断数据是否过期,这里会跟flush_all命令过期结合起来使用,判断的时候依据了flush_all设置的过期时间settings.oldest_liv

item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
    item *it = assoc_find(key, nkey, hv);
    if (it != NULL) {
        refcount_incr(it);
        /* Optimization for slab reassignment. prevents popular items from
         * jamming in busy wait. Can only do this here to satisfy lock order
         * of item_lock, slabs_lock. */
        /* This was made unsafe by removal of the cache_lock:
         * slab_rebalance_signal and slab_rebal.* are modified in a separate
         * thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
         * NULL (0), but slab_end is still equal to some value, this would end
         * up unlinking every item fetched.
         * This is either an acceptable loss, or if slab_rebalance_signal is
         * true, slab_start/slab_end should be put behind the slabs_lock.
         * Which would cause a huge potential slowdown.
         * Could also use a specific lock for slab_rebal.* and
         * slab_rebalance_signal (shorter lock?)
         */
        /*if (slab_rebalance_signal &&
            ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
        }*/
    }
    int was_found = 0;

    if (settings.verbose > 2) {
        int ii;
        if (it == NULL) {
            fprintf(stderr, "> NOT FOUND ");
        } else {
            fprintf(stderr, "> FOUND KEY ");
        }
        for (ii = 0; ii < nkey; ++ii) {
            fprintf(stderr, "%c", key[ii]);
        }
    }

    if (it != NULL) {
        was_found = 1;
        if (item_is_flushed(it)) {
            do_item_unlink(it, hv);
            STORAGE_delete(c->thread->storage, it);
            do_item_remove(it);
            it = NULL;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.get_flushed++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
            if (settings.verbose > 2) {
                fprintf(stderr, " -nuked by flush");
            }
            was_found = 2;
        } else if (it->exptime != 0 && it->exptime <= current_time) {
            do_item_unlink(it, hv);
            STORAGE_delete(c->thread->storage, it);
            do_item_remove(it);
            it = NULL;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.get_expired++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
            if (settings.verbose > 2) {
                fprintf(stderr, " -nuked by expire");
            }
            was_found = 3;
        } else {
            if (do_update) {
                do_item_bump(c, it, hv);
            }
            DEBUG_REFCNT(it, '+');
        }
    }

    if (settings.verbose > 2)
        fprintf(stderr, "\n");
    /* For now this is in addition to the above verbose logging. */
    LOGGER_LOG(c->thread->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key,
               nkey, (it) ? it->nbytes : 0, (it) ? ITEM_clsid(it) : 0, c->sfd);

    return it;
}

5.2.2 flush_all命令删除

int item_is_flushed(item *it) {

??? rel_time_t oldest_live = settings.oldest_live;

??? uint64_t cas = ITEM_get_cas(it);

??? uint64_t oldest_cas = settings.oldest_cas;

??? if (oldest_live == 0 || oldest_live > current_time)

??????? return 0;

??? if ((it->time <= oldest_live)

??????????? || (oldest_cas != 0 && cas != 0 && cas < oldest_cas)) {

??????? return 1;

??? }

??? return 0;

}

5.2.3 新建item会检查数据过期

序号

描述

1

do_item_alloc进入新增item的内存申请流程

2

do_item_alloc_pull进入item申请的逻辑处理,最多处理10次

3

do_item_alloc_pull内部逻辑是尝试通过slabs_alloc申请内存,失败则尝试通过lru_pull_tail方法释放LRU队列中的item变成可用item

4

lru_pull_tail执行释放LRU队列中item的过程,内部包括各种过期item的回收

5.2.4 LRU爬虫线程定时清理

后续分析

6 memcached应用中存在的问题

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-23 10:53:19  更:2022-04-23 10:54:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:34:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码