Redis 内部有个小型的事件驱动,它主要处理两项任务:

  1. 文件事件:使用I/O多路复用技术处理多个客户端请求,并返回执行结果。
  2. 时间事件:维护服务器的资源管理,状态检查。

主要的数据结构包括文件事件结构体,时间事件结构体,触发事件结构体,事件循环结构体

redis源码解析之事件驱动-LMLPHP

/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent; /* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent; /* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent; /* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;

首先通过aeCreateEventLoop()函数创建时间循环结构体

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = ;
eventLoop->stop = ;
eventLoop->maxfd = -;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = ; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop; err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

其中aeApiCreate()是核心处理函数,redis 根据不同系统构建了不同的多路复用实现:例如linux的epoll,OS X的kqueue,windows的select。

initServer() 为监听套接字注册了读事件acceptTcpHandler()或者 acceptUnixHandler()。起作用是当有客户端连接进来的时候调用它,并注册读时间,回调函数为从客户端读取命令的函数readQueryFromClient()。

创建了aeEventLoop后就进入时间循环aeProcessEvents()中,调用aeApiPoll()来监听事件发生。

获取阻塞时间后,就开始文件事件的触发获取。得到所有触发事件,然后遍历文件事件触发数组(eventLoop->fired),得到fd,然后获取对应的文件事件,这里的fired已经看出,它只是个索引。调用对应的回调函数,结束文件事件的处理。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = , numevents; /* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return ; /* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != - ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms; /* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+) - now_ms)*;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*;
}
if (tvp->tv_sec < ) tvp->tv_sec = ;
if (tvp->tv_usec < ) tvp->tv_usec = ;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = ;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
} numevents = aeApiPoll(eventLoop, tvp);
for (j = ; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = ; /* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = ;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */
}

当用户在客户端输入命令后,触发读时间,服务器调用readQueryFromClient()来读取命令,每条命令处理完成之后都会调用addReply(),其中之一的作用是注册写事件,回调函数sendReplyToClient(),目的是将处理结果写回客户端。

处理完文件事件后再执行时间事件(serverCron)

TimeEvent被组织为一个单向链表,表头指针timeEventHead保存在核心数据结构aeEventLoop中。aeMain函数在每一轮循环中都会遍历该链表,针对每个TimeEvent,先调用gettimeofday获取系统当前时间,如果它比TimeEvent中的时间要小,则说明TimeEvent还没触发,应继续前进,否则说明TimeEvent已经触发了,立即调用超时处理函数,接下来根据处理函数的返回值分两种情况讨论:

1)若处理函数返回-1,那么把这个TimeEvent删掉。

2)否则,根据返回值修改当前的TimeEvent。比如返回5000,这个TimeEvent就会在5秒后再次被触发。

由于情况1)我们不能由当前结点到达下一结点,于是就又从表头开始遍历。

在目前的版本中,正常模式下的Redis 只带有serverCron 一个时间事件,而在benchmark 模
式下,Redis 也只使用两个时间事件。
在这种情况下,程序几乎是将无序链表退化成一个指针来使用,所以使用无序链表来保存时间
事件,并不影响事件处理器的性能。

05-29 01:30