redis6运行架构图
redis6新特性中加入的最大的特性就是加入了多线程来处理网络的读写从而来提高响应性能,本文就简单的剖析一下redis6的运行的机制,学习一下redis6的多线程机制是如何在事件驱动中执行的。下图就是redis6概括的运行的逻辑。
redis6任务分类
在redis6中以事件驱动的工作流程中主要包括了三种类型的任务执行。
- 事件任务,即在每次进行网络epoll之前或者完成之后都会立马执行,例如多线程处理网络数据的读写等工作。
- 事件驱动处理程序任务,即网络IO的读写事件通过epoll来触发执行,获取可执行的网络事件。
- 定时任务,在处理完成任务1和任务2之后,才会判断是否有定时任务被触发。
任务1和任务3的区别在于,定时任务是按照时间维度来执行,而任务1是在每一次事件驱动循环中都会执行,所以执行频率上任务1比任务3会高一些(理论上)。
redis6多线程
概述学习一下多线程的工作机制。
多线程初始化
redis6中多线程的初始化主要是在initThreadedIO函数中。
/* Initialize the threaded I/O subsystem. Spawns server.io_threads_num - 1
 * worker threads: index 0 is always served by the main thread itself. */
void initThreadedIO(void) {
    /* Threads start in the "not active" state; they are woken up lazily
     * by startThreadedIO() when there is enough pending work. */
    server.io_threads_active = 0;
    /* With a single I/O thread everything runs on the main thread. */
    if (server.io_threads_num == 1) return;
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
        "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Every thread, including the main one (i == 0), gets a client list. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Main thread: no pthread to spawn. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        /* Take the per-thread mutex so the worker parks itself in
         * IOThreadMain until the main thread releases it. */
        pthread_mutex_lock(&io_threads_mutex[i]);
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
事件驱动的回调函数
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
主要通过aeSetBeforeSleepProc和aeSetAfterSleepProc注册beforeSleep和afterSleep两个回调函数。
整个事件驱动的工作驱动如下。
/* Run one iteration of the event loop: fire the before/after-sleep hooks
 * around the epoll wait, then dispatch ready file and time events.
 * Excerpt from ae.c: "..." marks elided code. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing requested: neither time events nor file events. */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        ...
        /* Hook invoked right before blocking in epoll (task type 1). */
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);
        /* Wait for ready file descriptors (task type 2). */
        numevents = aeApiPoll(eventLoop, tvp);
        /* Hook invoked right after epoll returns. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        ...
    }
    ...
}
...
/* Main loop of the event-driven framework: keep processing events until
 * something sets eventLoop->stop. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* Request every event type plus both sleep hooks each iteration. */
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}
通过aeProcessEvents在执行epoll之前执行beforeSleep回调在epoll之后执行afterSleep回调。
多线程的执行的回调函数就位于beforeSleep的回调中handleClientsWithPendingReadsUsingThreads和handleClientsWithPendingWritesUsingThreads。
handleClientsWithPendingReadsUsingThreads多线程读取网络数据
/* Fan out pending-read clients across the I/O threads, read and parse their
 * input in parallel, then execute the parsed commands serially on the main
 * thread — so command execution itself stays single-threaded (atomic).
 * Returns the number of clients processed. */
int handleClientsWithPendingReadsUsingThreads(void) {
    /* Threaded reads must be active AND enabled by configuration. */
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;
    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    /* Round-robin the pending clients over the per-thread lists. */
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Mark this batch as a READ batch, then publish the per-thread
     * counters: a non-zero counter is what unblocks the busy-wait in
     * IOThreadMain. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
    /* List 0 is served by the main thread itself while the workers run. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    /* Spin until every worker has drained its list (all counters zero). */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");
    /* All input is parsed now: run the commands serially on this thread. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        if (clientsArePaused()) continue;
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* Client is no longer valid (e.g. freed): skip it. */
            continue;
        }
        processInputBuffer(c);
        /* Schedule a reply flush if the command produced output and the
         * client is not already queued for writing. */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }
    server.stat_io_reads_processed += processed;
    return processed;
}
修改io_threads_op来标识当前的线程池是进行读操作,将每个线程负责的队列的判断计数标记成待处理的队列长度,等所有的任务都处理完成之后,主线程就挨个处理每个解析好的命令,从而在执行层面保持了命令执行的原子性。
handleClientsWithPendingWritesUsingThreads多线程写处理
/* Flush pending replies using the I/O threads. Falls back to the plain
 * single-threaded path when threading is disabled or not worth the cost.
 * Returns the number of clients processed. */
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0;
    /* One thread configured, or too little work: synchronous writes. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    /* Lazily wake the worker threads (releases their parked mutexes). */
    if (!server.io_threads_active) startThreadedIO();
    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* Clients scheduled for closing get no further writes. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }
        /* Round-robin distribution over the per-thread lists. */
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Mark the batch as WRITE and publish counters to unblock workers. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
    /* The main thread serves list 0 itself while the workers run. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    /* Spin until all workers report completion (counters reach zero). */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");
    /* Re-install a write handler for clients that still have data left,
     * so the remainder is flushed by the event loop. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        if (clientHasPendingReplies(c) &&
            connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    server.stat_io_writes_processed += processed;
    return processed;
}
多线程写的时候会将io_threads_op标记为写操作,此时线程池就都通过writeToClient来将回复给客户端的数据,在回复给客户端的时候只需要将所有数据全部写出即可。
IOThreadMain线程的工作流程
/* Entry point of each I/O worker thread. `myid` is the thread index
 * (1..io_threads_num-1) passed through the void* argument. */
