#include <deque> #include <map> #include <vector> #include <pthread.h> #include <semaphore.h> #include <time.h> #include <sys/time.h> #include <sys/shm.h> #include <errno.h> #include <sys/types.h> #include <fcntl.h> #include <stdio.h>
#include <string> #include <cstdio> #include <unistd.h> #include <signal.h> #include <sys/types.h> #include <sys/stat.h>
#include <cstdlib> #include <cctype> #include <sstream> #include <utility> #include <stdexcept>
#include <sys/socket.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <iostream> #include <signal.h>
using namespace std;
#pragma pack(1)
//管道消息结构 struct pipemsg { int op; int fd; unsigned int ip; unsigned short port; };
//地址端口结构 struct ipport { unsigned int ip; unsigned short port; bool operator < (const ipport rhs) const {return (ip < rhs.ip || (ip == rhs.ip && port < rhs.port));} bool operator == (const ipport rhs) const {return (ip == rhs.ip && port == rhs.port);} };
//对应于对方地址端口的连接信息 struct peerinfo { int fd; //对应连接句柄 unsigned int contime; //最后连接时间 unsigned int rcvtime; //收到数据时间 unsigned int rcvbyte; //收到字节个数 unsigned int sndtime; //发送数据时间 unsigned int sndbyte; //发送字节个数 };
//连接结构 struct conninfo { int rfd; //管道读端 int wfd; //管道写端 map<struct ipport, struct peerinfo> peer; //对方信息 };
#pragma pack()
//全局运行标志 bool g_bRun;
//全局连接信息 struct conninfo g_ConnInfo;
void setnonblocking(int sock) { int opts; opts = fcntl(sock,F_GETFL); if (opts < 0) { perror("fcntl(sock,GETFL)"); exit(1); } opts = opts|O_NONBLOCK; if (fcntl(sock, F_SETFL, opts) < 0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } }
void setreuseaddr(int sock) { int opt; opt = 1; if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(&opt)) < 0) { perror("setsockopt"); exit(1); } }
static void sig_pro(int signum) { cout << "sig_pro, recv signal:" << signum << endl; if (signum == SIGQUIT) { g_bRun = false; } }
//接收连接线程 void * AcceptThread(void *arg) { cout << "AcceptThread, enter" << endl; int ret; //临时变量,存放返回值 int epfd; //监听用的epoll int listenfd; //监听socket int connfd; //接收到的连接socket临时变量 int i; //临时变量,轮询数组用 int nfds; //临时变量,有多少个socket有事件 struct epoll_event ev; //事件临时变量 const int MAXEVENTS = 1024; //最大事件数 struct epoll_event events[MAXEVENTS]; //监听事件数组 socklen_t clilen; //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件 struct sockaddr_in cliaddr; struct sockaddr_in svraddr; unsigned short uListenPort = 5000; int iBacklogSize = 5; int iBackStoreSize = 1024; struct pipemsg msg; //消息队列数据 //创建epoll,对2.6.8以后的版本,其参数无效,只要大于0的数值就行,内核自己动态分配 epfd = epoll_create(iBackStoreSize); if (epfd < 0) { cout << "AcceptThread, epoll_create fail:" << epfd << ",errno:" << errno << endl; return NULL; } //创建监听socket listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl; close(epfd); return NULL; } //把监听socket设置为非阻塞方式 setnonblocking(listenfd); //设置监听socket为端口重用 setreuseaddr(listenfd); //设置与要处理的事件相关的文件描述符 ev.data.fd = listenfd; //设置要处理的事件类型 ev.events = EPOLLIN|EPOLLET; //注册epoll事件 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); if (ret != 0) { cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } bzero(&svraddr, sizeof(svraddr)); svraddr.sin_family = AF_INET; svraddr.sin_addr.s_addr = htonl(INADDR_ANY); svraddr.sin_port=htons(uListenPort); bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr)); //监听,准备接收连接 ret = listen(listenfd, iBacklogSize); if (ret != 0) { cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } while (g_bRun) { //等待epoll事件的发生,如果当前有信号的句柄数大于输出事件数组的最大大小,超过部分会在下次epoll_wait时输出,事件不会丢 nfds = epoll_wait(epfd, events, MAXEVENTS, 500); //处理所发生的所有事件 for (i = 0; i < nfds && g_bRun; ++i) { if (events[i].data.fd == listenfd) //是本监听socket上的事件 { cout << "AcceptThread, events:" << events[i].events << ",errno:" << errno << endl; if (events[i].events&EPOLLIN) //有连接到来 { do { clilen = sizeof(struct sockaddr); connfd = accept(listenfd,(sockaddr *)&cliaddr, &clilen); if (connfd > 0) { cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << ",connect:" << inet_ntoa(cliaddr.sin_addr) << ":" << ntohs(cliaddr.sin_port) << endl; //往管道写数据 msg.op = 1; msg.fd = connfd; msg.ip = cliaddr.sin_addr.s_addr; msg.port = cliaddr.sin_port; ret = write(g_ConnInfo.wfd, &msg, 14); if (ret != 14) { cout << "AcceptThread, write fail:" << ret << ",errno:" << errno << endl; close(connfd); } } else { cout << "AcceptThread, accept:" << connfd << ",errno:" << errno << endl;
if (errno == EAGAIN) //没有连接需要接收了 { break; } else if (errno == EINTR) //可能被中断信号打断,,经过验证对非阻塞socket并未收到此错误,应该可以省掉该步判断 { ; } else //其它情况可以认为该描述字出现错误,应该关闭后重新监听 {
//此时说明该描述字已经出错了,需要重新创建和监听 close(listenfd); epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev); //创建监听socket listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl; close(epfd); return NULL; } //把监听socket设置为非阻塞方式 setnonblocking(listenfd); //设置监听socket为端口重用 setreuseaddr(listenfd); //设置与要处理的事件相关的文件描述符 ev.data.fd = listenfd; //设置要处理的事件类型 ev.events = EPOLLIN|EPOLLET; //注册epoll事件 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); if (ret != 0) { cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } bzero(&svraddr, sizeof(svraddr)); svraddr.sin_family = AF_INET; svraddr.sin_addr.s_addr = htonl(INADDR_ANY); svraddr.sin_port=htons(uListenPort); bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr)); //监听,准备接收连接 ret = listen(listenfd, iBacklogSize); if (ret != 0) { cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } } } } while (g_bRun); } else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP) //有异常发生 { //此时说明该描述字已经出错了,需要重新创建和监听 close(listenfd); epoll_ctl(epfd, EPOLL_CTL_DEL, listenfd, &ev); //创建监听socket listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "AcceptThread, socket fail:" << epfd << ",errno:" << errno << endl; close(epfd); return NULL; } //把监听socket设置为非阻塞方式 setnonblocking(listenfd); //设置监听socket为端口重用 setreuseaddr(listenfd); //设置与要处理的事件相关的文件描述符 ev.data.fd = listenfd; //设置要处理的事件类型 ev.events = EPOLLIN|EPOLLET; //注册epoll事件 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); if (ret != 0) { cout << "AcceptThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } bzero(&svraddr, sizeof(svraddr)); svraddr.sin_family = AF_INET; svraddr.sin_addr.s_addr = htonl(INADDR_ANY); svraddr.sin_port=htons(uListenPort); bind(listenfd,(sockaddr *)&svraddr, sizeof(svraddr)); //监听,准备接收连接 ret = listen(listenfd, iBacklogSize); if (ret != 0) { cout << "AcceptThread, listen fail:" << ret << ",errno:" << errno << endl; close(listenfd); close(epfd); return NULL; } } } } } //关闭监听描述字 if (listenfd > 0) { close(listenfd); } //关闭创建的epoll if (epfd > 0) { close(epfd); }
cout << "AcceptThread, exit" << endl;
return NULL; }
//读数据线程 void * ReadThread(void *arg) { cout << "ReadThread, enter" << endl; int ret; //临时变量,存放返回值 int epfd; //连接用的epoll int i; //临时变量,轮询数组用 int nfds; //临时变量,有多少个socket有事件 struct epoll_event ev; //事件临时变量 const int MAXEVENTS = 1024; //最大事件数 struct epoll_event events[MAXEVENTS]; //监听事件数组
int iBackStoreSize = 1024; const int MAXBUFSIZE = 8192; //读数据缓冲区大小 char buf[MAXBUFSIZE]; int nread; //读到的字节数 struct ipport tIpPort; //地址端口信息 struct peerinfo tPeerInfo; //对方连接信息 map<int, struct ipport> mIpPort; //socket对应的对方地址端口信息 map<int, struct ipport>::iterator itIpPort; //临时迭代子 map<struct ipport, struct peerinfo>::iterator itPeerInfo; //临时迭代子 struct pipemsg msg; //消息队列数据
//创建epoll,对2.6.8以后的版本,其参数无效,只要大于0的数值就行,内核自己动态分配 epfd = epoll_create(iBackStoreSize); if (epfd < 0) { cout << "ReadThread, epoll_create fail:" << epfd << ",errno:" << errno << endl; return NULL; }
while (g_bRun) { //从管道读数据 do { ret = read(g_ConnInfo.rfd, &msg, 14); if (ret > 0) { //队列中的fd必须是有效的 if (ret == 14 && msg.fd > 0) { if (msg.op == 1) //收到新的连接 { cout << "ReadThread, recv connect:" << msg.fd << ",errno:" << errno << endl; //把socket设置为非阻塞方式 setnonblocking(msg.fd); //设置描述符信息和数组下标信息 ev.data.fd = msg.fd; //设置用于注测的读操作事件 ev.events = EPOLLIN|EPOLLET; //注册ev ret = epoll_ctl(epfd, EPOLL_CTL_ADD, msg.fd, &ev); if (ret != 0) { cout << "ReadThread, epoll_ctl fail:" << ret << ",errno:" << errno << endl; close(msg.fd); } else { mIpPort[msg.fd] = tIpPort; tPeerInfo.fd = msg.fd; tPeerInfo.contime = time(NULL); tPeerInfo.rcvtime = 0; tPeerInfo.rcvbyte = 0; tPeerInfo.sndtime = 0; tPeerInfo.sndbyte = 0; g_ConnInfo.peer[tIpPort] = tPeerInfo; } } else if (msg.op == 2) //断开某个连接 { cout << "ReadThread, recv close:" << msg.fd << ",errno:" << errno << endl; close(msg.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, msg.fd, &ev); itIpPort = mIpPort.find(msg.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } } } } } else { break; } } while(g_bRun); //等待epoll事件的发生,如果当前有信号的句柄数大于输出事件数组的最大大小,超过部分会在下次epoll_wait时输出,事件不会丢 nfds = epoll_wait(epfd, events, MAXEVENTS, 500); //处理所发生的所有事件 for (i = 0; i < nfds && g_bRun; ++i) { cout << "ReadThread, events:" << events[i].events << ",errno:" << errno << endl; if (events[i].events&EPOLLIN) //有数据可读 { do { bzero(buf, MAXBUFSIZE); nread = read(events[i].data.fd, buf, MAXBUFSIZE); if (nread > 0) //读到数据 { cout << "ReadThread, read:" << nread << ",errno:" << errno << endl; itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { itPeerInfo->second.rcvtime = time(NULL); itPeerInfo->second.rcvbyte += nread; } } } else if (nread < 0) //读取失败 { if (errno == EAGAIN) //没有数据了 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",no data" << endl; break; } else if(errno == EINTR) //可能被内部中断信号打断,经过验证对非阻塞socket并未收到此错误,应该可以省掉该步判断 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",interrupt" << endl; } else //客户端主动关闭 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer error" << endl; close(events[i].data.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev); itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } } break; } } else if (nread == 0) //客户端主动关闭 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",peer close" << endl; close(events[i].data.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev); itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } }
break; } } while (g_bRun); } else if (events[i].events&EPOLLERR || events[i].events&EPOLLHUP) //有异常发生 { cout << "ReadThread, read:" << nread << ",errno:" << errno << ",err or hup" << endl;
close(events[i].data.fd); epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev); itIpPort = mIpPort.find(events[i].data.fd); if (itIpPort != mIpPort.end()) { mIpPort.erase(itIpPort); itPeerInfo = g_ConnInfo.peer.find(itIpPort->second); if (itPeerInfo != g_ConnInfo.peer.end()) { g_ConnInfo.peer.erase(itPeerInfo); } } } } } //关闭所有连接 for (itIpPort = mIpPort.begin(); itIpPort != mIpPort.end(); itIpPort++) { if (itIpPort->first > 0) { close(itIpPort->first); } } //关闭创建的epoll if (epfd > 0) { close(epfd); } cout << "ReadThread, exit" << endl;
return NULL; }
int main(int argc, char* argv[]) { int ret; int fd[2]; //读写管道 pthread_t iAcceptThreadId; //接收连接线程ID pthread_t iReadThreadId; //读数据线程ID //为让应用程序不必对慢速系统调用的errno做EINTR检查,可以采取两种方式:1.屏蔽中断信号,2.处理中断信号 //1.由signal()函数安装的信号处理程序,系统默认会自动重启动被中断的系统调用,而不是让它出错返回, // 所以应用程序不必对慢速系统调用的errno做EINTR检查,这就是自动重启动机制. //2.对sigaction()的默认动作是不自动重启动被中断的系统调用, // 因此如果我们在使用sigaction()时需要自动重启动被中断的系统调用,就需要使用sigaction的SA_RESTART选项
//忽略信号 //sigset_t newmask; //sigemptyset(&newmask); //sigaddset(&newmask, SIGINT); //sigaddset(&newmask, SIGUSR1); //sigaddset(&newmask, SIGUSR2); //sigaddset(&newmask, SIGQUIT); //pthread_sigmask(SIG_BLOCK, &newmask, NULL); //处理信号 //默认自动重启动被中断的系统调用,而不是让它出错返回,应用程序不必对慢速系统调用的errno做EINTR检查 //signal(SIGINT, sig_pro); //signal(SIGUSR1, sig_pro); //signal(SIGUSR2, sig_pro); //signal(SIGQUIT, sig_pro);
struct sigaction sa; sa.sa_flags = SA_RESTART; sa.sa_handler = sig_pro; sigaction(SIGINT, &sa, NULL); sigaction(SIGUSR1, &sa, NULL); sigaction(SIGUSR2, &sa, NULL); sigaction(SIGQUIT, &sa, NULL); //设置为运行状态 g_bRun = true; //创建管道 ret = pipe(fd); if (ret < 0) { cout << "main, pipe fail:" << ret << ",errno:" << errno << endl; g_bRun = false; return 0; } g_ConnInfo.rfd = fd[0]; g_ConnInfo.wfd = fd[1]; //读端设置为非阻塞方式 setnonblocking(g_ConnInfo.rfd);
//创建线程时采用的参数 pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); //设置绑定的线程,以获取较高的响应速度 //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //设置分离的线程 //创建接收连接线程 ret = pthread_create(&iAcceptThreadId, &attr, AcceptThread, NULL); if( ret != 0) { cout << "main, pthread_create AcceptThread fail:" << ret << ",errno:" << errno << endl; g_bRun = false; close(g_ConnInfo.rfd); close(g_ConnInfo.wfd); return 0; } //创建接收连接线程 ret = pthread_create(&iReadThreadId, &attr, ReadThread, NULL); if( ret != 0) { cout << "main, pthread_create ReadThread fail:" << ret << ",errno:" << errno << endl; g_bRun = false; pthread_join(iAcceptThreadId, NULL); close(g_ConnInfo.rfd); close(g_ConnInfo.wfd); return 0; } //主循环什么事情也不做 while (g_bRun) { sleep(1); } //等待子线程终止 pthread_join(iAcceptThreadId, NULL); pthread_join(iReadThreadId, NULL); close(g_ConnInfo.rfd); close(g_ConnInfo.wfd);
return 0; }
在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)
从字面上看, 意思是:
* EAGAIN: 再试一次
* EWOULDBLOCK: 如果这是一个阻塞socket, 操作将被block
* perror输出: Resource temporarily unavailable
总结:
这个错误表示资源暂时不够, 可能read时, 读缓冲区没有数据, 或者, write时,
写缓冲区满了.
遇到这种情况, 如果是阻塞socket, read/write就要阻塞掉.
而如果是非阻塞socket, read/write立即返回-1, 同 时errno设置为EAGAIN.
所以, 对于阻塞socket, read/write返回-1代表网络出错了.
但对于非阻塞socket, read/write返回-1不一定网络真的出错了.
可能是Resource temporarily unavailable. 这时你应该再试, 直到Resource available.
综上, 对于non-blocking的socket, 正确的读写操作为:
读: 忽略掉errno = EAGAIN的错误, 下次继续读
写: 忽略掉errno = EAGAIN的错误, 下次继续写
对于select和epoll的LT模式, 这种读写方式是没有问题的. 但对于epoll的ET模式, 这种方式还有漏洞.
epoll的两种模式 LT 和 ET
二者的差异在于 level-trigger 模式下只要某个 socket 处于 readable/writable 状态,无论什么时候
进行 epoll_wait 都会返回该 socket;而 edge-trigger 模式下只有某个 socket 从 unreadable 变为 readable 或从
unwritable 变为 writable 时,epoll_wait 才会返回该 socket。如下两个示意图:
从socket读数据:
往socket写数据
所以, 在epoll的ET模式下, 正确的读写方式为:
读: 只要可读, 就一直读, 直到返回0, 或者 errno = EAGAIN
写: 只要可写, 就一直写, 直到数据发送完, 或者 errno = EAGAIN
正确的读:
- n = 0;
- while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
- n += nread;
- }
- if (nread == -1 && errno != EAGAIN) {
- perror("read error");
- }
正确的写:
- int nwrite, data_size = strlen(buf);
- n = data_size;
- while (n > 0) {
- nwrite = write(fd, buf + data_size - n, n);
- if (nwrite < n) {
- if (nwrite == -1 && errno != EAGAIN) {
- perror("write error");
- }
- break;
- }
- n -= nwrite;
- }
正确的accept,accept 要考虑 2 个问题
(1) 阻塞模式 accept 存在的问题
考虑这种情况: TCP 连接被客户端夭折,即在服务器调用 accept 之前,客户端主动发送 RST 终止
连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞
在 accept 调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在
accept 调用上,就绪队列中的其他描述符都得不到处理.
解决办法是把监听套接口设置为非阻塞,当客户在服务器调用 accept 之前中止某个连接时,accept 调用
可以立即返回 -1, 这时源自 Berkeley 的实现会在内核中处理该事件,并不会将该事件通知给 epool,
而其他实现把 errno 设置为 ECONNABORTED 或者 EPROTO 错误,我们应该忽略这两个错误。
(2) ET 模式下 accept 存在的问题
考虑这种情况:多个连接同时到达,服务器的 TCP 就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,
epoll 只会通知一次,accept 只处理一个连接,导致 TCP 就绪队列中剩下的连接都得不到处理。
解决办法是用 while 循环抱住 accept 调用,处理完 TCP 就绪队列中的所有连接后再退出循环。如何知道
是否处理完就绪队列中的所有连接呢? accept 返回 -1 并且 errno 设置为 EAGAIN 就表示所有连接都处理完。
综合以上两种情况,服务器应该使用非阻塞地 accept, accept 在 ET 模式下 的正确使用方式为:
- while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,
- (size_t *)&addrlen)) > 0) {
- handle_client(conn_sock);
- }
- if (conn_sock == -1) {
- if (errno != EAGAIN && errno != ECONNABORTED
- && errno != EPROTO && errno != EINTR)
- perror("accept");
- }
一道腾讯后台开发的面试题
使用Linux epoll模型,水平触发模式;当socket可写时,会不停的触发 socket 可写的事件,如何处理?
第一种最普遍的方式:
需要向 socket 写数据的时候才把 socket 加入 epoll ,等待可写事件。
接受到可写事件后,调用 write 或者 send 发送数据。。。
当所有数据都写完后,把 socket 移出 epoll。
这种方式的缺点是,即使发送很少的数据,也要把 socket 加入 epoll,写完后在移出 epoll,有一定操作代价。
一种改进的方式:
开始不把 socket 加入 epoll,需要向 socket 写数据的时候,直接调用 write 或者 send 发送数据。
如果返回 EAGAIN,把 socket 加入 epoll,在 epoll 的驱动下写数据,全部数据发送完毕后,再出 epoll。
这种方式的优点是:数据不多的时候可以避免 epoll 的事件处理,提高效率。
最后贴一个使用epoll, ET模式的简单HTTP服务器代码:
#include <sys/socket.h> #include <sys/wait.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <sys/epoll.h> #include <sys/sendfile.h> #include <sys/stat.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <fcntl.h> #include <errno.h>
#define MAX_EVENTS 10 #define PORT 8080
//设置socket连接为非阻塞模式 void setnonblocking(int sockfd) { int opts;
opts = fcntl(sockfd, F_GETFL); if(opts < 0) { perror("fcntl(F_GETFL)\n"); exit(1); } opts = (opts | O_NONBLOCK); if(fcntl(sockfd, F_SETFL, opts) < 0) { perror("fcntl(F_SETFL)\n"); exit(1); } }
int main(){ struct epoll_event ev, events[MAX_EVENTS]; int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n; struct sockaddr_in local, remote; char buf[BUFSIZ];
//创建listen socket if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("sockfd\n"); exit(1); } setnonblocking(listenfd); bzero(&local, sizeof(local)); local.sin_family = AF_INET; local.sin_addr.s_addr = htonl(INADDR_ANY);; local.sin_port = htons(PORT); if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) { perror("bind\n"); exit(1); } listen(listenfd, 20);
epfd = epoll_create(MAX_EVENTS); if (epfd == -1) { perror("epoll_create"); exit(EXIT_FAILURE); }
ev.events = EPOLLIN; ev.data.fd = listenfd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) { perror("epoll_ctl: listen_sock"); exit(EXIT_FAILURE); }
for (;;) { nfds = epoll_wait(epfd, events, MAX_EVENTS, -1); if (nfds == -1) { perror("epoll_pwait"); exit(EXIT_FAILURE); }
for (i = 0; i < nfds; ++i) { fd = events[i].data.fd; if (fd == listenfd) { while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) { setnonblocking(conn_sock); ev.events = EPOLLIN | EPOLLET; ev.data.fd = conn_sock; if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) { perror("epoll_ctl: add"); exit(EXIT_FAILURE); } } if (conn_sock == -1) { if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR) perror("accept"); } continue; } if (events[i].events & EPOLLIN) { n = 0; while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) { n += nread; } if (nread == -1 && errno != EAGAIN) { perror("read error"); } ev.data.fd = fd; ev.events = events[i].events | EPOLLOUT; if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) { perror("epoll_ctl: mod"); } } if (events[i].events & EPOLLOUT) { sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11); int nwrite, data_size = strlen(buf); n = data_size; while (n > 0) { nwrite = write(fd, buf + data_size - n, n); if (nwrite < n) { if (nwrite == -1 && errno != EAGAIN) { perror("write error"); } break; } n -= nwrite; } close(fd); } } }
return 0; }
|