IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 深入理解Redis 事务机制及执行流程源码解析 -> 正文阅读

[大数据]深入理解Redis 事务机制及执行流程源码解析

笔者Redis事务使用相关文章链接:Redis 事务机制深入浅出Redis WATCH事务监视机制与回滚

前言

本篇文章将从源码角度分析整个Redis事务执行的流程,包括MULTIEXECDISCARDWATCH命令的源码实现以及相关数据结构。

本文源码版本为Redis 5.0,文中涉及到的源码均可在server.h、server.c以及multi.c三个文件中找到。

源码阅读不易,如出现纰漏或理解错误还望指正。

事务的执行流程

首先明确一个问题,在Redis中,服务器接受到任何命令后都不会立即执行,而是首先检查客户端内若干状态,这是事务实现的重要一环。

对于MULTI命令,实际上就是修改了客户端内的flags属性,该属性为int类型,专门用于标识状态,采用位存储(每一位的0和1对应某一状态的开启或关闭),MULTI指令执行后,会将客户端的状态修改为事务状态。

对于非事务相关命令,自然在执行前会先检查状态,若发现当前客户端处于事务关闭状态,则直接执行命令,若发现当前客户端出于事务开启状态,则会加入到事务相关的命令队列中等待执行。

对于EXEC命令,在执行命令前会先检查当前客户端是否有监视某数据库键,若没有或所监视键未被修改过,则按序从队列中取出储存的命令并依次执行,队列中全部命令执行完毕后修改事务状态,否则放弃当前事务执行。

对于DISCARD命令,放弃当前事务,实际上的操作是释放储存命令的队列,并修改事务状态。

对于WATCH命令,将参数键加入到一个字典中,随后在执行EXEC命令时会检查这些被监控的数据库键是否被修改过,被修改过则放弃执行事务。监视的周期为一个事务,即无论事务最终以哪种条件结束,监视都会随之失效。

源码详解

数据结构

首先先来察看客户端的数据结构定义,由于本文只关注事务执行流程,因此只保留相关属性,其余省略。

redisDb源码如下:

typedef struct redisDb {
	// ...
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
	// ...
} redisDb;

相关属性只有watched_keys,这是一个字典类型的属性,key为被监视的键,value为监视该键的客户端名。该属性在WATCH监视时用于判断当前客户端是否监视了某键。


客户端源码如下:

typedef struct client {
	// ...
    int flags;              /* Client flags: CLIENT_* macros. */
    multiState mstate;      /* MULTI/EXEC state */
    // 与当前客户端监视键相关联 见下文WATCH部分
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
	// ...
} client;

flag属性

主要涉及到两个属性,首先介绍flags属性,正如前文提及,flags属性专门作为标识位,用于标识当前客户端的各种状态,具体的实现是,每一位对于一个特殊的状态,int型具有4字节32位,理论上可以存储32个只要两个状态的标识位,事实上源码显示Redis使用了其中的28位,对于flags的每一位,Redis都对他们进行特殊定义。

flags状态的宏定义如下,与client结构体相同,此处涉及到的状态非常多,此处只列举与事务相关的状态:

#define CLIENT_MULTI (1<<3)   /* This client is in a MULTI context */
#define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
#define CLIENT_DIRTY_EXEC (1<<12)  /* EXEC will fail for errors while queueing */

其中可见flags中的低第4位用于标识当前客户端是否处于事务开启状态、低第6位用于标识WATCH监视的键是否有修改、第13位用于标识指令入队时是否存在错误,在后文的其他代码部分中还会见到这三个变量的身影。

multiState属性

multiState数据结构源码如下:

