posts - 297,  comments - 15,  trackbacks - 0
epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
   LT/ET:ET也会多次发送event,当然频率远低于LT,但是epoll one shot才是真正的对"one connection VS one thread in worker thread pool,不依赖于任何connection-data-queue"的基础支持 .我看到大部分对epoll_wait的处理模式如下,很教科化,因为man-pages就是这样举例子的。
man-pages epoll_wait handle:
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
for (;;) 
{
   nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
   for(1~nfds)
       handle events[i];
   ......
}
epoll_ctl_add的当然是EPOLLIN|EPOLLET,此外我就不知道处理上面代码的是一条线程还是一堆线程(in threadpool),但愿不是一条线程吧!如果是的话,难不成等处理完这MAX_EVENTS 个再去处理接下来的MAX_EVENTS 个?慢否? 但是如果是一堆线程的话,你是否考虑过如何处理request-data in one connection在逻辑上的完整性,也就是一个request-data-packet可能会被分割成若干次发送,在上面的处理模式中你真的要好好设计一下了。
而我的epoll_wait处理模式如下:
struct epoll_event activeEvent;
for(;;)
{
   epoll_wait(epollfd, &activeEvent, 1/*很惊讶吗,但绝不是一条线程在运行这段代码,而是一堆*/, timeout);
   if handle activeEvent success
      epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
  ......
}
处理上面代码的当然是一堆线程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
因为我的设计理念是严格遵守one connection VS one thread in worker thread pool。
所以我下面的server框架的基本模型是:
One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
在epollWorkerRoutine中有以下的职责:
1.handle request,当忙时增加epollWorkerThread数量但不超过maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
2.timeout时检查是否空闲和当前epollWorkerThread数量,当空闲时保持或减少至minThreads数量.
3.对所有Accepted-socket管理生命周期,这里利用系统的keepalive probes,若想实现业务层"心跳探测"只需将QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系统默认的2小时.这里并不维护所有连接列表,当然你可以在/proc/getpid/fd下找到所有的socket fd.
4.linux上的non-blocking socket的操作仍然依赖recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我这里把send动作实现成了blocking的(internalBlockingSender),同样的道理,non-blocking send依然会形成响应数据在逻辑上的碎片错序,特别是你如果采用上面那个教科化的处理模式的化,并且还是多线程的话,那么这简直就乱透了。当然你可以使用response-data-queue来达到异步准确发送数据的目的。

下面结合源码,浅析一下epoll programming:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <string.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SOCKET_ERROR -1
#define INVALID_SOCKET -1
typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef unsigned short WORD;
typedef unsigned int DWORD;

#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  20*1000
#define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS  32

#define MAX_BUF_SIZE 1024
/* ulimit -n opened FDs per process.记得修改哦,否则还是select效果,就不是epoll效果了哦,呵呵*/
#define BLOCKING_SEND_TIMEOUT 20

typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

typedef struct {
  WORD passive;
  WORD port;//uint16_t
  WORD minThreads;
  WORD maxThreads;
  pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
  volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
  int  workerWaitTimeout;//wait timeout
  volatile int workerCounter;
  volatile int currentBusyWorkers;
  volatile int CSocketsCounter;
  CSocketLifecycleCallback cslifecb;
  InternalProtocolHandler protoHandler;
  SOCKET server_s;
  SOCKADDR_IN serv_addr;
  int epollFD;//main epoller.
  int BSendEpollFD;//For blocking send.
}QSocketServer;

typedef struct {
  SOCKET client_s;
  SOCKADDR_IN client_addr;
  uint32_t curEvents;

  char buf[MAX_BUF_SIZE];
  DWORD numberOfBytesTransferred;
  char * data;

  int BSendEpollFDRelated;
  pthread_mutex_t writableLock;
  pthread_cond_t  writableMonitor;
}QSSEPollEvent;//for per connection

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
int startSocketServer(QSocketServer *qss);
int shutdownSocketServer(QSocketServer *qss);
#endif

qsocketserver_model.c  //下面的代码离生产环境还差内存池logger哦!
 #include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void *  epollWorkerRoutine(void *);
