阅读redis的源码永远也绕不过它的启动。我们来看下redis的启动流程。不想看代码可以直接看最后的流程图。
以下源码分析是redis的5.0分支 源码注释:https://github.com/yxkong/redis/commits/5.0
这是启动流程的核心代码。
int main(int argc, char **argv) {
//申请空间oom后的处理器
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
//哨兵模式
server.sentinel_mode = checkForSentinelMode(argc,argv);
//初始化服务器配置(默认值)
initServerConfig();
if (argc >= 2) {
//将配置文件的内容填充到server中
loadServerConfig(configfile,options);
}
//守护进程
int background = server.daemonize && !server.supervised;
if (background) daemonize();
//初始化server服务
initServer();
InitServerLast();
aeMain(server.el);
}
初始化服务配置
initServerConfig中只是给变量赋值了默认值
void initServerConfig(void) {
//初始化锁
pthread_mutex_init(&server.next_client_id_mutex,NULL);
pthread_mutex_init(&server.lruclock_mutex,NULL);
pthread_mutex_init(&server.unixtime_mutex,NULL);
//数据库配置填充
//淘汰策略
//持久化配置
//将预定义的命令填充到server.commands
populateCommandTable();
加载配置文件
将配置文件的内容填充到server中覆盖初始化变量 在config.c中
void loadServerConfig(char *filename, char *options) {
//读取配置到config
while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL)
config = sdscat(config,buf);
//将配置填充到server,这里是各种if else逻辑
loadServerConfigFromString(config);
}
初始化server服务
这里是重头戏
void initServer(void) {
//创建共享对象
createSharedObjects();
//创建事件监听器,maxclients默认10000个
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
//socket监听,绑定几个ip,就监听几个,默认监听ipv6和ipv4
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
//初始化化数据库的属性
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
}
/**
* @brief 创建时间处理器,并将serverCron放入处理器里(重要)
* 在这里创建了aeTimeEvent并扔给了eventLoop->timeEventHead
*/
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/**
* @brief 重点 ##########
* 监听多少个tcp就创建多少个
*/
for (j = 0; j < server.ipfd_count; j++) {
//将acceptTcpHandler 放入文件监听器里,
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
//慢日志初始化
slowlogInit();
//monitor初始化
latencyMonitorInit();
}
在createSharedObjects()里主要创建了共享对象,包括一些异常提示,服务交互指令,共享int等
在ae.c中,我们看下aeCreateEventLoop
/**
* @brief 创建事件监听器
*
* @param setsize 比配置的最大链接数要多96,为了安全处理(比如有的处理完了,还没有释放,多创建的就相当于缓冲队列了)
* @return aeEventLoop*
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
//分配空间
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
//创建对应数量的aeFileEvent空间
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
//创建对应数量的aeFiredEvent空间,此处承载的是触发事件(从epoll里读取后会放入这里)
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
//时间事件链表头节点(只有一个)
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
//创建一个aeApiState 给eventLoop(不同的系统实现不同,aeApiState也不同)
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
在ae.c中,看下aeCreateFileEvent
**
* @brief 创建文件事件监听器并放入到eventLoop->events中
* 注册acceptTcpHandler处理AE_READABLE和AE_WRITABLE
* @param eventLoop
* @param fd 对应tcp socket的fd值
* @param mask
* @param proc 处理器acceptTcpHandler
* @param clientData
* @return int
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 将创建的aeFileEvent 赋值给了&eventLoop->events[fd]
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 将处理器给rfileProc和wfileProc(后续会拿着这个直接执行)
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
初始化后端线程
void InitServerLast() {
bioInit();
server.initial_memory_usage = zmalloc_used_memory();
}
bioInit() 在 bio.c中
/**
* @brief 初始化后台线程
*
*/
void bioInit(void) {
/**
* @brief 根据后台线程的数量创建线程
*
*/
for (j = 0; j < BIO_NUM_OPS; j++) {
//初始化一个NUll 的互斥锁
pthread_mutex_init(&bio_mutex[j],NULL);
//新任务条件变量
pthread_cond_init(&bio_newjob_cond[j],NULL);
//下一个执行的条件变量
pthread_cond_init(&bio_step_cond[j],NULL);
//创建一个list
bio_jobs[j] = listCreate();
//待处理
bio_pending[j] = 0;
}
/**
* @brief 创建后端线程,并放入线程组
*
*/
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
//创建线程,并启动bioProcessBackgroundJobs
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
最重要的 aeMain
在ae.c中
/**
* @brief 执行eventLoop
*
* @param eventLoop
*/
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
//只要没有停止,就循环执行,这个是主线程
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
/**
* @brief 处理eventLoop里等待的 定时任务,文件事件,过期事件
* @param eventLoop
* @param flags 事件类型,
* 从main中过来是所有的事件,
* 从networking过来是文件事件
* @return int
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags){
//先从eventLoop取下一个要执行的定时器,如果下一次定时还有一定的时间间隔,那么就让epoll阻塞间隔的时间获取数据,否则则立即执行
/**
* @brief 从epoll里拿到numevents数量的数据
* 有时间,就阻塞指定的时间,没有时间,直到有数据
*/
numevents = aeApiPoll(eventLoop, tvp);
/**
* @brief 执行触发的事件
*
*/
for (j = 0; j < numevents; j++) {
//获取一个事件处理器
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
//处理读事件,此处的rfileProc和wfileProc 可以直接理解为acceptTcpHandler
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
}
//处理定时任务
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
//处理完以后进入另一次循环
return processed; /* return the number of processed file/time events */
}
ae_kqueue.c中
/**
* @brief 单位时间内获取事件数量
*
* @param eventLoop
* @param tvp 在单位时间内
* @return int 返回待处理的事件数量
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
if (tvp != NULL) {
struct timespec timeout;
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout);
} else {
/**
* @brief 获取已经就绪的文件描述符数量
* timeout指针为空,那么kevent()会永久阻塞,直到事件发生
*/
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL);
}
//将事件填充到eventLoop->fired中
if (retval > 0) {
int j;
numevents = retval;
for(j = 0; j < numevents; j++) {
int mask = 0;
struct kevent *e = state->events+j;
if (e->filter == EVFILT_READ) mask |= AE_READABLE;
if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->ident;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
终极图
|