A Za, A Za, Fighting...

坚信:勤能补拙

epoll方法实现non-blocking socket

epoll方法实现non-blocking socket

© Min的技术分享 – 54min.com (RSS订阅) | 原文链接:http://54min.com/post/using-epoll-method-create-non-blocking-socket.html

epoll方法实现non-blocking socket

event-based方法和epoll

epoll是event-based的方法实现异步io/non-blocking io。从Linux kernel 2.5.44之后epoll加入Linux kernel中,代替loop style方法的selectpoll,比后者更加高效更适用于高并发多client的应用。loop style方法的时间复杂度为O(n)(因为需要线性地检测指定的file descriptor),而epoll等event-based方法的时间复杂度为O(1)。event-based通过为不同的events设置callback函数,在该event发生的时候自动执行相应函数(epoll uses callbacks in the kernel file structure)。

和epoll类似的其他event-based的方法还有:kqueue(FreeBSD/NetBSD/OpenBSD/Darwin), /dev/poll(Solaris/HPUX), pollset(AIX), Event Completion(Solaris 10), I/O Completion Ports(Windows)等。因此如果程序运行目标平台是Linux(Kernel > 2.5.44)可以使用epoll即可,如果其他Unix平台可以考虑相应的方法(如在FreeBSD等UNIX上使用kqueue)。如果希望使用跨平台可移植的event-based可以使用libevent库,它支持多种方法(select/poll, epoll, kqueue, /dev/poll等)。

epoll提供的系统调用