typedef struct multiState {
	// 实际存储命令的数组
    multiCmd *commands;     /* Array of MULTI commands */
    // 命令数组长度 用于记录当前事务有多少命令
    int count;              /* Total number of MULTI commands */
    int cmd_flags;          /* The accumulated command flags OR-ed together.
                               So if at least a command has a given flag, it
                               will be set in this field. */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

multiCmd数据结构如下:

typedef struct multiCmd {
	// 命令参数
    robj **argv;
    // 参数数量
    int argc;
    // 命令指针 指向实际的命令
    struct redisCommand *cmd;
} multiCmd;

MULTI执行流程

MULTI命令非常简单且容易理解,实际上只是修改了flags标识位而已,对应的函数如下:

void multiCommand(client *c) {
	// 判断当前客户端是否已经出于事务开启状态 Redis不允许事务嵌套
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    // 通过|= 或运算 将flags中的对应标识位修改
    c->flags |= CLIENT_MULTI;
    // 随后返回熟悉的OK信息提示
    addReply(c,shared.ok);
}

事务开启后普通命令插入流程

在服务器端接收到客户端命令请求后,会经历四个阶段,分别是

  1. 命令读取
  2. 命令解析
  3. 命令执行
  4. 结果返回

指令检查事务是否开启是在命令执行阶段完成的,其对应函数processCommand,同样非关键部分省略,函数源码如下:

int processCommand(client *c) {
	// ...
    /* Exec the command */
    // 判断分为两部分 首先判断当前客户端的事务状态
    //				之后判断当前的命令是否为EXEC、DISCARD、MULTI或WATCH
    // 除了四个事务相关命令外,其他的命令均会进入if分支入队
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
    	// 调用入队函数
        queueMultiCommand(c);
        // 入队后返回熟悉的提示信息QUEUED
        addReply(c,shared.queued);
    } else {
        // ...
    }
    return C_OK;
}

随后是实现入队操作的函数queueMultiCommand,函数源码如下:

/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;
	// 为实际存储指令的数组开辟新的内存空间
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 使mc指针指向新元素
    mc = c->mstate.commands+c->mstate.count;
    // 设置命令、参数数量以及参数
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    // 记录命令数的属性+1
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
}

EXEC执行流程

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    int was_master = server.masterhost == NULL;
	// 若客户端不处于事务开启状态 返回
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
	// 有三种情况需要放弃执行当前事务
	// 第一种是命令入队时存在错误 
		// 在Redis 2.6版本及以前不会放弃执行当前事务 而是不处理错误命令
		// 此时返回等于指令个nil
	// 第二种是当前客户端监视任一键已经被修改
		// 此时返回错误
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        // 放弃执行事务
        discardTransaction(c);
        goto handle_monitor;
    }

    /* If there are write commands inside the transaction, and this is a read
     * only slave, we want to send an error. This happens when the transaction
     * was initiated when the instance was a master or a writable replica and
     * then the configuration changed (for example instance was turned into
     * a replica). */
	// 第三种是事务中存在写事务但当前服务器只读
		// 此时返回错误
    if (!server.loading && server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
    {
        addReplyError(c,
            "Transaction contains write commands but instance "
            "is now a read-only slave. EXEC aborted.");
        // 放弃执行事务
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Exec all the queued commands */
    // 首先取消当前客户端监视的所有键
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
	
	// 防止命令执行过程中命令和命令参数被修改 执行前先备份
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
	// 开始遍历按需执行队列中的命令
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first command which
         * is not readonly nor an administrative one.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
		// 执行第一个命令时 为保证数据一致性 传播MULTI命令	
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
        	// 向从节点与AOP文件中添加MULTI命令
            execCommandPropagateMulti(c);
            // 保证只执行一次
            must_propagate = 1;
        }
		// 实际执行命令 同时会像从节点与AOF文件中添加
        call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        // 命令执行时可能会修改命令以及命令参数
        // 修改时同步修改队列中的命令和命令参数 确保一致性 全部执行完毕时会用先前的备份还原
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    // 命令与命令参数还原
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    // 执行完毕后相当于隐式执行了一个DISCARD命令来恢复初始状态
    discardTransaction(c);

    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
	// MULTI命令已经被传播时 同时要确保EXEC命令也被传播
    if (must_propagate) {
        int is_master = server.masterhost == NULL;
        server.dirty++;
        /* If inside the MULTI/EXEC block this instance was suddenly
         * switched from master to slave (using the SLAVEOF command), the
         * initial MULTI was propagated into the replication backlog, but the
         * rest was not. We need to make sure to at least terminate the
         * backlog with the final EXEC. */
        if (server.repl_backlog && was_master && !is_master) {
            char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(execcmd,strlen(execcmd));
        }
    }

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}

DISCARD执行流程

DISCARD实际做了三件事,修改客户端flags的MULTI标识位,释放当前客户端储存的命令数组并重置初始状态,最后清空当前客户端下的所有监视,函数调用链如下:

void discardCommand(client *c) {
	// 同样首先判断客户但MULTI状态
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"DISCARD without MULTI");
        return;
    }
    // 调用函数放弃事务
    discardTransaction(c);
    // 返回
    addReply(c,shared.ok);
}
void discardTransaction(client *c) {
	// 释放命令数组
    freeClientMultiState(c);
    // 重置(初始化)命令数组
    initClientMultiState(c);
    // 将当前客户端flags中对应的三个相关标识位重置
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    // 取消所有监视
    unwatchAllKeys(c);
}
/* Client state initialization for MULTI/EXEC */
void initClientMultiState(client *c) {
	// 全部设置为初始状态
    c->mstate.commands = NULL;
    c->mstate.count = 0;
    c->mstate.cmd_flags = 0;
}

/* Release all the resources associated with MULTI/EXEC state */
void freeClientMultiState(client *c) {
    int j;
	
    for (j = 0; j < c->mstate.count; j++) {
        int i;
        // 依次释放每条指令的参数
        multiCmd *mc = c->mstate.commands+j;
		
        for (i = 0; i < mc->argc; i++)
            decrRefCount(mc->argv[i]);
        zfree(mc->argv);
    }
    // 释放整个数组空间
    zfree(c->mstate.commands);
}

WATCH执行流程

关于被监视键的主要信息,存储在DB中,信息是单一数据库级别的。如文章开头的数据结构中所示,watch_keys是一个dict字典类型,字典中的每个key为当前数据库中被监视的键,value为正在监视该键的客户端。

添加监视键

调用链如下:

void watchCommand(client *c) {
    int j;
	// 判断事务状态,不允许在事务开启时启动监视
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    // 由于允许WATCH命令后传入若干参数 因此此处需要循环依次处理
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}
// 首先涉及到一个数据结构
typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;

void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    // 首先检查client中的watched_keys属性是否已经监视过当前键 是则直接返回
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    // 从当前DB的watched_keys取出所有监视键key的客户端
    clients = dictFetchValue(c->db->watched_keys,key);
    // clients为空说明当前键尚未未被任何客户端监视 添加
    if (!clients) {
    	// value值为一个链表 因为一个键key可以被多个客户端监视
        clients = listCreate();
        // key为被监视键 value为存放若干监视该键客户端的链表
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 在这个客户端链表末尾加上当前的客户端
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    // watch_key结构添加到表尾
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

监视触发

修改操作命令执行前会检查被修改键的监视状态。检查函数如下:

void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
	// 首先检查DB中的被监视键数量
    if (dictSize(db->watched_keys) == 0) return;
    // 取出监视当前键的所有客户端
    clients = dictFetchValue(db->watched_keys, key);
    // clients为空自然说明没有任何客户端正在监视当前键key
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    // 遍历查找
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
		// 发现当前客户端的确在监视键key时 由于已经做了修改操作 修改标识位
		// 注意此处的触发条件为修改 无论是什么样的操作 值改变如何 都修改标识位
        c->flags |= CLIENT_DIRTY_CAS;
    }
}

取消所有监视

可以使用UNWATCH命令主动取消当前客户端的所有监视,对应函数如下:

void unwatchCommand(client *c) {
    unwatchAllKeys(c);
    // 取消监视后 标识位自然恢复初始状态
    c->flags &= (~CLIENT_DIRTY_CAS);
    addReply(c,shared.ok);
}

实际取消监视的函数为unwatchAllKeys

/* Unwatch all the keys watched by this client. To clean the EXEC dirty
 * flag is up to the caller. */
void unwatchAllKeys(client *c) {
    listIter li;
    listNode *ln;
	// 被监视的键为0 自然不用取消 直接返回
    if (listLength(c->watched_keys) == 0) return;
    // 开始遍历watched_keys字典
    // 查看每一个该DB中当前线程监视的键 即删除所有key中value存在的当前client
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk = listNodeValue(ln);
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        serverAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        // 只有当前client监视该键时 直接从dict中删除这一项
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        listDelNode(c->watched_keys,ln);
        // 实际释放空间
        decrRefCount(wk->key);
        zfree(wk);
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-19 01:14:20  更:2022-02-19 01:14:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 0:07:17-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码