void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    char thdname[16];
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
    while(1) {
        /* Busy-wait briefly for work before falling back to the mutex:
         * trades a little CPU for lower wake-up latency. */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
        if (io_threads_pending[id] == 0) {
            /* Still no work: park on the per-thread mutex. The main thread
             * holds it while threaded I/O is stopped, so this blocks until
             * startThreadedIO() releases it. */
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        serverAssert(io_threads_pending[id] != 0);
        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        /* Process every assigned client according to the batch direction
         * (io_threads_op is set by the main thread before counters). */
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        /* Publish completion: the main thread polls this counter. */
        io_threads_pending[id] = 0;
        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
工作流程相对比较简单,主要是通过一个for循环来判断线程是否需要开始工作,在循环等待中通过锁来进行线程的启动与停止,从而避免for循环中高消耗cpu,根据不同的执行状态线程进行读写操作。
那数据是如何加到队列的呢。
在读事件到来时,执行了如下代码。
/* Read handler installed on client connections. With threaded I/O active
 * the actual read may be postponed to the I/O threads ("..." elides the
 * synchronous read path). */
void readQueryFromClient(connection *conn) {
    ...
    /* Queue the client for the I/O threads instead of reading right now. */
    if (postponeClientRead(c)) return;
    ...
}
/* Decide whether the read for this client should be deferred to the I/O
 * threads. Returns 1 if the client was queued (caller must return at once),
 * 0 if the read should be performed synchronously. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !clientsArePaused() &&
        !ProcessingEventsWhileBlocked &&
        /* Master/replica links and already-queued clients never defer. */
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
...
等到执行beforeSleep的时候通过多线程进行处理。
当命令执行完成之后调用addReply等函数时,就进行如下流程。
/* Append object `obj` to the client's reply. Excerpt: only the write
 * scheduling entry point is shown ("..." elides buffer handling). */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    ...
}
...
/* Check that the client may receive new reply data and, if needed, schedule
 * it for writing. Returns C_OK when the caller may append to the output
 * buffers, C_ERR when the reply must be dropped. */
int prepareClientToWrite(client *c) {
    /* Lua/module "fake" clients always accept replies. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
    if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
    /* Masters get no replies unless explicitly forced. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
    if (!c->conn) return C_ERR;
    /* CLIENT_PENDING_READ clients are being handled by the threaded read
     * path, which installs the write handler itself afterwards. */
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
        clientInstallWriteHandler(c);
    return C_OK;
}
至此,整个redis6的多线程架构就基本上明白了。
简单的用个python脚本来梳理一下redis6的多线程的工作流程如下。
import threading
import time

# Number of simulated I/O worker threads (mirrors Redis' io_threads_num).
thread_nums = 3
# Per-thread pending counters (mirrors io_threads_pending[]).
work_nums = [0 for i in range(thread_nums)]
# Per-thread locks used to park/unpark workers (mirrors io_threads_mutex[]).
work_locks = [threading.Lock() for i in range(thread_nums)]
# Per-thread task queues (mirrors io_threads_list[]).
work_queue = []
for i in range(thread_nums):
    work_queue.append([])

def worker(index):
    # Worker loop mimicking IOThreadMain: busy-wait, then park on the lock.
    while True:
        # Busy-wait briefly for work before falling back to the lock.
        for i in range(1000000):
            if work_nums[index] != 0:
                break
        if work_nums[index] == 0:
            # No work: block on the lock the dispatcher holds, then retry.
            print(" acquire ", index)
            work_locks[index].acquire()
            print(" release ", index)
            work_locks[index].release()
            continue
        # Drain this thread's queue, then publish completion by zeroing
        # the counter (mirrors io_threads_pending[id] = 0).
        print(" start work ", index, work_queue[index])
        while len(work_queue[index]):
            work_queue[index].pop()
        work_nums[index] = 0

# Park all workers before starting them (mirrors initThreadedIO taking
# each per-thread mutex before pthread_create).
for lock in work_locks:
    lock.acquire()
for i in range(thread_nums):
    t = threading.Thread(target=worker, args=(i, ))
    t.start()

# Dispatcher loop (plays the main thread's role): periodically release the
# workers, distribute the work, publish the counters, then re-take the
# locks so the workers park again once their queues are drained.
while True:
    time.sleep(20)
    for lock in work_locks:
        lock.release()
    for i in range(thread_nums):
        work_queue[i] = ["index {0} value".format(i) for j in range(i+5)]
    for i in range(thread_nums):
        work_nums[i] = len(work_queue[i])
    for lock in work_locks:
        lock.acquire()
通过不同队列的锁来控制每个线程是否开始执行,先将队列的数据分发完成,然后再设置线程检查的数据值,此时worker线程就开始执行。
总结
本文简单的概述了一下redis6的运行架构图,流程图中可能有很多细节都忽略掉了,并且描述也不一定完全准确。redis6的多线程通过锁来控制运行,并通过队列的分发从而完成将任务分发到不同的队列来执行,提高网络解析读数据和数据写入的并发速度。由于本人才疏学浅,如有错误请批评指正。
|