  1. 此次案例采用的redis是cluster模式。
  2. 网络模型采用 epoll 模式

本篇文章主要讲解 ,从redis原理的角度了解一个 set 命令从redis client发出到 redis server端接收到客户端请求的时候,到底经历了哪些过程?


  1. redis的执行原理
  2. Redis cluster集群模式的运行原理
  3. 同样解释了为什么redis的速度
  4. epoll网络模型

为了了解redis请求流程,首先先了解下redis的网络模型。redis 支持 4中网络模式, select、poll、epoll、kqueue ,其中epoll 模型我个人认为是应用最广泛的模型,所以本篇文章以epoll 模型为 demo 进行讲解。


Select 和 poll 模型的缺点:

  1. 每次调用 Select 都需要将进程加入到所有监视 Socket 的等待队列,每次唤醒都需要从每个队列中移除,这里涉及了两次遍历,而且每次都要将整个 FDS 列表传递给内核,牵涉到用户态到内核态的转移,有一定的开销。
  2. select /poll 返回的值是 int 类型,使得我们不知道是那个 socket 准备就绪了,我们还需要新一轮的系统调用去检查哪一个准备就绪了。

Epoll 模型为了解决 Select ,Poll的两次轮训和每次都需要传入文件描述符的问题,对整体的结构做了一个新的优化,具体架构如下:


Epoll 启动具体流程如下:

  1. 在内核中开辟一个新的存储空间,存储文件描述符(红黑树结构),构建方法是 epoll_create()
  2. 使用 epoll_ctl 函数,对文件描述符进行CRUD的管理
  3. 使用 epoll_wait 函数阻塞线程调用,同样把调用线程放到等待队列中

Epoll 收到消息后处理流程:

不同于 select/poll 的中断和异常处理,Epoll 采用的是内核通过调度机制,将等待事件的线程从挂起状态移动到可运行状态。

在 epoll 的等待过程中,内核会监视所有被注册的文件描述符,一旦有文件描述符上发生了注册的事件,内核会将这个事件通知到 epoll 实例。具体流程如下:

  1. 调用 epoll_wait 的线程在 epoll 实例上等待事件的发生。这时线程被挂起,进入休眠状态。
  2. 当有文件描述符上发生了注册的事件,内核会将这个事件信息标记到 epoll 实例中。
  3. 一旦事件发生,内核会唤醒等待的线程。这是通过调度机制完成的,内核会将等待的线程移动到可运行状态。
  4. 等待的线程被唤醒后,epoll_wait 返回,并将事件的信息填充到用户提供的数组中,使用户程序得以处理发生的事件。


// 用户空间代码
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    // 在内核中等待事件发生
    wait_for_events(epfd, events, maxevents, timeout);
    // 返回事件信息
    return num_events;

