• KeyDB Source Code Analysis, Part 1: The Network Model


    KeyDB is a multithreaded fork of Redis. Its website claims more than 5x the QPS of Redis, though that figure assumes no limit on the number of CPU cores. Redis's single-threaded model limits how much of the CPU it can use; KeyDB uses multiple threads to exploit multi-core CPUs as fully as possible and raise system throughput.

    Thread model

    In Redis, the main thread handles client connections and requests, and also runs background tasks such as expiring keys and evicting memory; in cluster mode it handles gossip messages as well. There are also three asynchronous threads used to free memory after key-value pairs are deleted, to fsync the AOF file, and to close files.
    KeyDB re-divides these tasks. Handling client connections and requests is the key to raising throughput, so that work goes to multiple worker threads, while a single main thread handles the background tasks and, in cluster mode, the gossip messages.
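
    To make the division concrete, here is a rough sketch of the resulting thread layout (illustrative only, not KeyDB code; the real entry point, workerThreadMain, is shown later):

    #include <thread>
    #include <vector>
    
    // Illustrative sketch: N worker threads, each running its own event loop for
    // client I/O. One of them (the "main" worker) additionally runs background
    // work such as key expiration, eviction, and cluster gossip via timer events.
    void workerLoop(int threadIdx, bool isMainWorker) {
      // accept connections, read requests, execute commands, write replies;
      // if (isMainWorker): also run the background/cron tasks
    }
    
    int main() {
      const int kThreads = 4; // corresponds to "server-threads" in the config
      std::vector<std::thread> workers;
      for (int i = 0; i < kThreads; ++i)
        workers.emplace_back(workerLoop, i, /*isMainWorker=*/i == 0);
      for (auto &t : workers) t.join();
    }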

    Worker thread definition

    struct redisServerThreadVars {
      aeEventLoop *el = nullptr;
      socketFds ipfd;  /* TCP socket file descriptors */
      socketFds tlsfd; /* TLS socket file descriptors */
      int in_eval;     /* Are we inside EVAL? */
      int in_exec;     /* Are we inside EXEC? */
      std::vector<client *>
          clients_pending_write; /* There is to write or install handler. */
      list *unblocked_clients;   /* list of clients to unblock before next loop NOT
                                    THREADSAFE */
      list *clients_pending_asyncwrite;
      int cclients;
      int cclientsReplica = 0;
      client *current_client;       /* Current client */
      long fixed_time_expire = 0;   /* If > 0, expire keys against server.mstime. */
      client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
      struct fastlock lockPendingWrite {
        "thread pending write"
      };
      char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
      long unsigned commandsExecuted = 0;
      GarbageCollectorCollection::Epoch gcEpoch;
      const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
      long long stat_total_error_replies; /* Total number of issued error replies (
                                             command + rejected errors ) */
      long long
          prev_err_count; /* per thread marker of existing errors during a call */
      bool fRetrySetAofEvent = false;
      bool modulesEnabledThisAeLoop =
          false; /* In this loop of aeMain, were modules enabled before
                    the thread went to sleep? */
      bool disable_async_commands =
          false; /* this is only valid for one cycle of the AE loop and is reset in
                    afterSleep */
      std::vector<client *> vecclientsProcess;
      dictAsyncRehashCtl *rehashCtl = nullptr;
    
      int getRdbKeySaveDelay();
    
    private:
      int rdb_key_save_delay = -1; // thread local cache
    };
    

    As you can see, this collects the parts of the Redis server state that deal with handling client requests, plus a few fields for KeyDB's own features.
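
    Each worker thread reaches its own copy of this struct through a thread-local pointer (serverTL, assigned in workerThreadMain below), so fields that only the owning thread touches need no locking. A minimal sketch of the pattern, with the details simplified:

    // Sketch: serverTL points at the current thread's redisServerThreadVars
    // (KeyDB declares it as a thread-local global and assigns it at thread startup).
    thread_local redisServerThreadVars *serverTL = nullptr;
    
    void exampleCommandHook(client *c) {
      serverTL->current_client = c;   // this thread's "current client"
      serverTL->commandsExecuted++;   // per-thread counter, no lock needed
    }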

    When KeyDB starts, it creates the number of worker threads specified in the configuration file; these threads all listen for client connections, read the requests clients send, and process them.
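
    For reference, the thread count and the affinity switch come from keydb.conf; the relevant options look roughly like this (example values only, see the KeyDB documentation for the exact semantics):

    server-threads 4
    server-thread-affinity true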

    int main(int argc, char** argv) {
      ...
      // Initialize the per-thread state for each worker thread
      for (int iel = 0; iel < cserver.cthreads; ++iel) {
        initServerThread(g_pserver->rgthreadvar + iel, iel == IDX_EVENT_LOOP_MAIN);
      }
      ...
      // Start the worker threads and, if configured, pin each one to a CPU core
      for (int iel = 0; iel < cserver.cthreads; ++iel) {
        pthread_create(g_pserver->rgthread + iel, &tattr, workerThreadMain,
                       (void *)((int64_t)iel));
        if (cserver.fThreadAffinity) {
    #ifdef __linux__
          cpu_set_t cpuset;
          CPU_ZERO(&cpuset);
          CPU_SET(iel + cserver.threadAffinityOffset, &cpuset);
          if (pthread_setaffinity_np(g_pserver->rgthread[iel], sizeof(cpu_set_t),
                                     &cpuset) == 0) {
            serverLog(LL_NOTICE, "Binding thread %d to cpu %d", iel,
                      iel + cserver.threadAffinityOffset + 1);
          }
    #else
          serverLog(LL_WARNING, "CPU pinning not available on this platform");
    #endif
        }
      }
      ...
    }
    
    // Initialize the fields of redisServerThreadVars
    static void initServerThread(struct redisServerThreadVars *pvar, int fMain) {
      pvar->unblocked_clients = listCreate();
      pvar->clients_pending_asyncwrite = listCreate();
      pvar->ipfd.count = 0;
      pvar->tlsfd.count = 0;
      pvar->cclients = 0;
      pvar->in_eval = 0;
      pvar->in_exec = 0;
      // Each worker thread gets its own event loop
      pvar->el = aeCreateEventLoop(g_pserver->maxclients + CONFIG_FDSET_INCR);
      pvar->current_client = nullptr;
      pvar->fRetrySetAofEvent = false;
      if (pvar->el == NULL) {
        serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'",
                  strerror(errno));
        exit(1);
      }
      aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
      aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
    
      // Initialize the per-thread pending-write lock
      fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite");
    
      // For non-main threads, register the lightweight periodic timer (serverCronLite)
      if (!fMain) {
        if (aeCreateTimeEvent(pvar->el, 1, serverCronLite, NULL, NULL) == AE_ERR) {
          serverPanic("Can't create event loop timers.");
          exit(1);
        }
      }
    
      /* Register a readable event for the pipe used to awake the event loop
       * when a blocked client in a module needs attention. */
      if (aeCreateFileEvent(pvar->el, g_pserver->module_blocked_pipe[0],
                            AE_READABLE, moduleBlockedClientPipeReadable,
                            NULL) == AE_ERR) {
        serverPanic("Error registering the readable event for the module "
                    "blocked clients subsystem.");
      }
    }
    

    Now look at the thread entry function workerThreadMain. It does two main things: initialize the thread's networking (the listening sockets for client connections) and run the event loop.

    void *workerThreadMain(void *parg) {
      int iel = (int)((int64_t)parg);
      serverLog(LL_NOTICE, "Thread %d alive.", iel);
      // Point the thread-local serverTL at this thread's state
      serverTL = g_pserver->rgthreadvar + iel; // set the TLS threadsafe global
      tlsInitThread();
    
      if (iel != IDX_EVENT_LOOP_MAIN) {
        // Set up this worker thread's listening sockets
        aeThreadOnline();
        aeAcquireLock();
        initNetworkingThread(iel, cserver.cthreads > 1);
        aeReleaseLock();
        aeThreadOffline();
      }
    
      moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't
                              // be called on the first run
      aeThreadOnline();
      // Run the event loop
      aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
      try {
        aeMain(el);
      } catch (ShutdownException) {
      }
      aeThreadOffline();
      moduleReleaseGIL(true);
      serverAssert(!GlobalLocksAcquired());
      aeDeleteEventLoop(el);
    
      tlsCleanupThread();
      return NULL;
    }
    

    The event loop works much as it does in Redis; the event callbacks were already registered when the thread's structure was initialized.
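
    Roughly, each iteration of a worker's loop looks like the sketch below (simplified; the helper names are illustrative, and the real aeMain/aeProcessEvents handle many more flags and edge cases):

    // Simplified sketch of one worker's event loop (not the real aeMain).
    void eventLoopSketch(aeEventLoop *el) {
      while (!el->stop) {
        beforeSleep(el);                   // e.g. flush pending client writes, drop the global lock
        int nevents = pollForEvents(el);   // epoll_wait / kevent, may block until I/O or a timer
        afterSleep(el);                    // e.g. re-acquire locks and the module GIL
        for (int i = 0; i < nevents; ++i)
          dispatchEvent(el, i);            // acceptTcpHandler, readQueryFromClient, write handlers...
        processTimeEvents(el);             // serverCron on the main thread, serverCronLite on workers
      }
    }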

    Binding clients to worker threads

    When a worker thread is initialized, it registers the accept handling for client connections; once a connection is accepted, a Connection is created to handle that client's subsequent requests:

    static void initNetworkingThread(int iel, int fReusePort) {
      /* Open the TCP listening socket for the user commands. */
      // listen
      if (fReusePort || (iel == IDX_EVENT_LOOP_MAIN)) {
        if (g_pserver->port != 0 &&
            listenToPort(g_pserver->port, &g_pserver->rgthreadvar[iel].ipfd,
                         fReusePort, (iel == IDX_EVENT_LOOP_MAIN)) == C_ERR) {
          serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.",
                    g_pserver->port);
          exit(1);
        }
        if (g_pserver->tls_port != 0 &&
            listenToPort(g_pserver->tls_port, &g_pserver->rgthreadvar[iel].tlsfd,
                         fReusePort, (iel == IDX_EVENT_LOOP_MAIN)) == C_ERR) {
          serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.",
                    g_pserver->port);
          exit(1);
        }
      } else {
        // We use the main threads file descriptors
        memcpy(&g_pserver->rgthreadvar[iel].ipfd,
               &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd,
               sizeof(socketFds));
        g_pserver->rgthreadvar[iel].ipfd.count =
            g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd.count;
      }
    
      /* Create an event handler for accepting new connections in TCP */
      // Each thread registers its own accept handler on the listening sockets
      for (int j = 0; j < g_pserver->rgthreadvar[iel].ipfd.count; j++) {
        if (aeCreateFileEvent(g_pserver->rgthreadvar[iel].el,
                              g_pserver->rgthreadvar[iel].ipfd.fd[j],
                              AE_READABLE | AE_READ_THREADSAFE, acceptTcpHandler,
                              NULL) == AE_ERR) {
          serverPanic("Unrecoverable error creating g_pserver->ipfd file event.");
        }
      }
      ...
    }
    
    // Accept handler for incoming client connections
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
        char cip[NET_IP_STR_LEN];
        UNUSED(mask);
        UNUSED(privdata);
        UNUSED(el);
    
        while(max--) {
            cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport);
            if (cfd == ANET_ERR) {
                if (errno != EWOULDBLOCK)
                    serverLog(LL_WARNING,
                        "Accepting client connection: %s", serverTL->neterr);
                return;
            }
            anetCloexec(cfd);
            serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
    
            acceptOnThread(connCreateAcceptedSocket(cfd), 0, cip);
        }
    }
    

    Finally, acceptOnThread assigns the accepted connection to a particular worker thread. The goal is to spread client connections across worker threads as evenly as possible, while giving special treatment to certain connections, such as the client for the replication master.

    int chooseBestThreadForAccept()
    {
        int ielMinLoad = 0;
        int cclientsMin = INT_MAX;
        for (int iel = 0; iel < cserver.cthreads; ++iel)
        {
        // Pick the thread with the fewest clients, counting accepts still in flight
            int cclientsThread;
            atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
            cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
            // Note: Its repl factor less one because cclients also includes replicas, so we don't want to double count
        // Weight replica clients more heavily to account for replication fan-out
            cclientsThread += (g_pserver->rgthreadvar[iel].cclientsReplica) * (g_pserver->replicaIsolationFactor-1);
            if (cclientsThread < cserver.thread_min_client_threshold)
                return iel;
            if (cclientsThread < cclientsMin)
            {
                cclientsMin = cclientsThread;
                ielMinLoad = iel;
            }
        }
        return ielMinLoad;
    }
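
    acceptOnThread itself is not shown here; once the target thread is chosen, the accepted connection still has to be handed over to that thread's event loop. One common pattern for such a handoff is a per-thread queue plus a wakeup pipe watched by the target loop; the sketch below is purely illustrative (hypothetical names, not KeyDB's actual implementation):

    #include <unistd.h>
    #include <mutex>
    #include <queue>
    
    // Illustrative cross-thread handoff: the accepting thread pushes the fd and
    // wakes the target loop through a pipe; the target worker drains the queue
    // from a readable callback registered on the pipe's read end.
    struct WorkerHandoff {
      std::mutex lock;
      std::queue<int> pendingFds;
      int wakeupPipe[2];   // [0] is watched by the worker's event loop
    
      void init() { pipe(wakeupPipe); }
    
      // Called by whichever thread accepted the connection.
      void push(int fd) {
        { std::lock_guard<std::mutex> g(lock); pendingFds.push(fd); }
        char b = 1;
        (void)write(wakeupPipe[1], &b, 1);   // wake the target event loop
      }
    
      // Called by the target worker when wakeupPipe[0] becomes readable.
      void drain() {
        char buf[128];
        (void)read(wakeupPipe[0], buf, sizeof buf);   // consume the wakeup byte(s)
        std::lock_guard<std::mutex> g(lock);
        while (!pendingFds.empty()) {
          int fd = pendingFds.front(); pendingFds.pop();
          // register fd with this thread's event loop and create the client,
          // analogous to acceptCommonHandler(connCreateAcceptedSocket(fd), ...)
          (void)fd;
        }
      }
    };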
    

    Finally, as in Redis, acceptCommonHandler is called to create the client, and the handler that reads the client's requests is registered on it:

    static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) {
        client *c;
        char conninfo[100];
        UNUSED(ip);
        AeLocker locker;
        locker.arm(nullptr);
        ...
        /* Limit the number of connections we take at the same time.
         *
         * Admission control will happen before a client is created and connAccept()
         * called, because we don't want to even start transport-level negotiation
         * if rejected. */
        if (listLength(g_pserver->clients) + getClusterConnectionsCount()
            >= g_pserver->maxclients)
        {
            const char *err;
            if (g_pserver->cluster_enabled)
                err = "-ERR max number of clients + cluster "
                      "connections reached\r\n";
            else
                err = "-ERR max number of clients reached\r\n";
    
            /* That's a best effort error message, don't check write errors.
             * Note that for TLS connections, no handshake was done yet so nothing
             * is written and the connection will just drop. */
            if (connWrite(conn,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            g_pserver->stat_rejected_conn++;
            connClose(conn);
            return;
        }
    
        /* Create connection and client */
        // Create the client, bound to this thread (iel)
        if ((c = createClient(conn, iel)) == NULL) {
            serverLog(LL_WARNING,
                "Error registering fd event for the new client: %s (conn: %s)",
                connGetLastError(conn),
                connGetInfo(conn, conninfo, sizeof(conninfo)));
            connClose(conn); /* May be already closed, just ignore errors */
            return;
        }
    
        /* Last chance to keep flags */
        c->flags |= flags;
    	...
    }
    
    client *createClient(connection *conn, int iel) {
        client *c = new client;
        serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));
    
        c->iel = iel;
        /* passing NULL as conn it is possible to create a non connected client.
         * This is useful since all the commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        if (conn) {
            serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
            connNonBlock(conn);
            connEnableTcpNoDelay(conn);
            if (cserver.tcpkeepalive)
                connKeepAlive(conn,cserver.tcpkeepalive);
        // Register readQueryFromClient as the read handler
            connSetReadHandler(conn, readQueryFromClient, true);
            connSetPrivateData(conn, c);
        }
    
        selectDb(c,0);
        uint64_t client_id;
        ...
    }
    

    As you can see, the read handler readQueryFromClient is the same as in Redis, and from this point on all of this client's requests are handled by the same worker thread: read the data from the network, parse the request, execute it, and send the response back. Finally, look at how responses are sent. A reply is produced through addReply, which may be called either on the worker thread the client is bound to or on some other thread, so sending a response splits into two cases:

    void addReply(client *c, robj_roptr obj) {
        if (prepareClientToWrite(c) != C_OK) return;
        ...
    }
    
    int prepareClientToWrite(client *c) {
        bool fAsync = !FCorrectThread(c);
        ...
        if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c)))
            clientInstallWriteHandler(c);
        if (fAsync && !(c->fPendingAsyncWrite))
            clientInstallAsyncWriteHandler(c);
        ...
    }
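
    The choice between the two paths hinges on FCorrectThread: it answers whether the calling thread is the worker this client is bound to. Conceptually it amounts to the sketch below (the real function also handles fake clients that have no connection):

    // Sketch of the idea behind FCorrectThread (not the exact KeyDB code):
    // a thread may write a reply directly only to clients bound to it;
    // replies to everyone else must take the asynchronous path.
    static inline bool FCorrectThreadSketch(client *c) {
      return serverTL == &g_pserver->rgthreadvar[c->iel];
    }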
    

    On the synchronous path, the queued replies are flushed in each worker thread's beforeSleep by handleClientsWithPendingWrites. On the asynchronous path, clientInstallAsyncWriteHandler appends the client to the asynchronous write list (clients_pending_asyncwrite), and those clients are later handed back to the worker threads they are bound to, which perform the actual send (for example, a PUBLISH executed on one worker thread may need to push messages to subscribers whose clients live on other threads).
    That concludes the analysis of KeyDB's multithreaded network model and the corresponding client request handling flow.

  • Original article: https://blog.csdn.net/yh88623131/article/details/127856608