void *  blockingSendEpollerRoutine(void *);
int isDigitStr(const char *str){
    int ret=-1;
    regex_t regex;
    regmatch_t matchs[1];
    if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*这里不要传0哦,否则nomatch*/)){
        ret=!regexec(&regex,str, 1,matchs,0);
        regfree(&regex);
    }
    return ret;
}

static int setNonBlocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts==-1)
    {
        perror("fcntl(sock,GETFL) failed!\n");
        return opts;
    }
    opts = opts|O_NONBLOCK;
    opts=fcntl(sock,F_SETFL,opts);
    if(opts==-1)
    {
        perror("fcntl(sock,SETFL,opts) failed!\n");
        return opts;
    }
    return 1;
}

static void adjustQSSWorkerLimits(QSocketServer *qss){
   //to adjust availabe size.
}
typedef struct{
 QSocketServer * qss;
 pthread_t th;
}QSSWORKER_PARAM;

static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
 WORD res=0;
 if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
 {
  QSSWORKER_PARAM * pParam=NULL;
  int i=0;
  pthread_spin_lock(&qss->g_spinlock);
  if(qss->workerCounter+addCounter<=qss->maxThreads)
   for(;i<addCounter;i++)
   {
    pParam=malloc(sizeof(QSSWORKER_PARAM));

    if(pParam){
     pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
     pParam->qss=qss;
     qss->workerCounter++,res++;
    }
   }
  pthread_spin_unlock(&qss->g_spinlock);
 }
 return res;
}

static void SOlogger(const char * msg,SOCKET s){
 perror(msg);
    if(s>0)
    close(s);
}

static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
    QSSEPollEvent *qssEPEvent=event->data.ptr;
    int ret;
    printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
    if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
        //sleep(5);
        ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
    }
     printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
 return ret;
}

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
{

 QSocketServer * qss=malloc(sizeof(QSocketServer));
 qss->passive=passive;
 qss->port=port;
 qss->minThreads=minThreads;
 qss->maxThreads=maxThreads;
 qss->workerWaitTimeout=workerWaitTimeout;
 qss->lifecycleStatus=0;
 pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
 qss->workerCounter=0;
 qss->currentBusyWorkers=0;
 qss->CSocketsCounter=0;
 qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
 if(!qss->protoHandler)
  qss->protoHandler=_InternalProtocolHandler;
 adjustQSSWorkerLimits(qss);
 *qss_ptr=qss;
 return 1;
}

int startSocketServer(QSocketServer *qss)
{
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==0){
            qss->lifecycleStatus=1;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

 qss->serv_addr.sin_family=AF_INET;
 qss->serv_addr.sin_port=htons(qss->port);
 inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
 //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

 qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
 if(setNonBlocking(qss->server_s)==-1)
 {
    SOlogger("setNonBlocking server_s failed.\n",0);
     return 0;
 }

 if(qss->server_s==INVALID_SOCKET)
 {
  SOlogger("socket failed.\n",0);
  return 0;
 }

 if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
 {
  SOlogger("bind failed.\n",qss->server_s);
  return 0;
 }

 if(listen(qss->server_s,SOMAXCONN/*这个宏windows也有,这里是128,当然你可以设的小些,它影响开销的*/)==SOCKET_ERROR)
{
  SOlogger("listen failed.\n",qss->server_s);
  return 0;
}
    qss->epollFD=epoll_create1(0);/*这里不是epoll_create(size)哦,你可能不知道如何设置size,所以忽略它吧*/
    if(qss->epollFD==-1){
        SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
        return 0;
    }
    qss->BSendEpollFD=epoll_create1(0);//for blocking send.
    if(qss->BSendEpollFD==-1){
        SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
        return 0;
    }

    {//ADD ACCEPT EVENT
        struct epoll_event _epEvent;
        QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
        qssEPEvent->client_s=qss->server_s;
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
        _epEvent.data.ptr=qssEPEvent;
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
            SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
            free(qssEPEvent);
            return 0;
        }
    }
    {//starup blocking send epoller.
        QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
        pParam->qss=qss;
        pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
    }

 //initialize worker for epoll events.
 addQSSWorker(qss,qss->minThreads);
 qss->lifecycleStatus=2;
 return 1;
}

