Skip to content

Latest commit

 

History

History
353 lines (332 loc) · 12.7 KB

transaction.md

File metadata and controls

353 lines (332 loc) · 12.7 KB

基于redis源码分支5.0

事务

redis事务能够保证一批命令原子性的执行,即所有命令要么都执行要么都不执行。并且在事务执行过程中不会为任何其他命令提供服务。 事务执行的完整流程主要有以下三阶段(除了正常流程,还有取消事务,监听键等流程):

  • 事务开启
  • 命令入队
  • 事务执行

事务开启

命令multi用于显示开启一个事务。命令格式如下:

MULTI

multi命令源码实现如下:

void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}

只是给当前client对象设置一个CLIENT_MULTI标志位,并且redis的事务不能嵌套,即不能在一个开启的事务内再次调用multi命令开启一个新事务。

命令入队

redis服务启动 小节介绍了redis服务端接收到客户端命令请求后会调用processCommand函数处理命令,对于事务相关的逻辑如下:

int processCommand(client *c) {
    ...
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    }
    ...
}

如果当前客户端clientCLIENT_MULTI标志,且要执行的命令不是execdiscardmultiwatch,调用queueMultiCommand函数将命令入队。 命令入队函数queueMultiCommand实现如下:

/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;
    // 重新分配存放命令对象 multiCmd 的数组空间
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 初始化命令对象 mc
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
}

其中每一个入队的命令都用multiCmd结构表示,multiCmd定于如下:

typedef struct multiCmd {
    robj **argv;  // 命令参数数组
    int argc;     // 命令参数个数
    struct redisCommand *cmd; // 解析后的命令对象
} multiCmd;

客户端client有一个multiState对象用于存储事务条件下入队的所有命令对象multiCmdmultiState对象定义如下:

typedef struct client {
    ...
    multiState mstate;      /* MULTI/EXEC state */
    ...
} client

typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
    int cmd_flags;          /* The accumulated command flags OR-ed together.
                               So if at least a command has a given flag, it
                               will be set in this field. */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

事务执行

命令exec用于执行所有入队命令并将命令返回值依次返回。命令格式如下:

EXEC

exec命令实现源码如下:

void execCommand(client *c) {
    ...
    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;
        // 有不是只读命令且不是管理命令,需要将命令传播给 AOF 或者从节点,为了保证数据一致性
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }
        // 执行单个命令并回复客户端(将回复内容添加到输出缓存中)
        call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    // 事务执行完后清理事务相关资源
    discardTransaction(c);
    ...
}

事务执行的核心逻辑就是按照命令入队顺序,依次执行命令并回复(回复内容添加到输出缓存)。

  • 在命令执行前,会unwatch当前客户端所有watch的键以避免CPU浪费,通过unwatchAllKeys函数实现;
  • 在所有命令执行后,会情况客户端的事务状态,通过discardTransaction函数实现;
    void discardTransaction(client *c) {
        // 释放客户端释放状态分配的内容
        freeClientMultiState(c);
        // 初始化事务状态属性
        initClientMultiState(c);
        // 清理客户端事务相关标注
        c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
        // unwatch 客户端监听的所有键
        unwatchAllKeys(c);
    }
    
    void freeClientMultiState(client *c) {
        int j;
    
        for (j = 0; j < c->mstate.count; j++) {
            int i;
            multiCmd *mc = c->mstate.commands+j;
    
            for (i = 0; i < mc->argc; i++)
                decrRefCount(mc->argv[i]);
            zfree(mc->argv);
        }
        zfree(c->mstate.commands);
    }
    
    void initClientMultiState(client *c) {
        c->mstate.commands = NULL;
        c->mstate.count = 0;
        c->mstate.cmd_flags = 0;
    }

