• Redis 源码简洁剖析 12 - 一条命令的处理过程


    命令的处理过程#

    Redis server 和一个客户端建立连接后,会在事件驱动框架中注册可读事件——客户端的命令请求。命令处理对应 4 个阶段:

    • 命令读取:对应 readQueryFromClient 函数
    • 命令解析:对应 processInputBuffer 函数
    • 命令执行:对应 processCommand 函数
    • 结果返回:对应 addReply 函数

    命令读取#

    readQueryFromClient 函数在之前的文章中分析过,主要流程就是:

    1. 调用 connRead 函数读取命令
    2. 将命令追加到同步缓冲区,修改同步偏移量
    3. 调用 processInputBuffer 函数进行命令解析
    void readQueryFromClient(connection *conn) {
        // 从 connection 结构中获取客户端
        client *c = connGetPrivateData(conn);
        ……
        nread = connRead(c->conn, c->querybuf+qblen, readlen);
        ……
    
        /* There is more data in the client input buffer, continue parsing it
         * in case to check if there is a full command to execute. */
         processInputBuffer(c);
    }
    

    命令解析#

    processInputBuffer 函数会调用 processCommandAndResetClient 函数,其中又会调用 processCommand 函数。

    void processInputBuffer(client *c) {
    
        while(c->qb_pos < sdslen(c->querybuf)) {
            ……
    
            // 根据客户端输入缓冲区的命令开头字符判断命令类型
            if (!c->reqtype) {
                // 符合 RESP 协议的命令
                if (c->querybuf[c->qb_pos] == '*') {
                    c->reqtype = PROTO_REQ_MULTIBULK;
                } else {
                    // 管道类型命令
                    c->reqtype = PROTO_REQ_INLINE;
                }
            }
    
            // 对于管道类型命令,调用 processInlineBuffer 函数解析
            if (c->reqtype == PROTO_REQ_INLINE) {
                if (processInlineBuffer(c) != C_OK) break;
                ……
            // 对于 RESP 协议命令,调用 processMultibulkBuffer 函数解析
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                if (processMultibulkBuffer(c) != C_OK) break;
            }
            ……
    
            if (c->argc == 0) {
                resetClient(c);
            } else {
                ……
    
                // 可以开始执行命令了
                if (processCommandAndResetClient(c) == C_ERR) {
                    return;
                }
            }
        }
        ……
    }
    
    int processCommandAndResetClient(client *c) {
        int deadclient = 0;
        client *old_client = server.current_client;
        server.current_client = c;
        if (processCommand(c) == C_OK) {
            commandProcessed(c);
        }
        if (server.current_client == NULL) deadclient = 1;
        /*
         * Restore the old client, this is needed because when a script
         * times out, we will get into this code from processEventsWhileBlocked.
         * Which will cause to set the server.current_client. If not restored
         * we will return 1 to our caller which will falsely indicate the client
         * is dead and will stop reading from its buffer.
         */
        server.current_client = old_client;
        /* performEvictions may flush slave output buffers. This may
         * result in a slave, that may be the active client, to be
         * freed. */
        return deadclient ? C_ERR : C_OK;
    }
    

    命令执行#

    processCommand 函数是在 server.c 文件中实现的:

    • 调用 moduleCallCommandFilters 函数,将 Redis 命令替换成 module 想要替换的命令
    • 当前命令是否为 quit 命令,并进行相应处理
    • 调用 lookupCommand 函数,在全局变量 server 的 commands 成员变量中查找相关命令

    commands 是一个哈希表:

    struct redisServer {
       ...
       dict *commands; 
       ...
    }
    

    其是在 initServerConfig 函数中初始化的:

    void initServerConfig(void) {
        ...
        server.commands = dictCreate(&commandTableDictType,NULL);
        ...
        populateCommandTable();
        ...
    }
    

    populateCommandTable 函数中使用了 redisCommandTable 数组:

    void populateCommandTable(void) {
        int j;
        int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
    
        for (j = 0; j < numcommands; j++) {
            struct redisCommand *c = redisCommandTable+j;
            int retval1, retval2;
    
            /* Translate the command string flags description into an actual
             * set of flags. */
            if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
                serverPanic("Unsupported command flag");
    
            c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
            retval1 = dictAdd(server.commands, sdsnew(c->name), c);
            /* Populate an additional dictionary that will be unaffected
             * by rename-command statements in redis.conf. */
            retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
            serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
        }
    }
    

    redisCommandTable 数组是在 server.c 中定义的,记录了当前命令所对应的实现函数。具体见:https://github.com/LjyYano/redis/blob/unstable/src/server.c

    struct redisCommand redisCommandTable[] = {
        {"module",moduleCommand,-2,
         "admin no-script",
         0,NULL,0,0,0,0,0,0},
    
        {"get",getCommand,2,
         "read-only fast @string",
         0,NULL,1,1,1,0,0,0},
    
        {"getex",getexCommand,-2,
         "write fast @string",
         0,NULL,1,1,1,0,0,0},
    
         ……
    };
    

    redisCommand 结构如下:

    struct redisCommand {
        char *name;
        redisCommandProc *proc;
        int arity;
        char *sflags;   /* Flags as string representation, one char per flag. */
        uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
        /* Use a function to determine keys arguments in a command line.
         * Used for Redis Cluster redirect. */
        redisGetKeysProc *getkeys_proc;
        /* What keys should be loaded in background when calling this command? */
        int firstkey; /* The first argument that's a key (0 = no keys) */
        int lastkey;  /* The last argument that's a key */
        int keystep;  /* The step between first and last key */
        long long microseconds, calls, rejected_calls, failed_calls;
        int id;     /* Command ID. This is a progressive ID starting from 0 that
                       is assigned at runtime, and is used in order to check
                       ACLs. A connection is able to execute a given command if
                       the user associated to the connection has this command
                       bit set in the bitmap of allowed commands. */
    };
    

    再回到 processCommand 函数,断当前客户端是否有 CLIENT_MULTI 标记,如果有的话,就表明要处理的是 Redis 事务的相关命令,所以它会按照事务的要求,调用 queueMultiCommand 函数将命令入队保存,等待后续一起处理。而如果没有,processCommand 函数就会调用 call 函数来实际执行命令了。

    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
        c->cmd->proc != resetCommand)
    {
        // 将命令入队保存,后续一起处理
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // 调用 call 函数执行命令
        call(c,CMD_CALL_FULL);
        ……
    }
    

    下面以最简单的 get 命令为例:

    {"get",getCommand,2,
        "read-only fast @string",
        0,NULL,1,1,1,0,0,0},
    

    对应的实现函数是 getCommand,其调用了 getGenericCommand 函数:

    void getCommand(client *c) {
        getGenericCommand(c);
    }
    
    int getGenericCommand(client *c) {
        robj *o;
    
        if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
            return C_OK;
    
        if (checkType(c,o,OBJ_STRING)) {
            return C_ERR;
        }
    
        addReplyBulk(c,o);
        return C_OK;
    }
    

    其最终会调用到 db.c 文件中的 lookupKeyReadWithFlags 函数:

    robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
        robj *val;
    
        if (expireIfNeeded(db,key) == 1) {
            /* If we are in the context of a master, expireIfNeeded() returns 1
             * when the key is no longer valid, so we can return NULL ASAP. */
            if (server.masterhost == NULL)
                goto keymiss;
    
            /* However if we are in the context of a slave, expireIfNeeded() will
             * not really try to expire the key, it only returns information
             * about the "logical" status of the key: key expiring is up to the
             * master in order to have a consistent view of master's data set.
             *
             * However, if the command caller is not the master, and as additional
             * safety measure, the command invoked is a read-only command, we can
             * safely return NULL here, and provide a more consistent behavior
             * to clients accessing expired values in a read-only fashion, that
             * will say the key as non existing.
             *
             * Notably this covers GETs when slaves are used to scale reads. */
            if (server.current_client &&
                server.current_client != server.master &&
                server.current_client->cmd &&
                server.current_client->cmd->flags & CMD_READONLY)
            {
                goto keymiss;
            }
        }
        val = lookupKey(db,key,flags);
        if (val == NULL)
            goto keymiss;
        server.stat_keyspace_hits++;
        return val;
    
    keymiss:
        if (!(flags & LOOKUP_NONOTIFY)) {
            notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
        }
        server.stat_keyspace_misses++;
        return NULL;
    }
    

    会调用到 lookupKey 函数:

    robj *lookupKey(redisDb *db, robj *key, int flags) {
        dictEntry *de = dictFind(db->dict,key->ptr);
        if (de) {
            robj *val = dictGetVal(de);
    
            /* Update the access time for the ageing algorithm.
             * Don't do it if we have a saving child, as this will trigger
             * a copy on write madness. */
            if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
                if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                    updateLFU(val);
                } else {
                    val->lru = LRU_CLOCK();
                }
            }
            return val;
        } else {
            return NULL;
        }
    }
    

    结果返回#

    addReply 函数,主要是调用 prepareClientToWrite 函数,进而调用到 clientInstallWriteHandler 函数,将待写回客户端加入到全局变量 server 的 clients_pending_write 列表。最终调用 _addReplyToBuffer 函数,将要返回的结果添加到客户端的输出缓冲区。

    /* Add the object 'obj' string representation to the client output buffer. */
    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;
    
        if (sdsEncodedObject(obj)) {
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
        } else if (obj->encoding == OBJ_ENCODING_INT) {
            /* For integer encoded strings we just convert it into a string
             * using our optimized function, and attach the resulting string
             * to the output buffer. */
            char buf[32];
            size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) != C_OK)
                _addReplyProtoToList(c,buf,len);
        } else {
            serverPanic("Wrong obj->encoding in addReply()");
        }
    }
    

    参考链接#

    Redis 源码简洁剖析系列#

    最简洁的 Redis 源码剖析系列文章

    Java 编程思想-最全思维导图-GitHub 下载链接,需要的小伙伴可以自取~

    原创不易,希望大家转载时请先联系我,并标注原文链接。

  • 相关阅读:
    将OSGB格式数据转换为3d tiles的格式
    作弊(c++题解)
    Android Runtime (ART) 和 Dalvik
    互联网获客经验分享(一)
    十大排序算法之——归并排序算法(Java实现)及思路讲解
    你了解Java的内部类吗
    TAP 系列文章8 | TAP 学习中心——通过动手教程来学习
    ssm+vue+elementUI 高校普法系统-#毕业设计
    虚拟机独立 IP 配置
    SAP CRM Fiori 应用 My Opportunity 的分页读取逻辑,在 GM4 - AG3 无法正常工作
  • 原文地址:https://www.cnblogs.com/510602159-Yano/p/15903414.html