Redis源码阅读 - Redis事件驱动模型(一)

Redis事件驱动模型(一)

Posted by 王富杰 on Friday, May 1, 2026

一、引言

Redis是一个C/S架构的数据库,通常系统实现网络通信的基本方法是使用 Socket 编程模型。但是,由于基本的 Socket 编程模型一次只能处理一个客户端连接上的请求,所以当要处理高并发请求时,一种方案就是使用多线程,让每个线程负责处理一个客户端的请求。

处理并发请求的另一种方案就是IO多路复用,Linux 操作系统,就提供了 select、poll 和 epoll 三种编程模型。运行在linux上的Redis通常就使用 epoll 模型。

二、为什么不直接使用socket模型

使用用 Socket 模型实现网络通信时,需要经过创建 Socket、监听端口、处理连接和读写请求等多个步骤。首先,当我们需要让服务器端和客户端进行通信时,可以在服务器端通过以下三步,来创建监听客户端连接的监听套接字(Listening Socket):

  • 调用 socket 函数,创建一个套接字。我们通常把这个套接字称为主动套接字(Active Socket);
  • 调用 bind 函数,将主动套接字和当前服务器的 IP 和监听端口进行绑定;
  • 调用 listen 函数,将主动套接字转换为监听套接字,开始监听客户端的连接。

在完成上述三步之后,服务器端就可以接收客户端的连接请求了。为了能及时地收到客户端的连接请求,我们可以运行一个循环流程,在该流程中调用 accept 函数,用于接收客户端连接请求。

accept 也是阻塞函数,如果此时一直没有客户端连接请求,服务器端的执行流程会一直阻塞在 accept 函数。一旦有客户端连接请求到达,accept 将不再阻塞,而是处理连接请求,和客户端建立连接,并返回已连接套接字(Connected Socket)。最后,服务器端可以通过调用 recv 或 send 函数,在刚才返回的已连接套接字上,接收并处理读写请求,或是将数据发送给客户端。

listenSocket = socket(); //调用socket系统调用创建一个主动套接字
bind(listenSocket);  //绑定地址和端口
listen(listenSocket); //将默认的主动套接字转换为服务器使用的被动套接字,也就是监听套接字
while (1) { //循环监听是否有客户端连接请求到来
   connSocket = accept(listenSocket); //接受客户端连接
   recv(connsocket); //从客户端读取数据,只能同时处理一个客户端
   send(connsocket); //给客户端返回数据,只能同时处理一个客户端
}

上述代码中,虽然它能够实现服务器端和客户端之间的通信,但是程序每调用一次 accept 函数,只能处理一个客户端连接。因此,如果想要处理多个并发客户端的请求,就需要使用多线程的方法,来处理通过 accept 函数建立的多个客户端连接上的请求。

但是对于Redis,主执行流程是由一个线程在执行,无法使用多线程的方式来提升并发处理能力。 因此Redis并没有直接使用socket模型。

三、Redis对socket模型的封装

Socket 模型提供了最基本的网络通信机制,因此尽管Redis使用了 epoll 模型,本质还是需要通过 socket 进行网络通信。因此Redis在创建服务器时封装了socket,后边我们结合代码实际体会下。

3.1、Redis服务端整体架构

Redis 服务端网络层的实现架构分为了四层,如下:

┌─────────────────────────────────────────────────────┐
│                    networking.c                     │
│   协议处理 + 客户端管理 + 读写逻辑                      │
│   readQueryFromClient()  /  writeToClient()         │
├─────────────────────────────────────────────────────┤
│                    connection.c                     │
│   连接抽象层 (Redis 6.0+)                             │
│   connRead() / connWrite() / connAccept()           │
├─────────────────────────────────────────────────────┤
│                      anet.c                         │
│   网络基础设施:socket 创建、bind、listen、选项设置      │
│   anetTcpServer() / anetTcpAccept() / anetNonBlock()│
├─────────────────────────────────────────────────────┤
│                      ae.c                           │
│   事件循环:epoll/kqueue/select 封装                   │
│   aeCreateFileEvent() / aeMain()                    x│
└─────────────────────────────────────────────────────┘
  • ae.c — 事件循环,基于 Reactor 模式使用IO多路复用创建的事件循环
  • anet.c — 网络基础设施,负责创建和配置 socket,不参与数据读写。
  • connection.c — 连接抽象层
  • networking.c — 业务逻辑

以上我们对这四层先做一个了解,后续章节再逐层次进行介绍。

3.2、网络基础设施层