int shutdownSocketServer(QSocketServer *qss){
    //change qss->lifecycleStatus
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==2){
            qss->lifecycleStatus=3;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    /*shutdown server-listening socket,这里优雅的做法是shutdown--notify-->epoll-->close.记得shutdown会发送EOF的哦*/
    shutdown(qss->server_s,SHUT_RDWR);

    // /proc/getpid/fd  shutdown all socket cs != serv_s
    {
        char dirBuf[64];
        struct dirent * de;
        DIR *pd=NULL;
        int sockFD;
        sprintf(dirBuf,"/proc/%d/fd/",getpid());
        pd=opendir(dirBuf);
        if(pd!=NULL){
            while((de=readdir(pd))!=NULL){
                if(isDigitStr(de->d_name)){
                    sockFD=atoi(de->d_name);
                    if(isfdtype(sockFD,S_IFSOCK))
                    shutdown(sockFD,SHUT_RDWR);
                }
            }
            closedir(pd);
        }
        /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
    }
 return 1;
}

static int onAcceptRoutine(QSocketServer * qss)
{
    SOCKADDR_IN client_addr;
    unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
    SOCKET cs;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
    if(cs==INVALID_SOCKET)
    {
        printf("onAccept failed:%d,%s\n",errno,strerror(errno));
        epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
        return 0;
    }
    if(setNonBlocking(cs)==-1)
    {
        printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
        return 0;
    }

    {// set keepalive option
        int keepAlive = 1;
        int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
        int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
        int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
        if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
           setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
           setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
           setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
           {
            printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
            return 0;
           }
    }
    qssEPEvent=malloc(sizeof(QSSEPollEvent));
    qssEPEvent->client_s=cs;
    {
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
        qssEPEvent->BSendEpollFDRelated=0;
        _epEvent.data.ptr=qssEPEvent;/*这里又和教科的不一样哦,真正的user data用ptr,而不是单一的fd*/
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
            printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
            free(qssEPEvent);
            return 0;
        }else{
            pthread_spin_lock(&qss->g_spinlock);
            qss->CSocketsCounter++;
            pthread_spin_unlock(&qss->g_spinlock);
            if(qss->cslifecb)
                qss->cslifecb(cs,0);
        }
    }
    printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
    return 1;
}

typedef struct{
    QSocketServer * qss;
    QSSEPollEvent * event;
}InternalSenderBase_t;

static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
    InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
    char * _sbuf=_buf;
    int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

    QSSEPollEvent *qssEPEvent=NULL;
    struct epoll_event _epEvent;

    struct timespec sendTimeo;

    while(1){
        *errno_ptr=0;
        while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
            sum+=ret,_sbuf+=ret;
        if(sum==nbs||ret==0)
            break;
        else if(ret==-1){
            if(errno==EAGAIN&&sum<nbs){
                qssEPEvent=sb->event;
                _epEvent.data.ptr=qssEPEvent;
                _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                if(qssEPEvent->BSendEpollFDRelated==0){
                    pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                    pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                    qssEPEvent->BSendEpollFDRelated=1;
                    curEpoll_ctl_opt=EPOLL_CTL_ADD;
                }else{
                    curEpoll_ctl_opt=EPOLL_CTL_MOD;
                }

                {//wait writable.
                    int flag=0;
                    pthread_mutex_lock(&qssEPEvent->writableLock);
                    if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                        sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                        int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                        if(err)
                            flag=-1;
                    }else
                        flag=-1;
                    pthread_mutex_unlock(&qssEPEvent->writableLock);
                    if(flag==-1)
                        break;
                }

            }else{
                if(errno==EAGAIN&&sum==nbs)
                    ret=nbs;//it is ok;
                break;
            }
        }
    }//end while.
    return ret;
}
void *  blockingSendEpollerRoutine(void *_param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    //pthread_t * curThread=&pParam->th;
    struct epoll_event epEvents[qss->maxThreads];
    QSSEPollEvent *qssEPEvent=NULL;
    int pollRes,*errno_ptr=&errno;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(1){

        pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
        if(pollRes>=1){
            int i=0;
            for(;i<pollRes;i++)
            if(epEvents[i].events&EPOLLOUT){//这个epollfd只应该做以下的事情,少做为快!
                qssEPEvent=epEvents[i].data.ptr;
                pthread_mutex_lock(&qssEPEvent->writableLock);
                pthread_cond_signal(&qssEPEvent->writableMonitor);
                pthread_mutex_unlock(&qssEPEvent->writableLock);
            }

        }else if(pollRes==-1){//errno 
            printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            break;
        }

    }

    return NULL;
}

