快乐的天空

时间来得快,去得也快

 

redis event

redis是单进程单线程事件多路循环处理所有的客户端连接,它的运行都是靠事件触发的。

redis 的事件处理支持select、kqueue、epoll机制。其核心的poll函数aeApiPoll其实是一个封装函数,最终是调用 ae_select.c、ae_epoll.c还是ae_kqueue.c中的aeApiPoll(分别实现select、kqueue、epoll机制),取决于如下的宏定义:

ae_select.c#ifdef HAVE_EPOLL

#include "ae_epoll.c"
#else
    #ifdef HAVE_KQUEUE
    #include "ae_kqueue.c"
    #else
        #ifdef _WIN32
        #include "ae_wsiocp.c"
        #else
        #include "ae_select.c"
        #endif
    #endif

#endif
ae_epoll.c、ae_kqueue.c分别对select、kqueue、epoll进制进行了封装,对select、kqueue、epoll的性能比较可在网上找到详细资料。

/* State of an event based program */
typedef 
struct aeEventLoop {
    
int maxfd;
    
long long timeEventNextId;
    aeFileEvent events[AE_SETSIZE]; 
/* Registered events */
    aeFiredEvent fired[AE_SETSIZE]; 
/* Fired events */
    aeTimeEvent 
*timeEventHead;
    
int stop;
    
void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc 
*beforesleep;
} aeEventLoop;

其中maxfd是当前事件集合中最大的文件描述符id,timeEventNextId是下一个timer的id,events和fired分别保存了已注册的和已释放的文件event,timeEventHead指向一个timer event的链表,apidata保存了aeApiPoll的私有数据,其实也就是要监控的文件集合,具体实现要看采用哪种机制(select、kqueue、epoll三种机制)。stop用于停止事件循环,仅用于基准测试。beforesleep是在每次事件循环前都要被调用的函数,在main函数中被设置为beforeSleep函数。

对于文件event,其中mask为要检测的事件(读或者写),rfileProc、wfileProc分别为有读写事件时要调用的函数指针,clientData为函数要处理的数据;

/* File event structure */
typedef 
struct aeFileEvent {
    
int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc 
*rfileProc;
    aeFileProc 
*wfileProc;
    
void *clientData;
} aeFileEvent;
系统中的timer事件使用一个链表,每个timer有一个唯一的id,该timer在when_sec、 when_ms后被调用,调用函数为timeProc,timeProc处理的主要参数为clientData。在删除该timer时,需要调用 finalizerProc对clientData进行处理。 
/* Time event structure */
typedef 
struct aeTimeEvent {
    
long long id; /* time event identifier. */
    
long when_sec; /* seconds */
    
long when_ms; /* milliseconds */
    aeTimeProc 
*timeProc;
    aeEventFinalizerProc 
*finalizerProc;
    
void *clientData;
    
struct aeTimeEvent *next;
} aeTimeEvent;

我们分析下redis中事件的处理逻辑。

在函数initServer中调用aeCreateEventLoop完成初始化后,在main函数中调用ae_main,该函数是一个死循环:static void initServer() {

    ---
    server.el 
= aeCreateEventLoop();
    
---
}

int main(int argc, char **argv) {
   
---
   initServer();
   
---
   aeSetBeforeSleepProc(server.el,beforeSleep);
   aeMain(server.el);
   
---
}

void aeMain(aeEventLoop *eventLoop) {
    eventLoop
->stop = 0;
    
while (!eventLoop->stop) {
        
if (eventLoop->beforesleep != NULL)
            eventLoop
->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

 

尽管aeMain函数有退出条件,但除了基准测试中会调用aeStop修改该值,该条件不会被改变。

aeMain在处理event之前,先调用beforeSleep,该函数先处理已ready的client,然后刷新aof缓冲区(aof机制后续章节会详细分析):

void beforeSleep(struct aeEventLoop *eventLoop) {
    listNode *ln;
    redisClient *c;
    REDIS_NOTUSED(eventLoop);

    /* Try to process pending commands for clients that were just unblocked. */
    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
        redisAssert(ln != NULL);
        c = ln->value;
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~REDIS_UNBLOCKED;

        /* Process remaining data in the input buffer. */
        if (c->querybuf && sdslen(c->querybuf) > 0)
            processInputBuffer(c);
    }

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);
}
aeMain调用aeProcessEvents处理文件事件和timer事件。aeProcessEvents 先获得最先超时的timer,并记下该timer距此时的时间段,将该时间段作为aeApiPoll的超时时间(以能尽快调用timer处理,因为是先处理file事件,后处理timer事件),aeApiPoll返回后将调用注册的read、write函数进行读写: 
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. 
*/
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. 
*/
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to se the timeout
             * to zero 