事务执行前会有校验逻辑:

  • 检查客户端释放开启了事务;
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
  • 被监听的键是否有改动或者命令参数等是否正确;
    // 如果监听的键有改动,会有 CLIENT_DIRTY_CAS 标志
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }
  • 检查命令是否有写命令且节点不是主节点等;
    if (!server.loading && server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
    {
        addReplyError(c,
            "Transaction contains write commands but instance "
            "is now a read-only slave. EXEC aborted.");
        discardTransaction(c);
        goto handle_monitor;
    }

取消事务

命令discard用于取消事务。命令格式如下:

DISCARD

discard命令源码如下:

void discardCommand(client *c) {
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"DISCARD without MULTI");
        return;
    }
    discardTransaction(c);
    addReply(c,shared.ok);
}

首先判断当前客户端client是否开启事务,也就是是否有CLIENT_MULTI标志,只有开启了事务后,才可以取消。事务的取消通过discardTransaction函数实现, discardTransaction函数详细介绍参考上一小节事务执行。最终,放弃一个事务时首先会将所有入队命令清空, 然后将client上事务相关的flags清空,最后将所有监听的keys取消监听。

监听键

命令watch用于实现一个乐观锁,在exec命令执行前监听任意数量的keys,并在exec命令执行时,检查被监听的键是否至少有一个被修改(被其它客户端修改), 如果有的话就放弃当前事务。watch命令只能在客户端进入事务状态之前执行。命令格式如下:

WATCH key [key ...]

watch命令的源码实现如下:

void watchCommand(client *c) {
    int j;

    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    // 回复 ok
    addReply(c,shared.ok);
}

watch命令必须在multi命令之前执行,对于每一个需要监听的键,都会调用watchForKey函数将键添加到对应的字典属性:

typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;

void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    // key 已经被监听,直接返回
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

客户端对象client有一个watched_keys链表用于存储监听的key

typedef struct client {
    ...
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    ...
} client;

数据库对象redisDB有个watched_keys字典存储监听key相关,其中键对要监听的key,值为链表存放客户端对象client

typedef struct redisDb {
    ...
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    ...
} redisDb;

watchForKey函数逻辑主要有以下三步:

  • 检查监听的key是否已经在客户端对象c->watched_keys链表中,存在说明已经被监听,直接返回;
  • key没有被监听,将其添加到客户端对应数据库对象redisDBwatched_keys字典中;
  • key转为watchedKey对象,并添加到客户端对象c->watched_keys链表中;

redis数据库键空间进行修改后都会调用signalModifiedKey函数:

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

进而touchWatchedKey函数被调用以通知监听的客户端:

/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 添加监听此 key 的所有客户端标志 CLIENT_DIRTY_CAS
        c->flags |= CLIENT_DIRTY_CAS;
    }
}

touchWatchedKey函数会在数据库的watchedKey字典查找监听的key,并将监听此key的所有客户端标志增加CLIENT_DIRTY_CAS, 以标志此key被修改。

对于当前开启事务的客户端来说,在exec命令之前的命令都被入队,不会实际执行,所以signalModifiedKey函数不会调用。 watch命令的作用是防止其他客户端对数据库键的修改。

取消监听

命令unwatch用于取消watch命令对所有键的监控(针对当前客户端)。命令格式如下:

UNWATCH

unwatch命令源码实现如下:

void unwatchCommand(client *c) {
    // 删除客户端监听的所有 key
    unwatchAllKeys(c);
    // 清除 CLIENT_DIRTY_CAS 标志
    c->flags &= (~CLIENT_DIRTY_CAS);
    // 回复 ok
    addReply(c,shared.ok);
}

事务特性

  • exec命令开始执行入队命令之前取消事务或者存在命令错误,整个事务命令都不会执行;
  • exec命令开始执行命令,某个命令失败,redis不会终止事务,而是继续执行其他命令,也就是不支持事务回滚;
  • redis事务不是原子性的,在事务过程中,其他客户端可以修改某个key。所以redis引入watch命令实现乐观锁机制;
  • redis事务中命令是相互独立的,后执行的命令不能依赖前面执行命令结果;
  • redis事务中每一个命令都需要回复,浪费网络资源,因为因为事务是一个批量执行的命令,按理说回复最终结果一次就行;

基于上述存在的问题,redis引入了lua脚本。