epoll包含了3个系统调用:epoll_createepoll_ctlepoll_wait(需#include <sys/epoll.h>)。具体步骤是:首先使用epoll_create创建一个epoll file descriptor;然后使用epoll_ctl添加要监听的IO file descriptor到epoll中;最后循环地调用epoll_wait检测各IO fd相关events的变化,然后采取相应的措施。

epoll_create

    int epoll_create(int size); //创建epoll

其中size告知kernel需要为之后添加的IO file descriptor准备的event backing store的大小。Open an epoll file descriptor by requesting the kernel allocate an event backing store dimensioned for size descriptors. The size is not the maximum size of the backing store but just a hint to the kernel about how to dimension internal structures.

Linux 2.6.8开始size值不再被使用,但是其赋值需要> 0。参考: http://www.kernel.org/doc/man-pages/online/pages/man2/epoll_create.2.html

使用epoll_create创建的epoll file descriptor在程序结束的时候需要使用close()将其关闭,例如:

    int epfd;
    if ( (epfd = epoll_create(1)) == -1 ) {
        perror("epoll_create failed");
        exit(EXIT_FAILURE);
    }

    ...

    close(epfd);

epoll_ctl

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // add/update/del IO file descriptor to be watched on the epoll instance

    //epfd: 即是如上epoll_create创建的epoll file descriptor
    //op:   指定要对指定的fd进行何种操作,支持的操作包括:
                EPOLL_CTL_ADD   将指定的file descriptor添加到epoll中
                EPOLL_CTL_MOD   修改指定file descriptor的event,相当于update
                EPOLL_CTL_DEL   将指定的file descriptor从epoll中清除,the event is ignored and can be NULL
    //fd:   要操作的IO file descriptor
    //event: 表示the event linked to this file descriptor

其中struct epoll_event的定义如下:

    struct epoll_event {
        __uint32_t      events; //epoll events
        epoll_data_t    data;   //user data variable
    };
    typedef union epoll_data {
        void        *ptr;
        int         fd;
        __uint32_t  u32;
        __uint64_t  u64;
    } epoll_data_t;

其中的struct epoll_eventevents域是一个bit set(可以通过|操作符进行多赋值),支持的event type有:

    EPOLLIN     //ready for read
    EPOLLOUT    //ready for write
    EPOLLPRI    //urgent data available for read

    EPOLLERR    //error condition happened, epoll_wait会默认检测该event不需要设置
    EPOLLHUP    //hang up happended on the fd, epoll_wait会默认检测该event不需要设置

    EPOLLET     //设置使用Edge Triggered模式,epoll模式使用Level Triggered

更多参看:http://linux.die.net/man/2/epoll_ctl

epoll_ctl的返回值:成功返回0, 发生错误返回-1

epoll_wait

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

    //epfd      为如上定义的epoll file descriptor
    //events    返回发生改变的events
    //maxevents 最多返回events的个数,必须 > 0
    //timeout   等待的milliseconds;和poll类似,如果timeout设置为-1则epoll_wait将持续等待下去;如果timeout设置为0,则epoll_wait将立即返回

该函数类似selectpoll函数,执行的时候会等待直到epfd定义的指定IO fd的events发生变化或timeout参数指定的milliseconds时间到期才返回。wait and block until events on the watched set happens or timeout expires

返回值:成功返回number of fd ready for requested io; 0表示在timeout以后没有ready的fd; -1表示发生错误。

epoll检测events改变的两种模式:edge-triggered和level-triggered

调用epoll_wait会返回events发生变化的IO fd,epoll支持两种模式:

  • level triggered:

只要发生的events没有结束,每次调用epoll_wait都显示该events存在。例如:当一个IO fd的状态变为available for reading的时候,调用epoll_wait会将该event返回;如果下次调用epoll_wait的时候该read过程还没有完成,则epoll_wait仍旧会返回该event。

  • edge triggered(EPOLLET )

和level triggered不同,它只在event产生的时候发出event信息,之后即使event没有结束不再发送此信息。例如:当一个IO fd状态变为available for reading的时候,调用epoll_wait会将该event返回;如果下次调用epoll_wait的时候该read过程还没有完成,epoll_wait不会立即返回而是需要等待新的events或直到timetout的时间。Edge Triggered event distribution delivers events only when events happens on the monitored file.

epoll默认采用Level Triggered模式,如果需要对某个IO fd采用Edge Triggered模式,在调用epoll_ctl的时候指定其struct epoll_event的events的时候添加EPOLLET

epoll实现non-blocking socket实例:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <arpa/inet.h>
    #include <sys/epoll.h>
    #include <errno.h>

    #define DEFAULT_PORT    1984    //默认端口
    #define BUFF_SIZE       1024    //buffer大小

    #define EPOLL_MAXEVENTS 64      //epoll_wait的最多返回的events个数
    #define EPOLL_TIMEOUT   5000    //epoll_wait的timeout milliseconds

    //函数:设置sock为non-blocking mode
    void setSockNonBlock(int sock) {
        int flags;
        flags = fcntl(sock, F_GETFL, 0);
        if (flags < 0) {
            perror("fcntl(F_GETFL) failed");
            exit(EXIT_FAILURE);
        }
        if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
            perror("fcntl(F_SETFL) failed");
            exit(EXIT_FAILURE);
        }
    }

    int main(int argc, char *argv[]) {

        //获取自定义端口
        unsigned short int port;
        if (argc == 2) {
            port = atoi(argv[1]);
        } else if (argc < 2) {
            port = DEFAULT_PORT;
        } else {
            fprintf(stderr, "USAGE: %s [port]\n", argv[0]);
            exit(EXIT_FAILURE);
        }

        //创建socket
        int sock;
        if ( (sock = socket(PF_INET, SOCK_STREAM, 0)) == -1 ) {
            perror("socket failed");
            exit(EXIT_FAILURE);
        }
        printf("socket done\n");

        //in case of 'address already in use' error message
        int yes = 1;
        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int))) {
            perror("setsockopt failed");
            exit(EXIT_FAILURE);
        }

        //设置sock为non-blocking
        setSockNonBlock(sock);

        //创建要bind的socket address
        struct sockaddr_in bind_addr;
        memset(&bind_addr, 0, sizeof(bind_addr));
        bind_addr.sin_family = AF_INET;
        bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);  //设置接受任意地址
        bind_addr.sin_port = htons(port);               //将host byte order转换为network byte order

        //bind sock到创建的socket address上
        if ( bind(sock, (struct sockaddr *) &bind_addr, sizeof(bind_addr)) == -1 ) {
            perror("bind failed");
            exit(EXIT_FAILURE);
        }
        printf("bind done\n");

        //listen
        if ( listen(sock, 5) == -1) {
            perror("listen failed");
            exit(EXIT_FAILURE);
        }
        printf("listen done\n");

        //创建epoll (epoll file descriptor)
        int epfd;
        if ( (epfd = epoll_create(1)) == -1 ) {
            perror("epoll_create failed");
            exit(EXIT_FAILURE);
        }
        //将sock添加到epoll中
        struct epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = sock;
        if ( epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &event) == -1 ) {
            perror("epoll_ctl");
            exit(EXIT_FAILURE);
        }

        //初始化epoll_wait的参数
        struct epoll_event events[EPOLL_MAXEVENTS];
        memset(events, 0, sizeof(events));

        //循环侦听
        int conn_sock;
        struct sockaddr_in client_addr;
        socklen_t client_addr_len;
        char client_ip_str[INET_ADDRSTRLEN];
        int res;
        int i;
        char buffer[BUFF_SIZE];
        int recv_size;

        while (1) {

            //每次循环调用依次epoll_wait侦听
            res = epoll_wait(epfd, events, EPOLL_MAXEVENTS, EPOLL_TIMEOUT);
            if (res < 0) {
                perror("epoll_wait failed");
                exit(EXIT_FAILURE);
            } else if (res == 0) {
                fprintf(stderr, "no socket ready for read within %d secs\n", EPOLL_TIMEOUT / 1000);
                continue;
            }

            //检测到res个IO file descriptor的events,loop各个fd进行响应
            for (i = 0; i < res; i++) {
                //events[i]即为检测到的event,域events[i].events表示具体哪些events,域events[i].data.fd即对应的IO fd

                if ( (events[i].events & EPOLLERR) || 
                     (events[i].events & EPOLLHUP) ||
                     (!(events[i].events & EPOLLIN)) ) {
                    //由于events[i].events使用每个bit表示event,因此判断是否包含某个具体事件可以使用`&`操作符
                    //这里判断是否存在EPOLLERR, EPOLLHUP等event
                    fprintf (stderr, "epoll error\n");
                    close (events[i].data.fd);
                    continue;
                }

                //对检测到event的各IO fd进行响应
                if (events[i].data.fd == sock) {

                    //当前fd是server的socket,不进行读而是accept所有client连接请求
                    while (1) {
                        client_addr_len = sizeof(client_addr);
                        conn_sock = accept(sock, (struct sockaddr *) &client_addr, &client_addr_len);
                        if (conn_sock == -1) {
                            if ( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) {
                                //non-blocking模式下无新connection请求,跳出while (1)
                                break;
                            } else {
                                perror("accept failed");
                                exit(EXIT_FAILURE);
                            }
                        }
                        if (!inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip_str, sizeof(client_ip_str))) {
                            perror("inet_ntop failed");
                            exit(EXIT_FAILURE);
                        }
                        printf("accept a client from: %s\n", client_ip_str);
                        //设置conn_sock为non-blocking
                        setSockNonBlock(conn_sock);
                        //把conn_sock添加到epoll的侦听中
                        event.events = EPOLLIN;
                        event.data.fd = conn_sock;
                        if ( epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &event) == -1 ) {
                            perror("epoll_ctl(EPOLL_CTL_ADD) failed");
                            exit(EXIT_FAILURE);
                        }
                    }

                } else {

                    //当前fd是client连接的socket,可以读(read from client)
                    conn_sock = events[i].data.fd;
                    memset(buffer, 0, sizeof(buffer));
                    if ( (recv_size = recv(conn_sock, buffer, sizeof(buffer), 0)) == -1  && (errno != EAGAIN) ) {
                        //recv在non-blocking模式下,返回-1且errno为EAGAIN表示当前无可读数据,并不表示错误
                        perror("recv failed");
                        exit(EXIT_FAILURE);
                    }
                    printf("recved from conn_sock=%d : %s(%d length string)\n", conn_sock, buffer, recv_size);

                    //立即将收到的内容写回去
                    if ( send(conn_sock, buffer, recv_size, 0) == -1 && (errno != EAGAIN) && (errno != EWOULDBLOCK) ) {
                        //send在non-blocking模式下,返回-1且errno为EAGAIN或EWOULDBLOCK表示当前无可写数据,并不表示错误
                        perror("send failed");
                        exit(EXIT_FAILURE);
                    }
                    printf("send to conn_sock=%d done\n", conn_sock);

                    //将当前socket从epoll的侦听中移除(有文章说:关闭con_sock之后,其会自动从epoll中删除,因此此段代码可以省略)
                    if ( epoll_ctl(epfd, EPOLL_CTL_DEL, conn_sock, NULL) == -1 ) {
                        perror("epoll_ctl(EPOLL_CTL_DEL) failed");
                        exit(EXIT_FAILURE);
                    }

                    //关闭连接
                    if ( close(conn_sock) == -1 ) {
                        perror("close failed");
                        exit(EXIT_FAILURE);
                    }
                    printf("close conn_sock=%d done\n", conn_sock);
                }
            }

        }

        close(sock);    //关闭server的listening socket
        close(epfd);    //关闭epoll file descriptor

        return 0;
    }