我们知道socket模型开发服务端需要多个步骤,redis封装了整个流程,提供一个对外的函数。在anet.c、anet.h 源文件中实现, 这两个文件中除了实现了基于socket的通信,还实现了基于unixsock的通信模型,因为我们大多都是使用socket,这里不对unix套接字做过多介绍。

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)   // 封装一个服务端,设置地址和绑定端口
{
    int s = -1, rv;
    char _port[6];  /* strlen("65535") */  // 存储端口号的字符串
    struct addrinfo hints, *servinfo, *p;

    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;       // 设置协议为TCP
    hints.ai_flags = AI_PASSIVE;

    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {  // 调用 getaddrinfo 获取可用于 bind 的地址列
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {       // 遍历地址列表尝试绑定
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)     // 创建一个主动套接字
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;     // 设置端口重用
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog,0) == ANET_ERR) s = ANET_ERR;     // 绑定ip并 监听端口
        goto end;
    }

error:
    if (s != -1) close(s);
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}

如上所示,为redis创建一个服务端的通用实现函数。这是粘贴的代码中笔者删掉了部分错误处理代码以节省篇幅。在Redis对套接字封装的实现中,还实现了 accept、connect等函数,包括阻塞模式和非阻塞模式。这里我们不做过多展开。

四、IO多路复用

4.1、select模型

Linux 针对每一个套接字都会有一个文件描述符,用来唯一标识该套接字。在多路复用机制的函数中,Linux 通常会用文件描述符作为参数。有了文件描述符,函数也就能找到对应的套接字,进而进行监听、读写等操作。

第一种实现IO多路复用的是select模型,它提供一个select函数,原型如下:

int select (int __nfds, fd_set *__readfds, fd_set *__writefds, fd_set *__exceptfds, struct timeval *__timeout)

select 函数使用三个集合,表示监听的三类事件,分别是读数据事件(对应readfds集合)、写数据事件(对应writefds集合)和异常事件(对应__exceptfds集合)。fd_set 结构体的定义,其实就是一个 long int 类型的数组,该数组中一共有 32 个元素(1024/32=32),每个元素是 32 位(long int 类型的大小),而每一位可以用来表示一个文件描述符的状态。因此,select 函数对每一个描述符集合,都可以监听 1024 个描述符。

select函数一次监听1024个描述符,每次调用会返回已经就绪的描述符数量。因此我们需要对这些描述符进行循环,判断是哪个已经就绪并进行处理。因此可以看到select模型的两个显著缺陷:

  • select 函数对单个进程能监听的文件描述符数量是有限制的,它能监听的文件描述符个数由 __FD_SETSIZE 决定,默认值是 1024。
  • 当 select 函数返回后,我们需要遍历描述符集合,才能找到具体是哪些描述符就绪了。这个遍历过程会产生一定开销,从而降低程序的性能

4.2、pool模型

为了解决select模型的缺陷,linux又提出了pool模型。主要函数是 poll 函数,我们先来看下它的原型定义,如下所示:

int poll (struct pollfd *__fds, nfds_t __nfds, int __timeout);

,参数 *__fds 是 pollfd 结构体数组,参数 nfds 表示的是 *fds 数组的元素个数,而 __timeout 表示 poll 函数阻塞的超时时间。pollfd 结构体里包含了要监听的描述符,以及该描述符上要监听的事件类型。

在使用pool机制时,一样需要循环调用pool函数检查文件描述符是否有就绪的,返回就绪的文件描述符个数。不同的是,它允许一次监听超过 1024 个文件描述符。是当调用了 poll 函数后,我们仍然需要遍历每个文件描述符,检测该描述符是否就绪。

4.3、epool模型

为了解决遍历问题,linux再次提出了epool模型。epoll 机制是使用 epoll_event 结构体,来记录待监听的文件描述符及其监听的事件类型的,这和 poll 机制中使用 pollfd 结构体比较类似。

对于 epoll_event 结构体来说,其中包含了 epoll_data_t 联合体变量,以及整数类型的 events 变量。epoll_data_t 联合体中有记录文件描述符的成员变量 fd,而 events 变量会使用不同的宏定义值,来表示 epoll_data_t 变量中的文件描述符所关注的事件类型,比如一些常见的事件类型包括以下这几种:

  • EPOLLIN:读事件,表示文件描述符对应套接字有数据可读。
  • EPOLLOUT:写事件,表示文件描述符对应套接字有数据要写。
  • EPOLLERR:错误事件,表示文件描述符对于套接字出错。

下面的代码展示了 epoll_event 结构体以及 epoll_data 联合体的定义:

typedef union epoll_data
{
  ...
  int fd;  //记录文件描述符
  ...
} epoll_data_t;

struct epoll_event
{
  uint32_t events;  //epoll监听的事件类型
  epoll_data_t data; //应用程序数据
};