*/
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask &  code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. 
*/
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

接着,aeProcessEvents调用 processTimeEvents处理timer事件(此时至少有一个超时),processTimeEvents循环处理已超时的timer。注意,processTimeEvent并不一定会删除超时的timer,代码如下: 
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
#ifdef _WIN32
            long long retval;
#else
            int retval;
#endif

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* After an event is processed our time event list may
             * no longer be the same, so we restart from head.
             * Still we make sure to don't process events registered
             * by event handlers itself in order to don't loop forever.
             * To do so we saved the max ID we want to handle.
             *
             * FUTURE OPTIMIZATIONS:
             * Note that this is NOT great algorithmically. Redis uses
             * a single time event so it's not a problem but the right
             * way to do this is to add the new elements on head, and
             * to flag deleted elements in a special way for later
             * deletion (putting references to the nodes to delete into
             * another linked list). 
*/
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}

当timer超时时,会调用timer创建时注册的timeProc,根据timerProc的返回值,是删除还是继续修改超时时间。注意,redis的主要循环处理函数serverCron就是靠这种定时机制得以反复运行的,该定时处理函数就一直返回100,这样就使得redis每隔100ms执行一次serverCron函数。

因此,redis的主要循环逻辑为一开始使用beforeSleep处理ready的client,然后处理相关的文件event,最后调用serverCron做一些工作。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    
int j, loops = server.cronloops;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    
/* We take a cached value of the unix time in the global state because
     * with virtual memory and aging there is to store the current time
     * in objects at every object access, and accuracy is not needed.
     * To access a global var is faster than calling time(NULL) 
*/
    server.unixtime 
= time(NULL);

    
/* We have just 22 bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
     * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
     *
     * Note that even if this will wrap after 1.5 years it's not a problem,
     * everything will still work but just some object will appear younger
     * to Redis. But for this to happen a given object should never be touched
     * for 1.5 years.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define.
     
*/
    updateLRUClock();

    
/* Record the max memory used since the server was started. */
    
if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory 
= zmalloc_used_memory();

    
/* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. 
*/
    
if (server.shutdown_asap) {
        
if (prepareForShutdown() == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,
"SIGTERM received but errors trying to shut down the server, check the logs for more information");
    }

    
/* Show some info about non-empty databases */
    
for (j = 0; j < server.dbnum; j++) {
        
long long size, used, vkeys;

        size 
= dictSlots(server.db[j].dict);
        used 
= dictSize(server.db[j].dict);
        vkeys 
= dictSize(server.db[j].expires);
        
if (!(loops % 50&& (used || vkeys)) {
            redisLog(REDIS_VERBOSE,
"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
            
/* dictPrintStats(server.dict); */
        }
    }

    
/* We don't want to resize the hash tables while a bacground saving
     * is in progress: the saving child is created using fork() that is
     * implemented with a copy-on-write semantic in most modern systems, so
     * if we resize the HT while there is the saving child at work actually
     * a lot of memory movements in the parent will cause a lot of pages
     * copied. 
*/
    
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {
        
if (!(loops % 10)) tryResizeHashTables();
        
if (server.activerehashing) incrementallyRehash();
    }

    
/* Show information about connected clients */
    
if (!(loops % 50)) {
#ifdef _WIN32
        redisLog(REDIS_VERBOSE,
"%d clients connected (%d slaves), %llu bytes in use",
            listLength(server.clients)
-listLength(server.slaves),
            listLength(server.slaves),
            (unsigned 
long long)zmalloc_used_memory());
#else
        redisLog(REDIS_VERBOSE,
"%d clients connected (%d slaves), %zu bytes in use",
            listLength(server.clients)
-listLength(server.slaves),
            listLength(server.slaves),
            zmalloc_used_memory());
#endif
    }

    
/* Close connections of timedout clients */
    
if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
        closeTimedoutClients();

    
/* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. 
*/
    
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1 &&
        server.aofrewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    
/* Check if a background saving or AOF rewrite in progress terminated */
    
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
        
int statloc;
        pid_t pid;

        
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            
if (pid == server.bgsavechildpid) {
                backgroundSaveDoneHandler(statloc);
            } 
else {
                backgroundRewriteDoneHandler(statloc);
            }
            updateDictResizePolicy();
        }
    } 
