前言
本文参考源码版本为 redis6.2
一般来说,一个 redis 请求 有两大模块,网络模块 + 命令处理模块。我们常说的 redis 单线程模型,其实主要就指的是一个正常请求涉及的网络模块和命令处理模块。
当执行一个特别慢的命令时,比如删除一个百万级的字典,可能会造成暂时的卡顿,导致 QPS 骤降;基于此,在 redis 4.0 出现专门处理这种 Lazy Free 模型的后台线程 。
另外,正常情况下,redis 单线程模型中,网络模块往往成为瓶颈高发地;因此,redis 6.0 引入多线程 模型,解决网络模块的问题。
前面系列文章已经介绍了 redis 的单线程模型及其背后的后台线程,因此,本文主要焦点集中在 redis 6.0 出现的多线程。
在开始阅读之前,你也可以思考一个问题:命令处理为什么不采用多线程模型?
一、架构演进?
复杂架构都是逐渐演进而来,从单线程到多线程,从单体功能到复杂功能等等,redis 也是如此。
redis 单线程情况下,也能达到极高的吞吐量,但某些情况下,会出现相当耗时的操作,导致吞处理骤降,因此逐渐引入后台线程来完成这些操作。
慢慢地,当我们要求更高的吞吐量时,网络模块却经常掉链子,在 redis 6.0 中又引入多线程来解决这个问题 ----- 这也是本文主要探讨的问题。
话不多说,开始吧~
1. 单线程
redis 是单线程模式 ------- 这是我们经常听到了说法,仅靠一个线程就能达到几万QPS,简直令人称奇!
我画了张单线程模型图,大概是这样: 你可以直观的看到,redis 确实仅靠一个线程处理了所有客户端的请求 ,一条龙服务!!!
从接收新连接、IO就绪监听、IO读,到命令执行,最后到命令执行后的数据回复(IO写)等都是一个线程处理,这些操作的封装,redis 中称之为文件事件 ;
当然,还有。redis 中的另一大事件 ------- 时间事件 ,负责相关的周期性处理任务,比如 key 过期清理、字典 rehash、触发 AOF 重写/RDB 的 bgsave等等。
值得注意的是,文件事件 和 时间事件 都是由主线程来驱动完成的。入口是 aeMain() 方法,redis 服务启动后,将会一直在此方法中轮训监听事件。
到这,你可能会说,一个线程做这么多事还不得累死?
是的,一个线程串行做这么多事情确实存在很大风险,对于一些耗时长的操作,可能严重拖垮 redis 吞吐量,所以,redis 又搞了一些后台线程来专门处理这些耗时操作。
2. 单线程+后台线程
特别需要注意的是!!!我们经常听说的 redis 单线程模型(上图),其实仅仅指的是对客户端的请求处理,但其实,还有一些工作由对应特殊的线程来完成。
在 redis 6.0 以前,完整的 redis 线程模型是 主线程(1个)+ 后台线程(三个) ,我画了一张图,你可以看下: 三个后台线程分别处理:
- close_file:关闭 AOF、RDB 等过程中产生的大临时文件
- aof_fsync:将追加至 AOF 文件的数据刷盘(一般情况下 write 调用之后,数据被写入内核缓冲区,通过 fsync 调用才将内核缓冲区的数据写入磁盘)
- lazy_free:惰性释放大对象
这三个线程有一个共同特点,都是用来处理耗时长的操作,也印证了我们常说的,专业的人做专业的事 。
3. 多线程+后台线程
咱们继续将时钟往后拨到 redis6.0 版本,此版本出现了一种新的 IO 线程 ---- 多线程。我同样也画了张图,你可以看下:
我们先思考下,引进 IO 线程解决了哪些问题?
在之前系列文章中,我们提到过,通常情况下,redis 性能在于网络和内存,而不是 CPU 。针对 网络,一般是处理速度较慢的问题;针对内存,一般是指物理空间的限制。
所以到这,你应该很清楚了,究竟哪个模块需要引入多线程来处理?
没错,就是网络模块,因此,引入的这些线程也叫 IO线程;由于主线程也会处理网络模块的工作,因此,主线程习惯上也叫做主IO线程。
网络模块有接收连接、IO读(包括数据解析)、IO写等操作;其中,主线程负责接收新连接,然后分发到 IO线程进行处理(主线程也参与)。我画了张图,你可以看下: 默认情况下,只针对写操作启用IO线程,如果读操作也需要的话,需要在配置文件中进行配置:
int io_threads_do_reads;
二、原理
前面我们已经讲到,redis 6.0 出现的多线程主要致力于解决网络模块的瓶颈,通过使用多线程处理读/写客户端数据,进而分担主IO线程的压力。
值得注意的是,命令处理仍然是单线程执行。
为了更好的帮助你理解,我们再来回顾下,请求处理流程:
接下来,我们将结合源码,看看多线程如何大展身手~
1. 初始化
在 server.c#main 启动的最后阶段,调用方法 InitServerLast,我们来看看其实现:
void InitServerLast() {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
其中,initThreadedIO 调用正是初始化 IO 线程:
void initThreadedIO(void) {
server.io_threads_active = 0;
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
for (int i = 0; i < server.io_threads_num; i++) {
io_threads_list[i] = listCreate();
if (i == 0) continue;
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]);
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
值得注意的是,i == 0 表示主IO线程!!!
我们定位到 for 循环中的 pthread_create 方法 ---- 真正的创建线程的方法,并指定线程的执行方法体 IOThreadMain — 主角。
void *IOThreadMain(void *myid) {
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
while(1) {
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(getIOPendingCount(id) != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
setIOPendingCount(id, 0);
if (tio_debug) printf("[%ld] Done\n", id);
}
}
如果你熟悉 java 的话,应该知道 IOThreadMain 就相当于 runable 的具体实现。核心逻辑在于 while(1) 无限循环中。
从源码中看到,IO 线程是从 io_threads_list 队列(或者说列表)获取待处理的客户端,并根据操作类型选择具体的执行逻辑。
看到这,你应该就豁然开朗了,这就是典型的 生产者-消费者模型 ,主IO线程负责投递事件,IO 线程负责消费事件(主线程也参与)。
从 IO 线程执行主体中,我们看到,通过 writeToClient 处理写请求, readQueryFromClient 处理读请求,我们接下来将具体分析这两种情况~
2. 多线程读
一般情况下,当我们通过多路复用监听到客户端数据准备就绪时,将会在主事件循环中,轮询处理这批就绪的客户端。
从读取数据 => 数据解析 => 命令执行 => 写会客户端缓冲区 => 待下一轮主事件循环到来时,将客户端缓冲数据写会客户端。
在多线程模式下(假设配置了多线程读),上述流程有了些许变化:读取数据 => 数据解析 模块处理操作,将均分给多个 IO 线程处理(包括主IO线程)。
所有就绪客户端暂存至队列:
struct redisServer {
...
list *clients_pending_read;
...
}
1)入队:
具体代码上的体现是,postponeClientRead 返回 1 之后,将直接退出。
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
if (postponeClientRead(c)) return;
...
}
多线程读开启时,函数 postponeClientRead 是关键:
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
可以看到,当我们开启 读 IO 多线程配置,将直接将该客户端添加至队列中,等待进行分配(下一轮 eventloop 循环)。
2)分配:
在新一轮 eventloop 循环,通过 IO 多路复用查询之前(这一步通常是阻塞等待,因此,也常称为阻塞操作),会调用 beforeSleep 处理一些客户端的操作,其中就包括多线程读取客户端数据 ,刷新客户端缓存数据至客户端 。
来看看 handleClientsWithPendingReadsUsingThreads 方法:
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
...
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
...
return processed;
}
主要处理:
- 将待处理客户端(clients_pending_read)均分至各 IO 线程对应队列中(主IO线程参与均分)
- 通知等待中的 IO 线程
- 主IO线程处理部分客户端
- 等待所有 IO 线程处理结束
简单总结以上两点:
- 当多线程读开启,并且多线程处于激活状态,客户端暂存于队列;反之直接通过 readQueryFromClient 进行处理。
- 暂存于队列中的客户端,会在下一次 eventloop 中,before sleep 之前,分发至IO线程各自的队列中处理
再次强调,主IO线程也参与处理。
3. 多线程写:
同样的,客户端响应数据也是先写到队列:
struct redisServer {
...
list *clients_pending_write;
...
}
从处理时机上看,多线程 写与读 都是在 beforeSleep 中被触发的,写操作是通过 handleClientsWithPendingWritesUsingThreads 完成:
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
if (!server.io_threads_active) startThreadedIO();
...
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
...
return processed;
}
该方法主要做了几件事情:
- 如果没有启用多线程写,或者仅有少量客户端,写操作就直接由主IO线程来完成。
- 激活IO线程(当待客户端较少时,会挂起IO线程 ---- 锁等待)
- 通知等待中的线程(共享变量值 > 0 时,表示有待处理任务)
- 主线程也要处理部分客户端。
- 等待所有线程完成工作。
值得注意的是,当 待处理客户端 过少时,redis 认为没必要采用多线程来共同处理,因此,完全交给主IO线程来完成:
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (server.io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
可见,当 待处理客户端 < 2倍IO线程数 时,将由主 IO 线程完成所有客户端数据刷回。
三、配置
redis 默认情况下不会开启多线程处理,官方也建议,除非性能达到瓶颈,否则没必要开启多线程。
配置多少合适?
官方文档 redis.conf 中介绍有:
By default threading is disabled, we suggest enabling it only in machines
that have at least 4 or more cores, leaving at least one spare core.
Using more than 8 threads is unlikely to help much. We also recommend using
threaded I/O only if you actually have performance problems, with Redis
instances being able to use a quite big percentage of CPU time, otherwise
there is no point in using this feature.
So for instance if you have a four cores boxes, try to use 2 or 3 I/O
threads, if you have a 8 cores, try to use 6 threads. In order to
enable I/O threads use the following configuration directive:
CPU 4 核以上,才考虑开启多线程,其中:
- 4 核开启 2 - 3 个 IO 线程
- 8 核 开启 6 个 IO 线程
- 超过 8 个 IO 线程,性能提升已经不大
值得注意的是,以上的 IO 线程其实包含了主 IO 线程。
配置:
开启多线程:配置 io-thread 即可。io-thread = 1 表示只使用主 IO 线程
io-threads 4
开启之后,默认写操作会通过多线程来处理,而读操作则不会。
如果读操作也想要开启多线程,则需要配置:
io-threads-do-reads yes
总结
本文从 redis 架构演进开始讲起,从单线程模型 => 单线程 + 后台线程 => 多线程 + 后台线程 演进。
每一次演进,都是为了解决某一类特殊问题;后台线程的出现,解决了一些耗时长的重操作。同样,多线程的出现,解决了网络模块的性能瓶颈。
回到开篇问题:为什么命令执行为什么不采用多线程?
- 使用多线程会提升复杂度,对于 redis 这种内存数据库,代价太高
- 一般情况下,redis 的瓶颈在于网络模块和内存,而非 CPU
- 可以在一台机器上部署多个实例(集群模式)
- 复杂(慢)命令可以通过 redis module 解决
相关文档:
|