我们知道,在使用 select 或 poll 函数的时候,创建好文件描述符集合或 pollfd 数组后,就可以往数组中添加我们需要监听的文件描述符。 但是对于 epoll 机制来说,我们则需要先调用 epoll_create 函数,创建一个 epoll 实例。这个 epoll 实例内部维护了两个结构,分别是记录要监听的文件描述符和已经就绪的文件描述符,而对于已经就绪的文件描述符来说,它们会被返回给用户程序进行处理。所以,我们在使用 epoll 机制时,就不用像使用 select 和 poll 一样,遍历查询哪些文件描述符已经就绪了。

在创建了 epoll 实例后,我们需要再使用 epoll_ctl 函数,给被监听的文件描述符添加监听事件类型,以及使用 epoll_wait 函数获取就绪的文件描述符。如下为 epool机制的使用流程示例:

int sock_fd,conn_fd; //监听套接字和已连接套接字的变量
sock_fd = socket() //创建套接字
bind(sock_fd)   //绑定套接字
listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字
  
epfd = epoll_create(EPOLL_SIZE); //创建epoll实例,
//创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型  
ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);

//创建epoll_event变量
struct epoll_event ee
//监听读事件
ee.events = EPOLLIN;
//监听的文件描述符是刚创建的监听套接字
ee.data.fd = sock_fd;

//将监听套接字加入到监听列表中  
epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee); 
  
while (1) {
   //等待返回已经就绪的描述符 
   n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
   //遍历所有就绪的描述符   
   for (int i = 0; i < n; i++) {
       //如果是监听套接字描述符就绪,表明有一个新客户端连接到来 
       if (ep_events[i].data.fd == sock_fd) { 
          conn_fd = accept(sock_fd); //调用accept()建立连接
          ee.events = EPOLLIN;  
          ee.data.fd = conn_fd;
          //添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件    
          epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee); 
            
       } else { //如果是已连接套接字描述符就绪,则可以读数据
           ...//读取数据并处理
       }
   }
}

2.2、Redis的IO多路复用实现

Redis 在设计和实现网络通信框架时,就基于 epoll 机制中的 epoll_create、epoll_ctl 和 epoll_wait 等函数和读写事件,进行了封装开发,实现了用于网络通信的事件驱动框架,从而使得 Redis 虽然是单线程运行。Redis的实现主要位于 ae_epool.c 中,如下:

typedef struct aeApiState {
    int epfd;                    // 保存epoll实例的文件描述符
    struct epoll_event *events;  // epoll_event数组,内核把就绪事件写到这里。
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);  // 根据事件循环的大小申请内存
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 是一个提示值, */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    anetCloexec(state->epfd);   // 设置 CLOEXEC 如果 Redis 调用:fork或者 exec 执行新程序时,自动关闭 epfd。否则子进程继承了epfd会导致fd泄露
    eventLoop->apidata = state;
    return 0;
}

如上展示了创建一个 epool 实例的实现。核心调用了 epoll_create 创建实例,并把实例描述符和事件数组封装到 aeApiState 结构中。接下来我们看下是如何添加一个事件的:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};  // 避免未初始化数据 
    int op = eventLoop->events[fd].mask == AE_NONE ?    // 判断 ADD 还是 MOD
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* 合并旧事件 */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;  // 调用 epoll_ctl 实现监听
    return 0;
}

这个函数实现了把一个 fd 注册到 epoll,或者修改这个 fd 已经注册的监听事件。在Redis中自定义了事件类型 AE_READABLE、AE_WRITABLE,因此在实际监听时需要转换为linux定义的EPOLLIN 和 EPOLLOUT。同样的Redis封装了删除时间监听以及释放epool实例的函数,请自行阅读代码。

这里你可能会好奇aeEventLoop是什么样的结构,因为我们这里并未做介绍,它定义在ae.c文件中。 aeEventLoop 是 Redis 的“事件循环对象”,保存了整个 Redis 运行时所有网络事件、时间事件以及底层 IO 多路复用状态,redis利用它实现的时间循环。 在这一节我们主要介绍redis多epool的封装,下一篇文件我们重点来聊聊aeEventLoop这个结构。

五、总结

Redis在进行网络通信的实现时分为了四层,其中网路基础设施封装了socket模型,也封装了sock套接字。日常我们大多使用TCP协议,因此本文只重点介绍了对Socket的封装。 Redis使用了事件循环机制,底层利用了IO多路复用。考虑到对不同平台的兼容,提供了统一的API接口,如aeApiCreate。

「真诚赞赏,手留余香」

WangFuJie Blog

真诚赞赏,手留余香

使用微信扫描二维码完成支付