测试:编译并运行程序;然后尝试运行多个telnet localhost 1984和server进行通信。

注意:epoll在使用epoll_ctl为file descriptor指定events的时候,默认采用Level Triggered,即如果events未完成调用epoll_wait的话每次都会返回该事件;通过如下方式:

    struct epoll_event event;
    event.data.fd   =   sock;
    event.events    =   EPOLLIN | EPOLLET;
    if ( epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &event) == -1 ) {
        perror("epoll_ctl(EPOLL_CTL_ADD) failed");
        exit(EXIT_FAILURE);
    }

可以指定该fd的event采用Edge Triggered模型,如果采用该模型,epoll_wait检测到每次事件变化只通知一次,因此在epoll_wait之后的处理的时候需要注意(例如有可读的event的时候,注意数据读取完整)。可参考该文章介绍的代码

小结:

epoll这种event-based的方法比较1。select/poll等loop style方法;2。多进程(forking)/多线程(threading)方法(每个进程或线程对应一个connection socket),在多client高并发下性能更优越。因此推荐在实际中应用。例如:Nginx, Lighttpd, Memcached等都采用有该event-based的异步IO模型。

另外,event-handling库libevent也支持epoll方法(还支持kqueue(FreeBSD/NetBSD/OpenBSD/Darwin), /dev/poll(Solaris/HPUX), select, poll等方法)在实际中也可使用该库编写高性能的Server,方便实现跨平台可移植。

posted on 2011-09-05 20:00 simplyzhao 阅读(4635) 评论(0)  编辑 收藏 引用 所属分类: R_找工复习2011


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理


导航

<2010年6月>
303112345
6789101112
13141516171819
20212223242526
27282930123
45678910

统计

常用链接

留言簿(1)

随笔分类

随笔档案

搜索

最新评论

阅读排行榜

评论排行榜