笔者Redis事务使用相关文章链接:Redis 事务机制深入浅出、Redis WATCH事务监视机制与回滚。
前言
本篇文章将从源码角度分析整个Redis事务执行的流程,包括MULTI 、EXEC 、DISCARD 、WATCH 命令的源码实现以及相关数据结构。
本文源码版本为Redis 5.0,文中涉及到的源码均可在server.h、server.c以及multi.c三个文件中找到。
源码阅读不易,如出现纰漏或理解错误还望指正。
事务的执行流程
首先明确一个问题,在Redis中,服务器接受到任何命令后都不会立即执行,而是首先检查客户端内若干状态,这是事务实现的重要一环。
对于MULTI 命令,实际上就是修改了客户端内的flags属性,该属性为int类型,专门用于标识状态,采用位存储(每一位的0和1对应某一状态的开启或关闭),MULTI指令执行后,会将客户端的状态修改为事务状态。
对于非事务相关命令,自然在执行前会先检查状态,若发现当前客户端处于事务关闭状态,则直接执行命令,若发现当前客户端出于事务开启状态,则会加入到事务相关的命令队列中等待执行。
对于EXEC 命令,在执行命令前会先检查当前客户端是否有监视某数据库键,若没有或所监视键未被修改过,则按序从队列中取出储存的命令并依次执行,队列中全部命令执行完毕后修改事务状态,否则放弃当前事务执行。
对于DISCARD 命令,放弃当前事务,实际上的操作是释放储存命令的队列,并修改事务状态。
对于WATCH 命令,将参数键加入到一个字典中,随后在执行EXEC 命令时会检查这些被监控的数据库键是否被修改过,被修改过则放弃执行事务。监视的周期为一个事务,即无论事务最终以哪种条件结束,监视都会随之失效。
源码详解
数据结构
首先先来察看客户端的数据结构定义,由于本文只关注事务执行流程,因此只保留相关属性,其余省略。
redisDb源码如下:
typedef struct redisDb {
dict *watched_keys;
} redisDb;
相关属性只有watched_keys,这是一个字典类型的属性,key为被监视的键,value为监视该键的客户端名。该属性在WATCH 监视时用于判断当前客户端是否监视了某键。
客户端源码如下:
typedef struct client {
int flags;
multiState mstate;
list *watched_keys;
} client;
flag属性
主要涉及到两个属性,首先介绍flags属性,正如前文提及,flags属性专门作为标识位,用于标识当前客户端的各种状态,具体的实现是,每一位对于一个特殊的状态,int型具有4字节32位,理论上可以存储32个只要两个状态的标识位,事实上源码显示Redis使用了其中的28位,对于flags的每一位,Redis都对他们进行特殊定义。
flags状态的宏定义如下,与client结构体相同,此处涉及到的状态非常多,此处只列举与事务相关的状态:
#define CLIENT_MULTI (1<<3)
#define CLIENT_DIRTY_CAS (1<<5)
#define CLIENT_DIRTY_EXEC (1<<12)
其中可见flags中的低第4位用于标识当前客户端是否处于事务开启状态、低第6位用于标识WATCH 监视的键是否有修改、第13位用于标识指令入队时是否存在错误,在后文的其他代码部分中还会见到这三个变量的身影。
multiState属性
multiState数据结构源码如下:
typedef struct multiState {
multiCmd *commands;
int count;
int cmd_flags;
int minreplicas;
time_t minreplicas_timeout;
} multiState;
multiCmd数据结构如下:
typedef struct multiCmd {
robj **argv;
int argc;
struct redisCommand *cmd;
} multiCmd;
MULTI执行流程
MULTI 命令非常简单且容易理解,实际上只是修改了flags标识位而已,对应的函数如下:
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}
事务开启后普通命令插入流程
在服务器端接收到客户端命令请求后,会经历四个阶段,分别是
- 命令读取
- 命令解析
- 命令执行
- 结果返回
指令检查事务是否开启是在命令执行阶段完成的,其对应函数processCommand,同样非关键部分省略,函数源码如下:
int processCommand(client *c) {
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
}
return C_OK;
}
随后是实现入队操作的函数queueMultiCommand,函数源码如下:
void queueMultiCommand(client *c) {
multiCmd *mc;
int j;
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc);
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
}
EXEC执行流程
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0;
int was_master = server.masterhost == NULL;
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
discardTransaction(c);
goto handle_monitor;
}
if (!server.loading && server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
{
addReplyError(c,
"Transaction contains write commands but instance "
"is now a read-only slave. EXEC aborted.");
discardTransaction(c);
goto handle_monitor;
}
unwatchAllKeys(c);
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
execCommandPropagateMulti(c);
must_propagate = 1;
}
call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
discardTransaction(c);
if (must_propagate) {
int is_master = server.masterhost == NULL;
server.dirty++;
if (server.repl_backlog && was_master && !is_master) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd));
}
}
handle_monitor:
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
DISCARD执行流程
DISCARD实际做了三件事,修改客户端flags的MULTI标识位,释放当前客户端储存的命令数组并重置初始状态,最后清空当前客户端下的所有监视,函数调用链如下:
void discardCommand(client *c) {
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"DISCARD without MULTI");
return;
}
discardTransaction(c);
addReply(c,shared.ok);
}
void discardTransaction(client *c) {
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
unwatchAllKeys(c);
}
void initClientMultiState(client *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
c->mstate.cmd_flags = 0;
}
void freeClientMultiState(client *c) {
int j;
for (j = 0; j < c->mstate.count; j++) {
int i;
multiCmd *mc = c->mstate.commands+j;
for (i = 0; i < mc->argc; i++)
decrRefCount(mc->argv[i]);
zfree(mc->argv);
}
zfree(c->mstate.commands);
}
WATCH执行流程
关于被监视键的主要信息,存储在DB中,信息是单一数据库级别的。如文章开头的数据结构中所示,watch_keys是一个dict字典类型,字典中的每个key为当前数据库中被监视的键,value为正在监视该键的客户端。
添加监视键
调用链如下:
void watchCommand(client *c) {
int j;
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
typedef struct watchedKey {
robj *key;
redisDb *db;
} watchedKey;
void watchForKey(client *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return;
}
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
listAddNodeTail(clients,c);
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
}
监视触发
修改操作命令执行前会检查被修改键的监视状态。检查函数如下:
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags |= CLIENT_DIRTY_CAS;
}
}
取消所有监视
可以使用UNWATCH 命令主动取消当前客户端的所有监视,对应函数如下:
void unwatchCommand(client *c) {
unwatchAllKeys(c);
c->flags &= (~CLIENT_DIRTY_CAS);
addReply(c,shared.ok);
}
实际取消监视的函数为unwatchAllKeys
void unwatchAllKeys(client *c) {
listIter li;
listNode *ln;
if (listLength(c->watched_keys) == 0) return;
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
list *clients;
watchedKey *wk;
wk = listNodeValue(ln);
clients = dictFetchValue(wk->db->watched_keys, wk->key);
serverAssertWithInfo(c,NULL,clients != NULL);
listDelNode(clients,listSearchKey(clients,c));
if (listLength(clients) == 0)
dictDelete(wk->db->watched_keys, wk->key);
listDelNode(c->watched_keys,ln);
decrRefCount(wk->key);
zfree(wk);
}
}
|