// 内核空间代码
void wait_for_events(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    // 如果没有事件发生,将当前线程挂起
    add_thread_to_wait_queue(current_thread, epfd->wait_queue);
    // 进入调度器,切换到其他线程执行
    // 返回时,说明事件发生,处理事件
    process_events(epfd, events, maxevents);

// 文件描述符事件发生时的处理
void handle_events(struct epoll_event *events, int num_events) {
    // 遍历等待队列,唤醒等待的线程


Redis server端启动

在了解完 epoll 模型的时候,那我们需要思考,在redis中是如何利用Epoll模型通信的。我们看下redis 核心启动的源码:

int main(int argc, char **argv) {

redis在启动时,有两个主要的方法,initServer 和 aeMain,其中 initServer 会有以下和epoll相关的核心流程:

  1. aeCreateEventLoop 创建 epoll的文件监控文件描述符列表
  2. listenToPort 监听指定端口
  3. createSocketAcceptHandler 注册对应接收事件的handler
  4. aeSetBeaforeSleepProc 前置处理器

aeMain 函数循环调用 aeApiPoll (相当于 epoll_wait)等待 FD 就绪。总体流程如下:



Redis Cluser 集群模式

Redis 集群模式是常用的架构模式,其结构图如下:


在集群中 master 节点同步采用的 Gossip协议进行通信,保证集群内消息通信。

在 master 和 slave 同步采用定时发送数据完成。

经过上面的讨论,把Redis 相关的背景知识进行了梳理,下面开始看命令的流转。


当redis启动时候,Redis 已经注册了链接应答管理器(tcpAccepthandler),这个作用主要是把就绪的 fd 绑定到对应的处理器上面(readQueryFromClient),这样当FD有数据就是的时候,可以调用对应的处理器方法。

void initServer(void) {
    createSocketAcceptHandler(&server.ipfd, acceptTcpHandler);

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                          "Accepting client connection: %s", server.neterr);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);


static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
                  "Error registering fd event for the new client: %s (conn: %s)",
                  connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        //调用 readQueryFromClient
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);


当注册完成后,在aeMain方法中会调用 epoll_wait() 方法,具体代码流程如下:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|

int aeProcessEvents(aeEventLoop *eventLoop, int flags){

    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)

    /* Call the multiplexing API, will return only on timeout or when
     * some event fires. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)

// ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                        tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
    return numevents;

当在redis 客户端输入 set xxx aaa 这个命令后,会经历下面几个过程:

  1. 当 set 命令从客户端发出的时候,通过提前建立好的TCP链接,把数据发送到某一台服务器上
  2. 当前redis节点检测当前的这个key是否在自己服务的Hash槽中,如果不在则直接返回一个moved命令,客户端接收到moved命令,转移到指定正确的服务器中。
  3. 客户端把输入的命令解析和转化成 RESP协议 +SET xxx aaa\r\n
  4. 客户端把报文发送到 Redis 服务端,当 socket 变成可读的时候,epoll_wait 返回了就绪的fd个数
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,                    tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
  1. 循环遍历 fd 的个数,判断类型。此处这里是 EPOLLIN 事件,代表缓冲区已经可读,调用对应的函数(readQueryFromClient),具体代码如下:
void readQueryFromClient(connection *conn) {
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */

void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */


int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    if (processCommand(c) == C_OK) {

int processCommand(client *c) {

    * lookupCommand 查询对应的命令
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
        c->cmd->proc != resetCommand)


struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
  1. 读取fd内容,并解析对应的命令 set ,查询对应的命令实现:
void *dictFetchValue(dict *d, const void *key) {
    dictEntry *he;

    he = dictFind(d,key);
    return he ? dictGetVal(he) : NULL;

dictEntry *dictFind(dict *d, const void *key)
    dictEntry *he;
    uint64_t h, idx, table;

    if (dictSize(d) == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        if (!dictIsRehashing(d)) return NULL;
    return NULL;

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual
         * set of flags. */
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);

struct redisCommand redisCommandTable[] = {
        /* Note that we can't flag set as fast, since it may perform an
     * implicit DEL of a large key. */
     "write use-memory @string",

     "write use-memory fast @string",

     "write use-memory @string",

  1. 选择对应的 set命令类,执行set命令
void setCommand(client *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_NO_FLAGS;

    if (parseExtendedStringArgumentsOrReply(c,&flags,&unit,&expire,COMMAND_SET) != C_OK) {

    c->argv[2] = tryObjectEncoding(c->argv[2]);

执行完命令后,实现函数会生成一个响应对象,并将其添加到客户端的输出缓冲区中。这个过程通常由 addReply 系列函数完成。 对于 SET 命令,实现函数可能会生成一个 “OK” 响应并添加到输出缓冲区中。

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
    } else {
        serverPanic("Wrong obj->encoding in addReply()");

当事件循环检测到输出缓冲区中有数据可以发送时,它会调用 writeToClient 函数将响应发送给客户端。

通过以上步骤,Redis 能够根据客户端发送的命令找到相应的实现函数并执行它,然后将结果发送回客户端。这个过程涉及到多个源码文件和函数,但主要逻辑在 commands.c 文件中完成。

void beforeSleep(struct aeEventLoop *eventLoop) {

    /* Handle writes with pending output buffers. */


int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);

        int target_id = item_id % server.io_threads_num;

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);

    /* Also use the main thread to process a slice of clients. */
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;

    /* Run the list of clients again to install the write handler where
     * needed. */
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
            connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* If a client is protected, don't do anything,
         * that may trigger write error or recreate handler. */
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Don't write to clients that are going to be closed anyway. */
        if (c->flags & CLIENT_CLOSE_ASAP) continue;

        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_barrier = 0;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. the write barrier ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
                ae_barrier = 1;
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
    return processed;