else {
         time_t now 
= time(NULL);

        
/* If there is not a background saving in progress check if
         * we have to save now 
*/
         
for (j = 0; j < server.saveparamslen; j++) {
            
struct saveparam *sp = server.saveparams+j;

            
if (server.dirty >= sp->changes &&
                now
-server.lastsave > sp->seconds) {
                redisLog(REDIS_NOTICE,
"%d changes in %d seconds. Saving",
                    sp
->changes, sp->seconds);
                rdbSaveBackground(server.dbfilename);
#ifdef _WIN32
                
/* On windows this will save in foreground and block */
                
/* Here we are allready saved, and we should return */
                
return 100;
#else
                
break;
#endif
            }
         }

         
/* Trigger an AOF rewrite if needed */
         
if (server.bgsavechildpid == -1 &&
             server.bgrewritechildpid 
== -1 &&
             server.auto_aofrewrite_perc 
&&
             server.appendonly_current_size 
> server.auto_aofrewrite_min_size)
         {
            
long long base = server.auto_aofrewrite_base_size ?
                            server.auto_aofrewrite_base_size : 
1;
            
long long growth = (server.appendonly_current_size*100/base- 100;
            
if (growth >= server.auto_aofrewrite_perc) {
                redisLog(REDIS_NOTICE,
"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }


    
/* If we postponed an AOF buffer flush, let's try to do it every time the
     * cron function is called. 
*/
    
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    
/* Expire a few keys per cycle, only if this is a master.
     * On slaves we wait for DEL operations synthesized by the master
     * in order to guarantee a strict consistency. 
*/
    
if (server.masterhost == NULL) activeExpireCycle();

    
/* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. 
*/
    
if (!(loops % 10)) replicationCron();

    server.cronloops
++;
    
return 100;
}

serverCron做的工作很多,后续的很多章节都与此有关。该函数较复杂,分段分析。
一开始将当前时间保存,方便后续vm等机制对当前时间的访问;接着,如果收到SIGTERM等信号,则会在信号处理函数中设置server.shutdown_asap为1,此时就会调用prepareForShutdown做结束运行前的结尾工作;接着,显示些db中的信息;当没有进行save或者rewrite aof的后台子进程运行时,会调用tryResizeHashTables、incrementallyRehash,以分别调整db的大小和重新rehash db。当有后台子进程运行时,进行rehash会使得系统使用较多的内存。 接着,显示些client的信息,并关闭timeout的连接; 如果有后台子进程进行save或者rewrite aof的工作,此时等待其退出,并调用backgroundSaveDoneHandler或者backgroundRewriteDoneHandler做些后续工作,否则检查是否需要save db;释放expire的key:释放时从expired链表中随机选择,如果循环中超时的key的数量超过了设置值REDIS_EXPIRELOOKUPS_PER_CRON 的25%,则继续释放;如果内存超出阈值,则释放内存,前面的内存章节中已对此进行过分析. 
 
 



 

 

posted on 2012-08-03 11:18 探路者 阅读(697) 评论(0)  编辑 收藏 引用 所属分类: 学习笔记


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理


导航

统计

常用链接

留言簿

随笔分类

随笔档案

文章分类

文章档案

新闻档案

Android

Compiler Course

VIM

编译技术集合

测试

高性能计算

个人博客

框架/组件/库

搜索

最新评论

阅读排行榜

评论排行榜