void *  epollWorkerRoutine(void * _param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    pthread_t * curThread=&pParam->th;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    InternalSenderBase_t _senderBase;
    int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
    _senderBase.qss=qss;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(!exitCode){

        *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
        pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
        if(pollRes==1){
            qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

            if(qssEPEvent->client_s==qss->server_s)
            {//Accepted Socket.
               onAcceptRoutine(qss);
               continue;
            }else{
                if(qss->protoHandler){
                    _senderBase.event=_epEvent.data.ptr;
                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers++;
                    pthread_spin_unlock(&qss->g_spinlock);

                    addQSSWorker(qss,1);
                    handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers--;
                    pthread_spin_unlock(&qss->g_spinlock);

                    if(handleCode>0){
                        _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                        if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                            SOErrOccurred=2;
                    }else{
                        SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                    }
                }
            }

        }else if(pollRes==0){//timeout
            printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
            {
                pthread_spin_lock(&qss->g_spinlock);
                if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                    qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                    exitCode=2;
                }
                pthread_spin_unlock(&qss->g_spinlock);
            }else if(qss->lifecycleStatus>=4)
                    exitCode=4;

        }else if(pollRes==-1){//errno
            printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            exitCode=1;
        }

        if(SOErrOccurred){
            if(qss->cslifecb)
                qss->cslifecb(qssEPEvent->client_s,-1);
            /*if(qssEPEvent)*/{
                epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                close(qssEPEvent->client_s);
                if(qssEPEvent->BSendEpollFDRelated){
                    pthread_cond_destroy(&qssEPEvent->writableMonitor);
                    pthread_mutex_destroy(&qssEPEvent->writableLock);
                }
                free(qssEPEvent);
              }
            pthread_spin_lock(&qss->g_spinlock);
            if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
             //for qss workerSize,
             qss->lifecycleStatus=4;
             exitCode=3;
            }
            pthread_spin_unlock(&qss->g_spinlock);
        }//SOErrOccurred handle;

    }//end main while.

    if(exitCode!=2){
        int clearup=0;
        pthread_spin_lock(&qss->g_spinlock);
        if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
          clearup=1;
        }
        pthread_spin_unlock(&qss->g_spinlock);
        if(clearup){
         close(qss->epollFD);
         close(qss->BSendEpollFD);
         pthread_spin_destroy(&qss->g_spinlock);
         free(qss);
        }
    }//exitCode handle;
 return NULL;
}


from:

http://www.cppblog.com/adapterofcoms/archive/2010/08/03/122063.html

posted on 2010-08-25 20:41 chatler 阅读(2506) 评论(0)  编辑 收藏 引用 所属分类: NetworkSocketLinux_Coding

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理


<2010年4月>
28293031123
45678910
11121314151617
18192021222324
2526272829301
2345678

常用链接

留言簿(10)

随笔分类(307)

随笔档案(297)

algorithm

Books_Free_Online

C++

database

Linux

Linux shell

linux socket

misce

  • cloudward
  • 感觉这个博客还是不错,虽然做的东西和我不大相关,觉得看看还是有好处的

network

OSS

  • Google Android
  • Android is a software stack for mobile devices that includes an operating system, middleware and key applications. This early look at the Android SDK provides the tools and APIs necessary to begin developing applications on the Android platform using the Java programming language.
  • os161 file list

overall

搜索

  •  

最新评论

阅读排行榜

评论排行榜