Redis handles all client connections with a single-process, single-threaded event loop built on I/O multiplexing; everything it does is driven by events.
Redis's event handling supports the select, epoll, and kqueue mechanisms. Its core polling function, aeApiPoll, is really just a wrapper: whether the aeApiPoll that actually runs comes from ae_select.c, ae_epoll.c, or ae_kqueue.c (which implement the select, epoll, and kqueue mechanisms respectively) is decided by the following macros:
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#ifdef _WIN32
#include "ae_wsiocp.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
ae_select.c, ae_epoll.c, and ae_kqueue.c wrap the select, epoll, and kqueue mechanisms respectively; detailed comparisons of the performance of select, epoll, and kqueue are easy to find online.
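Each of these backend files implements the same small internal interface, which is why ae.c can stay backend-agnostic. Roughly (the names below are taken from ae_epoll.c and its siblings; the exact prototypes may vary slightly between versions), every backend provides:
/* Allocate backend-private state and store it in eventLoop->apidata. */
static int aeApiCreate(aeEventLoop *eventLoop);
/* Release the backend-private state. */
static void aeApiFree(aeEventLoop *eventLoop);
/* Start or stop monitoring fd for the events in mask (AE_READABLE / AE_WRITABLE). */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask);
/* Wait at most tvp (NULL = forever) for events, fill eventLoop->fired[], return the count. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
/* Human-readable backend name: "select", "epoll", "kqueue", ... */
static char *aeApiName(void);
The aeEventLoop structure that all of these functions operate on looks like this: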
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd;
long long timeEventNextId;
aeFileEvent events[AE_SETSIZE]; /* Registered events */
aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
Here maxfd is the largest file descriptor in the current event set, timeEventNextId is the id that will be assigned to the next timer, events and fired hold the registered and the fired file events respectively, and timeEventHead points to the linked list of timer events. apidata holds aeApiPoll's private data, which is essentially the set of file descriptors to monitor; its exact contents depend on which of the three mechanisms (select, epoll, or kqueue) is compiled in. stop is used to stop the event loop and is only ever set by the benchmark. beforesleep is a callback invoked before every iteration of the event loop; main sets it to the beforeSleep function.
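For reference, here is a simplified sketch of how aeCreateEventLoop (called from initServer, as shown further below) sets these fields up; the exact code may differ slightly between versions:
aeEventLoop *aeCreateEventLoop(void) {
    aeEventLoop *eventLoop;
    int i;

    eventLoop = zmalloc(sizeof(*eventLoop));
    if (!eventLoop) return NULL;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    /* Let the selected backend allocate its private state into apidata. */
    if (aeApiCreate(eventLoop) == -1) {
        zfree(eventLoop);
        return NULL;
    }
    /* Slots with mask == AE_NONE hold no registered event yet. */
    for (i = 0; i < AE_SETSIZE; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
}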
For a file event, mask is the set of events to watch for (read and/or write), rfileProc and wfileProc are the function pointers called when a read or a write event occurs, and clientData is the data handed to those handlers:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
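File events are registered with aeCreateFileEvent. As a hedged illustration (the handler name and listen_fd below are hypothetical), registering a read handler on a listening socket looks roughly like this, which mirrors how initServer registers its accept handler:
/* A file event handler must match the aeFileProc signature. */
void myAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);

/* Watch listen_fd for readability; AE_ERR is returned if the descriptor
 * does not fit into the fixed-size AE_SETSIZE event table. */
if (aeCreateFileEvent(server.el, listen_fd, AE_READABLE,
                      myAcceptHandler, NULL) == AE_ERR) {
    redisLog(REDIS_WARNING, "Unrecoverable error creating file event");
}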
Timer events are kept in a linked list. Each timer has a unique id and fires once the absolute time given by when_sec and when_ms is reached; the function invoked is timeProc, whose main argument is clientData. When the timer is deleted, finalizerProc is called to clean up clientData.
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
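Timer events are created with aeCreateTimeEvent, which takes a delay in milliseconds relative to now, the handler, its clientData, and an optional finalizer. In this version, initServer registers serverCron roughly like this (quoted from memory, so details may differ):
/* Fire serverCron 1 ms from now; afterwards its own return value
 * (see processTimeEvents below) decides when it runs next. */
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);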
Let us now walk through how Redis processes events.
After initServer calls aeCreateEventLoop to complete the initialization, main calls aeMain, which is an infinite loop:
static void initServer() {
---
server.el = aeCreateEventLoop();
---
}
int main(int argc, char **argv) {
---
initServer();
---
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
---
}
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
Although aeMain does have an exit condition, in practice it never changes: only the benchmark code calls aeStop to modify the flag.
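aeStop itself is trivial; it simply sets the stop flag that the while loop above checks (essentially the following):
void aeStop(aeEventLoop *eventLoop) {
    eventLoop->stop = 1;
}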
Before processing events, aeMain first calls beforeSleep, which handles clients that have just become ready (unblocked) and then flushes the AOF buffer (the AOF mechanism is analyzed in detail in a later chapter):
void beforeSleep(struct aeEventLoop *eventLoop) {
listNode *ln;
redisClient *c;
REDIS_NOTUSED(eventLoop);
/* Try to process pending commands for clients that were just unblocked. */
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
redisAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);
c->flags &= ~REDIS_UNBLOCKED;
/* Process remaining data in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
}
aeMain calls aeProcessEvents to handle both file events and timer events. aeProcessEvents first finds the timer that will expire soonest and uses the time remaining until that moment as aeApiPoll's timeout, so that the timer can be handled as promptly as possible (file events are processed before timer events). Once aeApiPoll returns, the registered read and write handlers are invoked:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS))
return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
}
else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
}
else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to se the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
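A concrete example of the timeout computation above (numbers chosen purely for illustration): if the nearest timer is due at when_sec = 100, when_ms = 250 and the current time is now_sec = 99, now_ms = 900, then tv_sec starts as 100 - 99 = 1; because when_ms < now_ms, tv_usec becomes (250 + 1000 - 900) * 1000 = 350000 and tv_sec is decremented to 0, so aeApiPoll blocks for at most 0.35 seconds, exactly the time left until that timer fires.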
Next, aeProcessEvents calls processTimeEvents to handle timer events (by this point the nearest timer has typically expired). processTimeEvents loops over all timers that have expired. Note that processTimeEvents does not necessarily delete an expired timer; the code is as follows:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
#ifdef _WIN32
long long retval;
#else
int retval;
#endif
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
When a timer expires, the timeProc registered at creation time is called, and its return value decides whether the timer is deleted or simply rescheduled with a new expiry time. Note that serverCron, Redis's main periodic housekeeping function, keeps running thanks to exactly this mechanism: it always returns 100, so Redis executes serverCron roughly every 100 ms.
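As a hedged illustration of that contract (the handler below is hypothetical, not part of Redis), a time event handler returns either the number of milliseconds until it should fire again or AE_NOMORE to have itself removed:
/* Hypothetical periodic handler: runs every 100 ms until its counter
 * reaches zero, then asks to be deleted (the finalizer, if any, runs). */
int myCronHandler(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int *remaining = clientData;
    if (--(*remaining) <= 0)
        return AE_NOMORE;   /* processTimeEvents will call aeDeleteTimeEvent */
    return 100;             /* reschedule 100 ms from now */
}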
So the main loop of Redis works like this: beforeSleep first services clients that are ready, then the relevant file events are processed, and finally serverCron does its periodic work. Its code is as follows:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
/* We take a cached value of the unix time in the global state because
* with virtual memory and aging there is to store the current time
* in objects at every object access, and accuracy is not needed.
* To access a global var is faster than calling time(NULL) */
server.unixtime = time(NULL);
/* We have just 22 bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
* 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
*
* Note that even if this will wrap after 1.5 years it's not a problem,
* everything will still work but just some object will appear younger
* to Redis. But for this to happen a given object should never be touched
* for 1.5 years.
*
* Note that you can change the resolution altering the
* REDIS_LRU_CLOCK_RESOLUTION define.
*/
updateLRUClock();
/* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap) {
if (prepareForShutdown() == REDIS_OK) exit(0);
redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
}
/* Show some info about non-empty databases */
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (!(loops % 50) && (used || vkeys)) {
redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
/* We don't want to resize the hash tables while a bacground saving
* is in progress: the saving child is created using fork() that is
* implemented with a copy-on-write semantic in most modern systems, so
* if we resize the HT while there is the saving child at work actually
* a lot of memory movements in the parent will cause a lot of pages
* copied. */
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {
if (!(loops % 10)) tryResizeHashTables();
if (server.activerehashing) incrementallyRehash();
}
/* Show information about connected clients */
if (!(loops % 50)) {
#ifdef _WIN32
redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %llu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
(unsigned long long)zmalloc_used_memory());
#else
redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
#endif
}
/* Close connections of timedout clients */
if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
closeTimedoutClients();
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1 &&
server.aofrewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated */
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
if (pid == server.bgsavechildpid) {
backgroundSaveDoneHandler(statloc);
} else {
backgroundRewriteDoneHandler(statloc);
}
updateDictResizePolicy();
}
} else {
time_t now = time(NULL);
/* If there is not a background saving in progress check if
* we have to save now */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
if (server.dirty >= sp->changes &&
now-server.lastsave > sp->seconds) {
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving",
sp->changes, sp->seconds);
rdbSaveBackground(server.dbfilename);
#ifdef _WIN32
/* On windows this will save in foreground and block */
/* Here we are allready saved, and we should return */
return 100;
#else
break;
#endif
}
}
/* Trigger an AOF rewrite if needed */
if (server.bgsavechildpid == -1 &&
server.bgrewritechildpid == -1 &&
server.auto_aofrewrite_perc &&
server.appendonly_current_size > server.auto_aofrewrite_min_size)
{
long long base = server.auto_aofrewrite_base_size ?
server.auto_aofrewrite_base_size : 1;
long long growth = (server.appendonly_current_size*100/base) - 100;
if (growth >= server.auto_aofrewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
/* If we postponed an AOF buffer flush, let's try to do it every time the
* cron function is called. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
/* Expire a few keys per cycle, only if this is a master.
* On slaves we wait for DEL operations synthesized by the master
* in order to guarantee a strict consistency. */
if (server.masterhost == NULL) activeExpireCycle();
/* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */
if (!(loops % 10)) replicationCron();
server.cronloops++;
return 100;
}
serverCron does a great deal of work, and many later chapters come back to it. Because the function is fairly involved, we analyze it piece by piece.
It first caches the current time so that later mechanisms such as virtual memory can read it cheaply. Next, if a signal such as SIGTERM was received, the signal handler will have set server.shutdown_asap to 1, in which case prepareForShutdown is called to wrap things up before exiting. Some information about the non-empty databases is then logged. When no background child is busy saving or rewriting the AOF, tryResizeHashTables and incrementallyRehash are called to resize the databases' hash tables and to rehash them incrementally; rehashing while a background child is running would make the system use considerably more memory, because the child is created with fork and shares pages copy-on-write. serverCron then logs some information about connected clients and closes connections that have timed out. If a background child is saving the RDB or rewriting the AOF, serverCron checks (non-blockingly, via wait3 with WNOHANG) whether it has exited and, if so, calls backgroundSaveDoneHandler or backgroundRewriteDoneHandler to finish the job; otherwise it checks whether a save should be started now. It also expires a few keys per cycle: keys are sampled at random from the expires dictionary, and if more than 25% of the REDIS_EXPIRELOOKUPS_PER_CRON keys examined in one iteration turn out to be expired, another iteration is performed.
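As a worked example of the automatic AOF rewrite trigger in the code above (the numbers are purely illustrative): with auto_aofrewrite_base_size = 100 MB, appendonly_current_size = 250 MB, and auto_aofrewrite_perc = 100, growth = 250 * 100 / 100 - 100 = 150, which is >= 100, so rewriteAppendOnlyFileBackground is started.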
Finally, if memory usage exceeds the configured threshold, memory is freed; this was already analyzed in the earlier chapter on memory.