doing5552

记录每日点滴,不枉人生一世

  C++博客 :: 首页 :: 联系 :: 聚合  :: 管理
  73 Posts :: 0 Stories :: 94 Comments :: 0 Trackbacks

公告

常用链接

留言簿(24)

我参与的团队

最新随笔

搜索

  •  

积分与排名

  • 积分 - 454136
  • 排名 - 47

最新随笔

最新评论

阅读排行榜

评论排行榜

从头编写高性能服务程序1-单进程阻塞

标题有些大.其实自己也是在不断的摸索
觉得BLOG的作用应该和日记一样
把自己获取到的东西记录下来
到时候回头看能找到自己跌倒过的地方避免再次跌倒
同时别人看的时候也能避开这些地方
所以在自己开始写服务程序一段时间之后
把原来的代码整理一下一次次的发上来
从零开始完全手写一个服务,经过慢慢的演化
期间的收获还是很多的

最开始,
程序是从最简单的socket连接开始
只有一个进程,创建socket之后开始listen
然后accept连上来的链接
整个程序是阻塞的.当一个链接上来之后
其他链接是无法连入的
之后当程序处理完并close之后,后面的才能被处理
这个例子是不停的接受客户端发过来的字符并且显示在服务端的console
然后客户端也不会被close.
只是演示概念
外面一个循环是不停的去accept新的连接
里面一个是循环处理已经accept上来链接的数据
由于没有close,所以外面一个循环是没意义的.呵呵
只是大家知道概念就OK了.

  1. #include<sys/socket.h>
  2. #include<netinet/in.h>
  3. #include<stdio.h>
  4. #include<stdlib.h>
  5. #include<string.h>
  6. int main(){
  7.     int listen_fd,accept_fd,flag;
  8.     struct sockaddr_in my_addr,remote_addr;
  9.     if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
  10.         perror("create socket error");
  11.         exit(1);
  12.     }
  13.     if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
  14.         perror("setsockopt error");
  15.     }
  16.     my_addr.sin_family = AF_INET;
  17.     my_addr.sin_port = htons(3389);
  18.     my_addr.sin_addr.s_addr = INADDR_ANY;
  19.     bzero(&(my_addr.sin_zero),8);
  20.     if ( bind( listen_fd, (struct sockaddr *)&my_addr,
  21.                 sizeof(struct sockaddr_in)) == -1 ) {
  22.         perror("bind error");
  23.         exit(1);
  24.     }
  25.     if ( listen( listen_fd,1 ) == -1 ){
  26.         perror("listen error");
  27.         exit(1);
  28.     }
  29.     for(;;){
  30.         int addr_len = sizeof( struct sockaddr_in );
  31.         accept_fd = accept( listen_fd,
  32.                                 (struct sockaddr *)&remote_addr,&addr_len );
  33.         for(;;){
  34.             char in_buf[1024];
  35.             memset(in_buf, 0, 1024);
  36.             recv( accept_fd ,&in_buf ,1024 ,0 );
  37.             printf( "accept:%s\n", in_buf );
  38.         }
  39.     }
  40.     return 0;
  41. }

从头编写高性能服务程序2-双进程阻塞

上一个模型的缺点就是
当链接数据没被处理完时,
整个服务是无法处理新链接的
这种比较适合单词处理速度很快
而且并发量=0的服务
也就是服务端和客户端1:1的样子
我丢给你请求,你给我数据,我再给你请求

这对于一个比较高性能的服务肯定是不可接受的
所以我们产生了一个想法
用父子进程,父进程管listen
有链接进来就fork一个子进程
子进程accept之后去处理.处理完之后就退出
注意,下面这段代码还是没close
所以上面所说的只能是做了一半
大家理解概念就好

下载: code.txt
  1. #include<sys/socket.h>
  2. #include<netinet/in.h>
  3. #include<stdio.h>
  4. #include<stdlib.h>
  5. #include<string.h>
  6. int main(){
  7.     int listen_fd,accept_fd,flag;
  8.     struct sockaddr_in my_addr,remote_addr;
  9.     if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
  10.         perror("create socket error");
  11.         exit(1);
  12.     }
  13.     if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
  14.         perror("setsockopt error");
  15.     }
  16.     my_addr.sin_family = AF_INET;
  17.     my_addr.sin_port = htons(3389);
  18.     my_addr.sin_addr.s_addr = INADDR_ANY;
  19.     bzero(&(my_addr.sin_zero),8);
  20.     if ( bind( listen_fd, (struct sockaddr *)&my_addr,
  21.                 sizeof(struct sockaddr_in)) == -1 ) {
  22.         perror("bind error");
  23.         exit(1);
  24.     }
  25.     if ( listen( listen_fd,1 ) == -1 ){
  26.         perror("listen error");
  27.         exit(1);
  28.     }
  29.     int pid;
  30.     pid=fork();
  31.     for(;;){
  32.         if( pid==0 ){
  33.             int addr_len = sizeof( struct sockaddr_in );
  34.             accept_fd = accept( listen_fd,
  35.                     (struct sockaddr *)&remote_addr,&addr_len );
  36.             for(;;){
  37.                 char in_buf[1024];
  38.                 memset(in_buf, 0, 1024);
  39.                 recv( accept_fd ,&in_buf ,1024 ,0 );
  40.                 printf( "accept:%s\n", in_buf );
  41.             }
  42.         }
  43.         else{
  44.             //do nothing
  45.         }
  46.     }
  47.     return 0;
  48. }

fork产生子进程之后返回值
如果是0,那么这个进程就是子进程
如果-1,就是错误
否则就是产生子进程的pid,可以用这个pid来控制子进程,比如kill掉

大家实际用起来会发现和单进程阻塞一样
只能处理一个客户端连接.根本没好处么
呵呵.其实只是演示一下fork
然后好处还是有的.如果子进程因为处理request而挂了
主进程可以控制,然后再产生一个
所以这种样子比单进程要可靠一点
一个干活.一个监控

那么下一步就是对应每个链接产生进程


从头编写高性能服务程序3-多进程阻塞

多fork几次
那fork到底放在哪里呢?
因为阻塞的特性会中断进程的运行知道阻塞情况接触
所以我们把fork放在accept之后
这样有实际链接之后才会产生子进程
现在这个程序在多个客户端连上来之后都能有反应了

下载: code.txt
  1. #include<sys/socket.h>
  2. #include<netinet/in.h>
  3. #include<stdio.h>
  4. #include<stdlib.h>
  5. #include<string.h>
  6. int main(){
  7.     int listen_fd,accept_fd,flag;
  8.     struct sockaddr_in my_addr,remote_addr;
  9.     if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
  10.         perror("create socket error");
  11.         exit(1);
  12.     }
  13.     if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
  14.         perror("setsockopt error");
  15.     }
  16.     my_addr.sin_family = AF_INET;
  17.     my_addr.sin_port = htons(3389);
  18.     my_addr.sin_addr.s_addr = INADDR_ANY;
  19.     bzero(&(my_addr.sin_zero),8);
  20.     if ( bind( listen_fd, (struct sockaddr *)&my_addr,
  21.                 sizeof(struct sockaddr_in)) == -1 ) {
  22.         perror("bind error");
  23.         exit(1);
  24.     }
  25.     if ( listen( listen_fd,1 ) == -1 ){
  26.         perror("listen error");
  27.         exit(1);
  28.     }
  29.     int pid;
  30.     int addr_len = sizeof( struct sockaddr_in );
  31.     for(;;){
  32.         accept_fd = accept( listen_fd,
  33.                     (struct sockaddr *)&remote_addr,&addr_len );
  34.         pid=fork();
  35.         if( pid==0 ){
  36.             for(;;){
  37.                 char in_buf[1024];
  38.                 memset(in_buf, 0, 1024);
  39.                 recv( accept_fd ,&in_buf ,1024 ,0 );
  40.                 printf( "accept:%s\n", in_buf );
  41.             }
  42.         }
  43.         else{
  44.             //manager the process
  45.         }
  46.     }
  47.     return 0;
  48. }

从头编写高性能服务程序4-单进程非阻塞select

前面都是阻塞的
当我们在accept或者recv的时候
文件描述符如果没有就绪
整个进程就会被挂起
浪费啊…如果只有就绪的时候才去处理,然后调用之后立刻返回
我们就可以继续做下面的工作了
所以我们引入非阻塞概念
记住,试用setsockopt是无法设置socket接口非阻塞的
只能用fd专用的fcntl去设置fd的非阻塞性质
需要include头文件fcntl.h

还有一点,select所监听的fd_set
在每次select之后就会被清掉
所以一开始我发现每次accept一个数据之后就再也获取不到数据了
然后才发现这点,而这个在我找的资料里面都没提到
所以要把FD_SET放在每次循环里面
我想这也是select性能比较低的原因之一

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <netinet/in.h>
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include <fcntl.h>
  7. int main(){
  8.     int listen_fd,accept_fd,flag;
  9.     struct sockaddr_in my_addr,remote_addr;
  10.     if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
  11.         perror("create socket error");
  12.         exit(1);
  13.     }
  14.     if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
  15.         perror("setsockopt error");
  16.     }
  17.     int flags = fcntl(listen_fd, F_GETFL, 0);
  18.     fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
  19.     my_addr.sin_family = AF_INET;
  20.     my_addr.sin_port = htons(3389);
  21.     my_addr.sin_addr.s_addr = INADDR_ANY;
  22.     bzero(&(my_addr.sin_zero),8);
  23.     if ( bind( listen_fd, (struct sockaddr *)&my_addr,
  24.                 sizeof(struct sockaddr_in)) == -1 ) {
  25.         perror("bind error");
  26.         exit(1);
  27.     }
  28.     if ( listen( listen_fd,1 ) == -1 ){
  29.         perror("listen error");
  30.         exit(1);
  31.     }
  32.     fd_set fd_sets;
  33.  
  34.     int max_fd = listen_fd ;
  35.     int events=0;
  36.     int accept_fds[1024];
  37.     int i = 0;
  38.  
  39.     for(;;){
  40.         FD_ZERO( &fd_sets );
  41.         FD_SET(listen_fd,&fd_sets);
  42.         int k=0;
  43.         for(k=0; k<=i; k++){
  44.             FD_SET(accept_fds[k],&fd_sets);
  45.         }
  46.         events = select( max_fd + 1, &fd_sets, NULL, NULL, NULL );
  47.         if( events ){
  48.                 printf("events:%d\n",events);
  49.                 if( FD_ISSET(listen_fd,&fd_sets) ){
  50.                         printf("listen event :1\n");
  51.                         int addr_len = sizeof( struct sockaddr_in );
  52.                         accept_fd = accept( listen_fd,
  53.                                                 (struct sockaddr *)&remote_addr,&addr_len );
  54.                         int flags = fcntl(accept_fd, F_GETFL, 0);
  55.                         fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
  56.                         accept_fds[i] = accept_fd;
  57.                         i++;
  58.                         max_fd = accept_fd ;
  59.                         printf("new accept fd:%d\n",accept_fd);
  60.                 }
  61.                 int j=0;
  62.                 for( j=0; j<=i; j++ ){
  63.                     if( FD_ISSET(accept_fds[j],&fd_sets) ){
  64.                         printf("accept event :%d\n",j);
  65.                         char in_buf[1024];
  66.                         memset(in_buf, 0, 1024);
  67.                         recv( accept_fds[j] ,&in_buf ,1024 ,0 );
  68.                         printf( "accept:%s\n", in_buf );
  69.                     }
  70.                 }
  71.         }
  72.     }
  73.     return 0;
  74. }

从头编写高性能服务程序5-单进程非阻塞epoll

 

select已经是很古老的非阻塞复用I/O方式了
Linux下最常用的IO复用是epoll了
所以我们用单进程来实验epoll
epoll_wait和select一样
都是用一个fd的阻塞来代替一堆fd

下面程序有一个问题
由于使用了ET模式
所以当listen_fd有客户端连接上来时
只会被通知一次
如果是并发连接,如果只accept了一次
则第二个链接会被忽略
也就是真正高并发情况下.会出现丢客户端请求的情况

由于使用了复用IO
所以即使是单进程也能处理多客户端连接了

下载: code.txt
  1. #include<sys/socket.h>
  2. #include<netinet/in.h>
  3. #include<stdio.h>
  4. #include<stdlib.h>
  5. #include<string.h>
  6. #include <fcntl.h>
  7. #include<sys/epoll.h>
  8. int main(){
  9.     int listen_fd,accept_fd,flag;
  10.     struct sockaddr_in my_addr,remote_addr;
  11.     if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
  12.         perror("create socket error");
  13.         exit(1);
  14.     }
  15.     if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
  16.         perror("setsockopt error");
  17.     }
  18.     int flags = fcntl(listen_fd, F_GETFL, 0);
  19.     fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
  20.     my_addr.sin_family = AF_INET;
  21.     my_addr.sin_port = htons(3389);
  22.     my_addr.sin_addr.s_addr = INADDR_ANY;
  23.     bzero(&(my_addr.sin_zero),8);
  24.     if ( bind( listen_fd, (struct sockaddr *)&my_addr,
  25.                 sizeof(struct sockaddr_in)) == -1 ) {
  26.         perror("bind error");
  27.         exit(1);
  28.     }
  29.     if ( listen( listen_fd,1 ) == -1 ){
  30.         perror("listen error");
  31.         exit(1);
  32.     }
  33.     struct epoll_event ev,events[20];
  34.     int epfd = epoll_create(256);
  35.     int ev_s=0;
  36.     ev.data.fd=listen_fd;
  37.     ev.events=EPOLLIN|EPOLLET;
  38.     epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
  39.     for(;;){
  40.         ev_s = epoll_wait( epfd,events,20,500 );
  41.         int i=0;
  42.         for(i=0; i<ev_s; i++){
  43.             if(events[i].data.fd==listen_fd){
  44.                 printf("listen event :1\n");
  45.                 int addr_len = sizeof( struct sockaddr_in );
  46.                 accept_fd = accept( listen_fd,
  47.                                         (struct sockaddr *)&remote_addr,&addr_len );
  48.                 int flags = fcntl(accept_fd, F_GETFL, 0);
  49.                 fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
  50.                 ev.data.fd=accept_fd;
  51.                 ev.events=EPOLLIN|EPOLLET;
  52.                 epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
  53.                 printf("new accept fd:%d\n",accept_fd);
  54.             }
  55.             else if(events[i].events&EPOLLIN){
  56.                 printf("accept event :%d\n",i);
  57.                 char in_buf[1024];
  58.                 memset(in_buf, 0, 1024);
  59.                 recv( events[i].data.fd ,&in_buf ,1024 ,0 );
  60.                 printf( "accept:%s\n", in_buf );
  61.             }
  62.         }
  63.    }
  64.     return 0;
  65. }

从头编写高性能服务程序6-多进程阻塞完整版

前面所有的程序都是缺少链接的后期管理的
我们在多进程的情况下测试子进程处理完后自行退出
而主进程继续监听的情况
一开始我认为只要子进程处理完之后exit就可以了
结果碰到了问题
子进程在主进程未退出的情况下先行退出
会变成僵尸进程
反之则会变成孤儿进程

僵尸进程的表现是子进程在ps下后面多个

defunct

解决方法是在主进程这里需要wait
用来处理子进程的各种状态

还有我们加上了客户端退出时候的处理
当客户端退出时
我们recv到的数据返回值会是0
而之前会被阻塞在那边
所以加上对recv的返回值0的处理就可以了

下载: code.txt
  1. #include<sys/socket.h>
  2. #include<netinet/in.h>
  3. #include<stdio.h>
  4. #include<stdlib.h>
  5. #include<string.h>
  6. int main(){
  7.     int listen_fd,accept_fd,flag;
  8.     struct sockaddr_in my_addr,remote_addr;
  9.     if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
  10.         perror("create socket error");
  11.         exit(1);
  12.     }
  13.     if (setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR
  14.                     ,(char *)&flag,sizeof(flag)) == -1){
  15.         perror("setsockopt error");
  16.     }
  17.     my_addr.sin_family = AF_INET;
  18.     my_addr.sin_port = htons(3389);
  19.     my_addr.sin_addr.s_addr = INADDR_ANY;
  20.     bzero(&(my_addr.sin_zero), 8);
  21.     if (bind(listen_fd, (struct sockaddr *)&my_addr,
  22.                 sizeof(struct sockaddr_in)) == -1) {
  23.         perror("bind error");
  24.         exit(1);
  25.     }
  26.     if (listen(listen_fd,1) == -1){
  27.         perror("listen error");
  28.         exit(1);
  29.     }
  30.     int pid=-1;
  31.     int addr_len = sizeof(struct sockaddr_in);
  32.     int max_process_num = 10;
  33.     int i;
  34.     int child_process_status;
  35.     for(i = 0; i< max_process_num; i++){
  36.         if(pid != 0){
  37.             pid = fork();
  38.         }
  39.     }
  40.     if(pid == 0){
  41.         for(;;){
  42.             accept_fd = accept(listen_fd
  43.                                 ,(struct sockaddr *)&remote_addr,&addr_len);
  44.             int recv_num;
  45.             for(;;){
  46.                 char in_buf[1024];
  47.                 memset(in_buf, 0, 1024);
  48.                 recv_num = recv(accept_fd ,&in_buf ,1024 ,0);
  49.                 if(recv_num == 0){
  50.                     printf("close socket\n");
  51.                     close(accept_fd);
  52.                     break;
  53.                 }
  54.                 else{
  55.                     printf("accept:%d:%s\n", recv_num, in_buf);
  56.                 }
  57.             }
  58.         }
  59.     }
  60.     else{
  61.         //manager the process
  62.         wait(&child_process_status);
  63.     }
  64.    
  65.     return 0;
  66. }

从头编写高性能服务程序7-多进程非阻塞epoll版

我们有了前面的所有经验
多进程,非阻塞,IO复用
然后我们要把这些特定揉合起来了
先是在accept上阻塞
然后当有新链接过来就会fork
当然这样一个结构并不比多进程阻塞版本好太多
因为每个进程虽然使用epoll来处理
但是一个进程内只有一个accept_fd在被处理
所以效果是一样的
在下一个版本中
我们将在一个进程使用epoll处理多个accept

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <netinet/in.h>
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include <fcntl.h>
  7. #include <sys/epoll.h>
  8. int main(){
  9.     int listen_fd,accept_fd,flag;
  10.     struct sockaddr_in my_addr,remote_addr;
  11.     if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
  12.         perror("create socket error");
  13.         exit(1);
  14.     }
  15.     if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
  16.         perror("setsockopt error");
  17.     }
  18.     my_addr.sin_family = AF_INET;
  19.     my_addr.sin_port = htons(3389);
  20.     my_addr.sin_addr.s_addr = INADDR_ANY;
  21.     bzero(&(my_addr.sin_zero),8);
  22.     if ( bind( listen_fd, (struct sockaddr *)&my_addr,
  23.                 sizeof(struct sockaddr_in)) == -1 ) {
  24.         perror("bind error");
  25.         exit(1);
  26.     }
  27.     if ( listen( listen_fd,1 ) == -1 ){
  28.         perror("listen error");
  29.         exit(1);
  30.     }
  31.     for(;;){
  32.         int addr_len = sizeof( struct sockaddr_in );
  33.         accept_fd = accept( listen_fd,
  34.                                 (struct sockaddr *)&remote_addr,&addr_len );
  35.         int flags = fcntl(accept_fd, F_GETFL, 0);
  36.         fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
  37.        
  38.         int pid = fork();
  39.         if( pid == 0 ){
  40.                 struct epoll_event ev,events[20];
  41.                 int epfd = epoll_create(256);
  42.                 int ev_s=0;
  43.  
  44.                 ev.data.fd=accept_fd;
  45.                 ev.events=EPOLLIN|EPOLLET;
  46.                 epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
  47.                 for(;;){
  48.                     ev_s = epoll_wait( epfd,events,20,500 );
  49.                     int i=0;
  50.                     for(i=0; i<ev_s; i++){
  51.                         if(events[i].events&EPOLLIN){
  52.                             printf("accept event :%d\n",i);
  53.                             char in_buf[1024];
  54.                             memset(in_buf, 0, 1024);
  55.                             recv( events[i].data.fd ,&in_buf ,1024 ,0 );
  56.                             printf( "accept:%s\n", in_buf );
  57.                         }
  58.                     }
  59.                 }
  60.         }
  61.         else{
  62.             //do nothing
  63.         }
  64.        
  65.     }
  66.     return 0;
  67. }

从头编写高性能服务程序8-多进程非阻塞epoll-prefork

上一个版本在accept的位置仍旧会被阻塞
这样当一个链接进来的时候就会产生一个新的进程
进程的不断开启和释放会造成很大的性能影响
而一般Apache和Nginx的做法就是先产生N个进程用以备用
当有实际链接的时候,就由这些进程获取并进行处理
(注:Nginx的线程模式只在Windows下有效,在Linux下是使用进程模型的)
这样我们就有两个地方需要改造

第一个是将listen端口也变为非阻塞
这样当有新链接进来的时候我们得到通知才去处理
这样accept调用就不会被阻塞在进程导致进程无法进行后续的工作

第二是进程一启动之后就fork N个进程
这些进程启动之后就自行获取各自的accept
然后自行处理各自获取的链接并管理其生命周期内的所有内容

将listen也放置到epoll中
就需要在每次获得epoll events的时候判断下
这个events是前面放进去的listen,如果是listen就要accept
如果是accept的就进行数据传输处理

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <netinet/in.h>
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include <fcntl.h>
  7. #include <sys/epoll.h>
  8. int main(){
  9.     int listen_fd,accept_fd,flag;
  10.     struct sockaddr_in my_addr,remote_addr;
  11.     if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
  12.         perror("create socket error");
  13.         exit(1);
  14.     }
  15.     if (setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR
  16.                     ,(char *)&flag,sizeof(flag)) == -1){
  17.         perror("setsockopt error");
  18.     }
  19.     int flags = fcntl(listen_fd, F_GETFL, 0);
  20.     fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
  21.     my_addr.sin_family = AF_INET;
  22.     my_addr.sin_port = htons(3389);
  23.     my_addr.sin_addr.s_addr = INADDR_ANY;
  24.     bzero(&(my_addr.sin_zero), 8);
  25.     if (bind(listen_fd, (struct sockaddr *)&my_addr,
  26.                 sizeof(struct sockaddr_in)) == -1) {
  27.         perror("bind error");
  28.         exit(1);
  29.     }
  30.     if (listen(listen_fd,1) == -1){
  31.         perror("listen error");
  32.         exit(1);
  33.     }
  34.     int pid=-1;
  35.     int addr_len = sizeof(struct sockaddr_in);
  36.     int max_process_num = 3;
  37.     int child_id;
  38.     int i;
  39.     int child_process_status;
  40.     for(i = 0; i < max_process_num; i++){
  41.         if(pid != 0){
  42.             pid = fork();
  43.         }
  44.         else{
  45.             child_id = i;
  46.         }
  47.     }
  48.     if(pid == 0){
  49.         int accept_handles = 0;
  50.         struct epoll_event ev,events[20];
  51.         int epfd = epoll_create(256);
  52.         int ev_s = 0;
  53.         ev.data.fd = listen_fd;
  54.         ev.events = EPOLLIN|EPOLLET;
  55.         epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
  56.         for(;;){
  57.             ev_s = epoll_wait( epfd,events, 20, 500 );
  58.             int i = 0;
  59.             for(i = 0; i<ev_s; i++){
  60.                 if(events[i].data.fd == listen_fd){
  61.                     int max_process_accept = 3;
  62.                     if(accept_handles < max_process_accept){
  63.                         accept_handles++;
  64.                         int addr_len = sizeof( struct sockaddr_in );
  65.                         accept_fd = accept( listen_fd,
  66.                                                 (struct sockaddr *)&remote_addr, &addr_len );
  67.                         int flags = fcntl(accept_fd, F_GETFL, 0);
  68.                         fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
  69.                         ev.data.fd = accept_fd;
  70.                         ev.events = EPOLLIN|EPOLLET;
  71.                         epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
  72.                         printf("Child:%d,ProcessID:%d,EPOLLIN,fd:%d,accept:%d\n", child_id, getpid(), listen_fd, accept_fd);
  73.                     }
  74.                 }
  75.                 else if(events[i].events&EPOLLIN){
  76.                     char in_buf[1024];
  77.                     memset(in_buf, 0, 1024);
  78.                     int recv_num = recv( events[i].data.fd, &in_buf, 1024, 0 );
  79.                     if( recv_num ==0 ){
  80.                         close(events[i].data.fd);
  81.                         accept_handles--;
  82.                         printf("Child:%d,ProcessID:%d,EPOLLIN,fd:%d,closed\n", child_id, getpid(), events[i].data.fd);
  83.                     }
  84.                     else{
  85.                         printf("Child:%d,ProcessID:%d,EPOLLIN,fd:%d,recv:%s\n", child_id, getpid(), events[i].data.fd, in_buf);
  86.                     }
  87.                 }
  88.             }
  89.         }
  90.     }
  91.     else{
  92.         //manager the process
  93.         wait(&child_process_status);
  94.     }
  95.    
  96.     return 0;
  97. }

这个问题就是多进程里面的群惊现象
当所有进程都阻塞在listen端口时
一旦系统获得listen端口的状态变化
会把所有的线程都唤醒,而只有一个获取后进行处理
其他的在被阻塞后继续沉睡
不过在Linux 2.4以后这个群惊的现象已经被内核给处理掉了
实际测试下来只有一个进程会被再次唤醒,而其他保持阻塞,
这样阻塞在listen端口其实效率不比阻塞在同享锁差.毕竟调度是在内核级别的.


 

从头编写高性能服务程序9-多进程非阻塞epoll-prefork-hook


 

整个基础结构已经基本确定了
接下来做一些细节工作
首先把一些函数抽取出来.
例如prefork独立出来.socket->bind->listen独立出来

这里我们引入一个新的思路
原先由统一的函数在epoll_wait之后对events里面的fd进行处理
但是每个fd可能需要处理的方式都不同.
怎么样针对不同的fd来调用特定的函数呢?

首先在epoll_event结构中有data成员
而data的定义如下

typedef union epoll_data {
                void *ptr;
                int fd;
                __uint32_t u32;
                __uint64_t u64;
        } epoll_data_t;

        struct epoll_event {
                __uint32_t events;      /* Epoll events */
                epoll_data_t data;      /* User data variable */
        };

可见既可以在events里面放data.fd
也可以使用data.ptr来指向一个指针
当fd有消息时内核将对应的ev变量塞入events数组的时候
如果我们只是用fd来指向注册的,那么获取数据的时候只能得到对应的fd
这样使用什么函数来处理这个fd就需要另行判断

那么如果使用ptr来指向一个结构
而结构内保存了fd以及处理这个fd所使用的函数指针
那当我们得到events数组内的事件时
就可以直接调用ptr指向的函数指针了.
这就类似Nginx中的hook函数.
在Nginx中几乎任何一种事件都会绑定其处理函数
而由模块实现距离的函数,然后在hook上去.

那么下面的代码我们就模拟这个方法:
我们建立一个数据结构来保存每个fd以及对应的处理函数

struct event_handle{
    int fd;
    int (* handle)(int fd);
};

handle_hook是我们为每个fd注册的处理函数
当accept获得新的accept_fd之后
我们使用

ev_handles[accept_handles].handle = handle_hook

来将对应的函数注册到对应的events内
在fd得到通知的时候
使用

(*current_handle)(current_fd)

来进行处理

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <sys/wait.h>
  3. #include <netinet/in.h>
  4. #include <sys/epoll.h>
  5. #include <sys/sendfile.h> 
  6. #include <unistd.h>
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <string.h>
  10. #include <strings.h>
  11. #include <fcntl.h>
  12.  
  13. int create_listen_fd(int port){
  14.     int listen_fd;
  15.     struct sockaddr_in my_addr;
  16.     if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
  17.         perror("create socket error");
  18.         exit(1);
  19.     }
  20.     int flag;
  21.     if (setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR
  22.     ,(char *)&flag,sizeof(flag)) == -1){
  23.         perror("setsockopt error");
  24.     }
  25.     int flags = fcntl(listen_fd, F_GETFL, 0);
  26.     fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
  27.     my_addr.sin_family = AF_INET;
  28.     my_addr.sin_port = htons(port);
  29.     my_addr.sin_addr.s_addr = INADDR_ANY;
  30.     bzero(&(my_addr.sin_zero), 8);
  31.     if (bind(listen_fd, (struct sockaddr *)&my_addr,
  32.     sizeof(struct sockaddr_in)) == -1) {
  33.         perror("bind error");
  34.         exit(1);
  35.     }
  36.     if (listen(listen_fd,1) == -1){
  37.         perror("listen error");
  38.         exit(1);
  39.     }
  40.     return listen_fd;
  41. }
  42.  
  43. int create_accept_fd(int listen_fd){
  44.     int addr_len = sizeof( struct sockaddr_in );
  45.     struct sockaddr_in remote_addr;
  46.     int accept_fd = accept( listen_fd,
  47.         (struct sockaddr *)&remote_addr, &addr_len );
  48.     int flags = fcntl(accept_fd, F_GETFL, 0);
  49.     fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
  50.     return accept_fd;
  51. }
  52.  
  53. int fork_process(int process_num){
  54.     int i;
  55.     int pid=-1;
  56.     for(i = 0; i < process_num; i++){
  57.         if(pid != 0){
  58.             pid = fork();
  59.         }
  60.     }
  61.     return pid;
  62. }
  63.  
  64. int handle_normal(int socket_fd){
  65.     char in_buf[1024];
  66.     memset(in_buf, 0, 1024);
  67.     int recv_num = recv( socket_fd, &in_buf, 1024, 0 );
  68.     if( recv_num ==0 ){
  69.         close(socket_fd);
  70.         printf("ProcessID:%d,EPOLLIN,fd:%d,closed\n", getpid(), socket_fd);
  71.     }
  72.     else{
  73.         printf("ProcessID:%d,EPOLLIN,fd:%d,recv:%s\n", getpid(), socket_fd, in_buf);
  74.     }
  75.     return recv_num;
  76. }
  77.  
  78. int handle_hook(int socket_fd){
  79.     char in_buf[1024];
  80.     memset(in_buf, 0, 1024);
  81.     int recv_num = recv( socket_fd, &in_buf, 1024, 0 );
  82.     if( recv_num ==0 ){
  83.         close(socket_fd);
  84.         printf("ProcessID:%d,EPOLLIN,fd:%d,closed\n", getpid(), socket_fd);
  85.     }
  86.     else{
  87.         printf("ProcessID:%d,EPOLLIN,fd:%d,recv_num:%d;recv:", getpid(), socket_fd, recv_num);
  88.         for (int i = 0; i<recv_num; i++){
  89.         printf("%02x ",in_buf[i]);
  90.         }
  91.         printf("\n");
  92.     }
  93.     return recv_num;
  94. }
  95.  
  96. struct event_handle{
  97.     int fd;
  98.     int (* handle)(int fd);
  99. };
  100. typedef int (* EVENT_HANDLE)(int);
  101. typedef struct event_handle * EH;
  102.  
  103. int main(){
  104.     int listen_fd = create_listen_fd(3389);
  105.     int pid = fork_process(3);
  106.     if(pid == 0){
  107.         int accept_handles = 0;
  108.         struct epoll_event ev,events[20];
  109.         int epfd = epoll_create(256);
  110.         int ev_s = 0;
  111.        
  112.         ev.data.fd = listen_fd;
  113.         ev.events = EPOLLIN|EPOLLET;
  114.         epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
  115.         struct event_handle ev_handles[256];
  116.         for(;;){
  117.             ev_s = epoll_wait( epfd,events, 20, 500 );
  118.             int i = 0;
  119.             for(i = 0; i<ev_s; i++){
  120.                 if(events[i].data.fd == listen_fd){
  121.                     int max_process_accept = 3;
  122.                     if(accept_handles < max_process_accept){
  123.                         accept_handles++;
  124.                         int accept_fd = create_accept_fd(listen_fd);
  125.                         ev_handles[accept_handles].fd = accept_fd;
  126.                         ev_handles[accept_handles].handle = handle_hook;
  127.                         ev.data.ptr = &ev_handles[accept_handles];
  128.                         ev.events = EPOLLIN|EPOLLET;
  129.                         epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
  130.                         printf("ProcessID:%d,EPOLLIN,fd:%d,accept:%d\n", getpid(), listen_fd, accept_fd);
  131.                     }
  132.                 }
  133.                 else if(events[i].events&EPOLLIN){
  134.                     EVENT_HANDLE current_handle = ((EH)(events[i].data.ptr))->handle;
  135.                     int current_fd = ((EH)(events[i].data.ptr))->fd;
  136.                     if( (*current_handle)(current_fd)  == 0){ 
  137.                         accept_handles--;
  138.                     }
  139.                 }
  140.                 else if(events[i].events&EPOLLOUT){
  141.                     //need add write event process
  142.                 }
  143.             }
  144.         }
  145.     }
  146.     else{
  147.         //manager the process
  148.         int child_process_status;
  149.         wait(&child_process_status);
  150.     }
  151.    
  152.     return 0;
  153. }
从头编写高性能服务程序10-请求解析

最终的形态基本上在上次的结构中就定型了
当然有些细节需要完善
不过基本上用这个结构来写service已经是OK了
那么现在就是继续细化这个结构.用来写个比较靠近实际的应用

有了链接的管理
接下来就是对通讯协议的实现
由于是从头开始写
所以协议也由我们自己来实现
先是对请求的解析
从客户端telnet传送过来的数据
回行是用/r/n结尾的
所以我们不停的接受数据
然后判断数据的最后是否是/r/n
如果是的话.就把它和以前的数据一起拼接起来
然后调用请求分析来解析指令

在event_handle结构中
我们加入了command数组
用来存放每次传输过来的数据
直至遇到以/r/n结尾的数据.然后拼接起来,输出,再清空这个数组
从头再接受新的指令

由于使用了epoll和非阻塞accept_fd
所以每次接受到的数据是零散的
需要将每次recv的数据连续的拼接到一个变量中
这就是command数组存在的理由
而command_pos用来保存的是每次拼接后数组的实际存放数据的量
也可以认为是最后一个数据所在数组中的位置
便于下次拼接

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <sys/wait.h>
  3. #include <netinet/in.h>
  4. #include <sys/epoll.h>
  5. #include <sys/sendfile.h> 
  6. #include <unistd.h>
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <string.h>
  10. #include <strings.h>
  11. #include <fcntl.h>
  12.  
  13. typedef struct event_handle{
  14.     int fd;
  15.     int ( * handle )( struct event_handle * ev );
  16.     char command[1024];
  17.     int command_pos;
  18. } EV,* EH;
  19. typedef int ( * EVENT_HANDLE )( struct event_handle * ev );
  20.  
  21.  
  22. int create_listen_fd( int port ){
  23.     int listen_fd;
  24.     struct sockaddr_in my_addr;
  25.     if ( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){
  26.         perror( "create socket error" );
  27.         exit( 1 );
  28.     }
  29.     int flag;
  30.     if ( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR
  31.     , ( char * )&flag, sizeof( flag ) ) == -1 ){
  32.         perror( "setsockopt error" );
  33.     }
  34.     int flags = fcntl( listen_fd, F_GETFL, 0 );
  35.     fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK );
  36.     my_addr.sin_family = AF_INET;
  37.     my_addr.sin_port = htons( port );
  38.     my_addr.sin_addr.s_addr = INADDR_ANY;
  39.     bzero( &( my_addr.sin_zero ), 8 );
  40.     if ( bind( listen_fd, ( struct sockaddr * )&my_addr,
  41.     sizeof( struct sockaddr_in ) ) == -1 ) {
  42.         perror( "bind error" );
  43.         exit( 1 );
  44.     }
  45.     if ( listen( listen_fd, 1 ) == -1 ){
  46.         perror( "listen error" );
  47.         exit( 1 );
  48.     }
  49.     return listen_fd;
  50. }
  51.  
  52. int create_accept_fd( int listen_fd ){
  53.     int addr_len = sizeof( struct sockaddr_in );
  54.     struct sockaddr_in remote_addr;
  55.     int accept_fd = accept( listen_fd,
  56.         ( struct sockaddr * )&remote_addr, &addr_len );
  57.     int flags = fcntl( accept_fd, F_GETFL, 0 );
  58.     fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK );
  59.     return accept_fd;
  60. }
  61.  
  62. int fork_process( int process_num ){
  63.     int i;
  64.     int pid=-1;
  65.     for( i = 0; i < process_num; i++ ){
  66.         if( pid != 0 ){
  67.             pid = fork();
  68.         }
  69.     }
  70.     return pid;
  71. }
  72.  
  73. int handle_hook_v2( EH ev ){
  74.     char in_buf[1024];
  75.     memset( in_buf, 0, 1024 );
  76.     int recv_num = recv( ev->fd, &in_buf, 1024, 0 );
  77.     if( recv_num ==0 ){
  78.         printf( "ProcessID:%d, EPOLLIN, fd:%d, closed\n", getpid(), ev->fd );
  79.         printf( "  recved:%s\n", ev->command );
  80.         close( ev->fd );
  81.     }
  82.     else{
  83.         printf( "ProcessID:%d, EPOLLIN, fd:%d, recv_num:%d;recv:", getpid(), ev->fd, recv_num );
  84.         int i;
  85.         for( i = 0; i<recv_num; i++ ){
  86.             printf( "%02x ", in_buf[i] );
  87.         }
  88.         printf( "\n" );
  89.         memcpy( ev->command + ev->command_pos, in_buf, recv_num );
  90.         ev->command_pos += recv_num;
  91.         if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){
  92.             printf( "  recved:%s\n", ev->command );
  93.             memset( ev->command, 0, 1024 );
  94.             ev->command_pos = 0;
  95.         }
  96.     }
  97.     return recv_num;
  98. }
  99.  
  100.  
  101.  
  102. int main(){
  103.     int listen_fd = create_listen_fd( 3389 );
  104.     int pid = fork_process( 3 );
  105.     if( pid == 0 ){
  106.         int accept_handles = 0;
  107.         struct epoll_event ev, events[20];
  108.         int epfd = epoll_create( 256 );
  109.         int ev_s = 0;
  110.        
  111.         ev.data.fd = listen_fd;
  112.         ev.events = EPOLLIN|EPOLLET;
  113.         epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev );
  114.         struct event_handle ev_handles[256];
  115.         for( ;; ){
  116.             ev_s = epoll_wait( epfd, events, 20, 500 );
  117.             int i = 0;
  118.             for( i = 0; i<ev_s; i++ ){
  119.                 if( events[i].data.fd == listen_fd ){
  120.                     int max_process_accept = 3;
  121.                     if( accept_handles < max_process_accept ){
  122.                         accept_handles++;
  123.                         int accept_fd = create_accept_fd( listen_fd );
  124.                         ev_handles[accept_handles].fd = accept_fd;
  125.                         ev_handles[accept_handles].handle = handle_hook_v2;
  126.                         ev_handles[accept_handles].command_pos = 0;
  127.                         memset( ev_handles[accept_handles].command, 0, 1024 );
  128.                         ev.data.ptr = &ev_handles[accept_handles];
  129.                         ev.events = EPOLLIN|EPOLLET;
  130.                         epoll_ctl( epfd, EPOLL_CTL_ADD, accept_fd, &ev );
  131.                         printf( "ProcessID:%d, EPOLLIN, fd:%d, accept:%d\n", getpid(), listen_fd, accept_fd );
  132.                     }
  133.                 }
  134.                 else if( events[i].events&EPOLLIN ){
  135.                     EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->handle;
  136.                     EH current_event = ( EH )( events[i].data.ptr );
  137.                     if( ( *current_handle )( current_event )  == 0 ){ 
  138.                         accept_handles--;
  139.                     }
  140.                 }
  141.                 else if( events[i].events&EPOLLOUT ){
  142.                     //need add write event process
  143.                 }
  144.             }
  145.         }
  146.     }
  147.     else{
  148.         //manager the process
  149.         int child_process_status;
  150.         wait( &child_process_status );
  151.     }
  152.    
  153.     return 0;
  154. }





从头编写高性能服务程序11-指令处理&sendfile

实现命令的获取之后
现在是增加对command的解析以及对应的反馈
为了做些稍微有意义的事情.我把这个service做的工作定位在以下内容
1.查询文件大小
2.返回文件内容
3.删除文件

协议的格式如下
请求模式:文件名称
例如
1:new.txt

请求模式见源代码中的宏定义

这次代码也对原来的程序作了一定的修改
从丑陋的代码渐渐修改.
希望在最终程序完成的时候
能够有比较好的代码风格

这个版本中有一个bug
就是sendfile只是调用一次
而实际上如果是较大的文件
需要在判断EPLLOUT之后不停的sendfile
直到EAGAIN遇见accept_fd阻塞为止
这样直至下次EPOLLOUT发生
再从上次暂停的位置继续发送
下一个版本中将会有这个BUG的修正

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <sys/wait.h>
  3. #include <netinet/in.h>
  4. #include <netinet/tcp.h>
  5. #include <sys/epoll.h>
  6. #include <sys/sendfile.h> 
  7. #include <sys/stat.h>
  8. #include <unistd.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <strings.h>
  13. #include <fcntl.h>
  14.  
  15. #define HANDLE_INFO   1
  16. #define HANDLE_SEND   2
  17. #define HANDLE_DEL    3
  18. #define HANDLE_CLOSE  4
  19.  
  20. #define MAX_REQLEN          1024
  21. #define MAX_PROCESS_CONN    3
  22. #define FIN_CHAR            0x00
  23. #define SUCCESS  0
  24. #define ERROR   -1
  25.  
  26. typedef struct event_handle{
  27.     int socket_fd;
  28.     int file_fd;
  29.     char request[MAX_REQLEN];
  30.     int request_len;
  31.     int ( * handle )( struct event_handle * ev );
  32.     int handle_method;
  33.  
  34. } EV,* EH;
  35. typedef int ( * EVENT_HANDLE )( struct event_handle * ev );
  36.  
  37. int create_listen_fd( int port ){
  38.     int listen_fd;
  39.     struct sockaddr_in my_addr;
  40.     if( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){
  41.         perror( "create socket error" );
  42.         exit( 1 );
  43.     }
  44.     int flag;
  45.     int olen = sizeof(int);
  46.     if( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR
  47.                         , (const void *)&flag, olen ) == -1 ){
  48.         perror( "setsockopt error" );
  49.     }
  50.     flag = 1;
  51.     if( setsockopt( listen_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &flag, olen ) == -1 ){
  52.         perror( "setsockopt error" );
  53.     }
  54.     int flags = fcntl( listen_fd, F_GETFL, 0 );
  55.     fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK );
  56.     my_addr.sin_family = AF_INET;
  57.     my_addr.sin_port = htons( port );
  58.     my_addr.sin_addr.s_addr = INADDR_ANY;
  59.     bzero( &( my_addr.sin_zero ), 8 );
  60.     if( bind( listen_fd, ( struct sockaddr * )&my_addr,
  61.     sizeof( struct sockaddr_in ) ) == -1 ) {
  62.         perror( "bind error" );
  63.         exit( 1 );
  64.     }
  65.     if( listen( listen_fd, 1 ) == -1 ){
  66.         perror( "listen error" );
  67.         exit( 1 );
  68.     }
  69.     return listen_fd;
  70. }
  71.  
  72. int create_accept_fd( int listen_fd ){
  73.     int addr_len = sizeof( struct sockaddr_in );
  74.     struct sockaddr_in remote_addr;
  75.     int accept_fd = accept( listen_fd,
  76.         ( struct sockaddr * )&remote_addr, &addr_len );
  77.     int flags = fcntl( accept_fd, F_GETFL, 0 );
  78.     fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK );
  79.     return accept_fd;
  80. }
  81.  
  82. int fork_process( int process_num ){
  83.     int i;
  84.     int pid=-1;
  85.     for( i = 0; i < process_num; i++ ){
  86.         if( pid != 0 ){
  87.             pid = fork();
  88.         }
  89.     }
  90.     return pid;
  91. }
  92.  
  93. int init_evhandle(EH ev,int socket_fd,EVENT_HANDLE handle){
  94.     ev->socket_fd = socket_fd;
  95.     ev->handle = handle;
  96.     ev->request_len = 0;
  97.     ev->handle_method = 0;
  98.     memset( ev->request, 0, 1024 );
  99. }
  100. //accept->accept_queue->request->request_queue->output->output_queue
  101. //multi process sendfile
  102. int parse_request(EH ev){
  103.     ev->request_len--;
  104.     *( ev->request + ev->request_len - 1 ) = 0x00;
  105.     int i;
  106.     for( i=0; i<ev->request_len; i++ ){
  107.         if( ev->request[i] == ':' ){
  108.             ev->request_len = ev->request_len-i-1;
  109.             char temp[MAX_REQLEN];
  110.             memcpy( temp, ev->request, i );
  111.             ev->handle_method = atoi( temp );
  112.             memcpy( temp, ev->request+i+1, ev->request_len );
  113.             memcpy( ev->request, temp, ev->request_len );
  114.             break;
  115.         }
  116.     }
  117.     handle_request( ev );
  118.     return SUCCESS;
  119. }
  120.  
  121. int handle_request(EH ev){
  122.     struct stat file_info;
  123.     switch( ev->handle_method ){
  124.         case HANDLE_INFO:
  125.             ev->file_fd = open( ev->request, O_RDONLY );
  126.             if( ev->file_fd == -1 ){
  127.                 send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  128.                 return -1;
  129.             }
  130.             fstat(ev->file_fd, &file_info);
  131.             char info[MAX_REQLEN];
  132.             sprintf(info,"file len:%d\n",file_info.st_size);
  133.             send( ev->socket_fd, info, strlen( info ), 0 );
  134.             break;
  135.         case HANDLE_SEND:
  136.             ev->file_fd = open( ev->request, O_RDONLY );
  137.             if( ev->file_fd == -1 ){
  138.                 send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  139.                 return -1;
  140.             }
  141.             fstat(ev->file_fd, &file_info);
  142.             sendfile( ev->socket_fd, ev->file_fd, 0, file_info.st_size );
  143.             break;
  144.         case HANDLE_DEL:
  145.             break;
  146.         case HANDLE_CLOSE:
  147.             break;
  148.     }
  149.     finish_request( ev );
  150.     return SUCCESS;
  151. }
  152.  
  153. int finish_request(EH ev){
  154.     close(ev->socket_fd);
  155.     close(ev->file_fd);
  156.     ev->handle_method = -1;
  157.     clean_request( ev );
  158.     return SUCCESS;
  159. }
  160.  
  161. int clean_request(EH ev){
  162.     memset( ev->request, 0, MAX_REQLEN );
  163.     ev->request_len = 0;
  164. }
  165.  
  166. int handle_hook_v2( EH ev ){
  167.     char in_buf[MAX_REQLEN];
  168.     memset( in_buf, 0, MAX_REQLEN );
  169.     int recv_num = recv( ev->socket_fd, &in_buf, MAX_REQLEN, 0 );
  170.     if( recv_num ==0 ){
  171.         close( ev->socket_fd );
  172.         return ERROR;
  173.     }
  174.     else{
  175.         //check ifoverflow
  176.         if( ev->request_len > MAX_REQLEN-recv_num ){
  177.             close( ev->socket_fd );
  178.             clean_request( ev );
  179.         }
  180.         memcpy( ev->request + ev->request_len, in_buf, recv_num );
  181.         ev->request_len += recv_num;
  182.         if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){
  183.             parse_request(ev);
  184.         }
  185.     }
  186.     return recv_num;
  187. }
  188.  
  189. int main(){
  190.     int listen_fd = create_listen_fd( 3389 );
  191.     int pid = fork_process( 3 );
  192.     if( pid == 0 ){
  193.         int accept_handles = 0;
  194.         struct epoll_event ev, events[20];
  195.         int epfd = epoll_create( 256 );
  196.         int ev_s = 0;
  197.        
  198.         ev.data.fd = listen_fd;
  199.         ev.events = EPOLLIN|EPOLLET;
  200.         epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev );
  201.         struct event_handle ev_handles[256];
  202.         for( ;; ){
  203.             ev_s = epoll_wait( epfd, events, 20, 500 );
  204.             int i = 0;
  205.             for( i = 0; i<ev_s; i++ ){
  206.                 if( events[i].data.fd == listen_fd ){
  207.                     if( accept_handles < MAX_PROCESS_CONN ){
  208.                         accept_handles++;
  209.                         int accept_fd = create_accept_fd( listen_fd );
  210.                         init_evhandle(&ev_handles[accept_handles],accept_fd,handle_hook_v2);
  211.                         ev.data.ptr = &ev_handles[accept_handles];
  212.                         ev.events = EPOLLIN|EPOLLET;
  213.                         epoll_ctl( epfd, EPOLL_CTL_ADD, accept_fd, &ev );
  214.                     }
  215.                 }
  216.                 else if( events[i].events&EPOLLIN ){
  217.                     EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->handle;
  218.                     EH current_event = ( EH )( events[i].data.ptr );
  219.                     if( ( *current_handle )( current_event )  == 0 ){ 
  220.                         accept_handles--;
  221.                     }
  222.                 }
  223.                 else if( events[i].events&EPOLLOUT ){
  224.                     //need add write event process
  225.                 }
  226.             }
  227.         }
  228.     }
  229.     else{
  230.         //manager the process
  231.         int child_process_status;
  232.         wait( &child_process_status );
  233.     }
  234.    
  235.     return SUCCESS;
  236. }


从头编写高性能服务程序12-区分读写事件

前一个版本很重要的BUG
就是sendfile在发送大文件的时候会发送不完整
这个bug指出了另一需求
就是需要用到EPOLLOUT事件

前面版本我们事件处理都是在EPOLLIN中进行
当有accept_fd数据进来之后
我们判断指令的内容
再直接进行数据的处理

我们更换一种方式
当accept_fd获取到数据之后
解析数据.
当需要输出的时候,将accept_fd的事件变为EPOLLOUT

这样一种fd会有两种状态
接受指令状态以及输出数据状态
慢慢的.我们会发现这个程序越来越像Nginx或者Lighttpd了
因为一个连接会有不同的状态
事件+状态机就是Nginx以及Lighttpd高效的原因
将一个链接分成不同的生命周期然后处理

经过进化
我们的event_handle结构拥有了读写两种hook钩子

typedef struct event_handle{
    ...
    int ( * read_handle  )( struct event_handle * ev );
    int ( * write_handle )( struct event_handle * ev );
    ...
}

而我们在针对的处理过程中
初始化时会针对不同的事件挂上不同的钩子函数

int init_evhandle(...){
    ...
    ev->read_handle = r_handle;
    ev->write_handle = w_handle;
    ...
}

接着在事件发生时调用不同的钩子函数

else if( events[i].events&EPOLLIN ){
    EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->read_handle;
    ...
}
else if( events[i].events&EPOLLOUT ){
    EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle;
    ...
}

当分析完指令的时候
将fd变为EPOLLOUT

int parse_request(
    ...
    ev_temp.data.ptr = ev;
    ev_temp.events = EPOLLOUT|EPOLLET;
    epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp );
    ...
}

加入这些处理之后
代码越来越长了
我们还需要加很多东西
比如进程管理,等等
这之后会发现一个mini的nginx骨架或者lighttpd骨架出现了

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <sys/wait.h>
  3. #include <netinet/in.h>
  4. #include <netinet/tcp.h>
  5. #include <sys/epoll.h>
  6. #include <sys/sendfile.h> 
  7. #include <sys/stat.h>
  8. #include <unistd.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <strings.h>
  13. #include <fcntl.h>
  14. #include <errno.h>
  15.  
  16. #define HANDLE_INFO   1
  17. #define HANDLE_SEND   2
  18. #define HANDLE_DEL    3
  19. #define HANDLE_CLOSE  4
  20.  
  21. #define MAX_REQLEN          1024
  22. #define MAX_PROCESS_CONN    3
  23. #define FIN_CHAR            0x00
  24. #define SUCCESS  0
  25. #define ERROR   -1
  26.  
  27. typedef struct event_handle{
  28.     int socket_fd;
  29.     int file_fd;
  30.     int file_pos;
  31.     int epoll_fd;
  32.     char request[MAX_REQLEN];
  33.     int request_len;
  34.     int ( * read_handle )( struct event_handle * ev );
  35.     int ( * write_handle )( struct event_handle * ev );
  36.     int handle_method;
  37. } EV,* EH;
  38. typedef int ( * EVENT_HANDLE )( struct event_handle * ev );
  39.  
  40. int create_listen_fd( int port ){
  41.     int listen_fd;
  42.     struct sockaddr_in my_addr;
  43.     if( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){
  44.         perror( "create socket error" );
  45.         exit( 1 );
  46.     }
  47.     int flag;
  48.     int olen = sizeof(int);
  49.     if( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR
  50.                         , (const void *)&flag, olen ) == -1 ){
  51.         perror( "setsockopt error" );
  52.     }
  53.     flag = 5;
  54.     if( setsockopt( listen_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &flag, olen ) == -1 ){
  55.         perror( "setsockopt error" );
  56.     }
  57.     flag = 1;
  58.     if( setsockopt( listen_fd, IPPROTO_TCP, TCP_CORK, &flag, olen ) == -1 ){
  59.         perror( "setsockopt error" );
  60.     }
  61.     int flags = fcntl( listen_fd, F_GETFL, 0 );
  62.     fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK );
  63.     my_addr.sin_family = AF_INET;
  64.     my_addr.sin_port = htons( port );
  65.     my_addr.sin_addr.s_addr = INADDR_ANY;
  66.     bzero( &( my_addr.sin_zero ), 8 );
  67.     if( bind( listen_fd, ( struct sockaddr * )&my_addr,
  68.     sizeof( struct sockaddr_in ) ) == -1 ) {
  69.         perror( "bind error" );
  70.         exit( 1 );
  71.     }
  72.     if( listen( listen_fd, 1 ) == -1 ){
  73.         perror( "listen error" );
  74.         exit( 1 );
  75.     }
  76.     return listen_fd;
  77. }
  78.  
  79. int create_accept_fd( int listen_fd ){
  80.     int addr_len = sizeof( struct sockaddr_in );
  81.     struct sockaddr_in remote_addr;
  82.     int accept_fd = accept( listen_fd,
  83.         ( struct sockaddr * )&remote_addr, &addr_len );
  84.     int flags = fcntl( accept_fd, F_GETFL, 0 );
  85.     fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK );
  86.     return accept_fd;
  87. }
  88.  
  89. int fork_process( int process_num ){
  90.     int i;
  91.     int pid=-1;
  92.     for( i = 0; i < process_num; i++ ){
  93.         if( pid != 0 ){
  94.             pid = fork();
  95.         }
  96.     }
  97.     return pid;
  98. }
  99.  
  100. int init_evhandle(EH ev,int socket_fd,int epoll_fd,EVENT_HANDLE r_handle,EVENT_HANDLE w_handle){
  101.     ev->epoll_fd = epoll_fd;
  102.     ev->socket_fd = socket_fd;
  103.     ev->read_handle = r_handle;
  104.     ev->write_handle = w_handle;
  105.     ev->file_pos = 0;
  106.     ev->request_len = 0;
  107.     ev->handle_method = 0;
  108.     memset( ev->request, 0, 1024 );
  109. }
  110. //accept->accept_queue->request->request_queue->output->output_queue
  111. //multi process sendfile
  112. int parse_request(EH ev){
  113.     ev->request_len--;
  114.     *( ev->request + ev->request_len - 1 ) = 0x00;
  115.     int i;
  116.     for( i=0; i<ev->request_len; i++ ){
  117.         if( ev->request[i] == ':' ){
  118.             ev->request_len = ev->request_len-i-1;
  119.             char temp[MAX_REQLEN];
  120.             memcpy( temp, ev->request, i );
  121.             ev->handle_method = atoi( temp );
  122.             memcpy( temp, ev->request+i+1, ev->request_len );
  123.             memcpy( ev->request, temp, ev->request_len );
  124.             break;
  125.         }
  126.     }
  127.     //handle_request( ev );
  128.     //register to epoll EPOLLOUT
  129.    
  130.     struct epoll_event ev_temp;
  131.     ev_temp.data.ptr = ev;
  132.     ev_temp.events = EPOLLOUT|EPOLLET;
  133.     epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp );
  134.     return SUCCESS;
  135. }
  136.  
  137. int handle_request(EH ev){
  138.     struct stat file_info;
  139.     switch( ev->handle_method ){
  140.         case HANDLE_INFO:
  141.             ev->file_fd = open( ev->request, O_RDONLY );
  142.             if( ev->file_fd == -1 ){
  143.                 send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  144.                 return -1;
  145.             }
  146.             fstat(ev->file_fd, &file_info);
  147.             char info[MAX_REQLEN];
  148.             sprintf(info,"file len:%d\n",file_info.st_size);
  149.             send( ev->socket_fd, info, strlen( info ), 0 );
  150.             break;
  151.         case HANDLE_SEND:
  152.             ev->file_fd = open( ev->request, O_RDONLY );
  153.             if( ev->file_fd == -1 ){
  154.                 send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  155.                 return -1;
  156.             }
  157.             fstat(ev->file_fd, &file_info);
  158.             sendfile( ev->socket_fd, ev->file_fd, 0, file_info.st_size );
  159.             break;
  160.         case HANDLE_DEL:
  161.             break;
  162.         case HANDLE_CLOSE:
  163.             break;
  164.     }
  165.     finish_request( ev );
  166.     return SUCCESS;
  167. }
  168.  
  169. int finish_request(EH ev){
  170.     close(ev->socket_fd);
  171.     close(ev->file_fd);
  172.     ev->handle_method = -1;
  173.     clean_request( ev );
  174.     return SUCCESS;
  175. }
  176.  
  177. int clean_request(EH ev){
  178.     memset( ev->request, 0, MAX_REQLEN );
  179.     ev->request_len = 0;
  180. }
  181.  
  182. int read_hook_v2( EH ev ){
  183.     char in_buf[MAX_REQLEN];
  184.     memset( in_buf, 0, MAX_REQLEN );
  185.     int recv_num = recv( ev->socket_fd, &in_buf, MAX_REQLEN, 0 );
  186.     if( recv_num ==0 ){
  187.         close( ev->socket_fd );
  188.         return ERROR;
  189.     }
  190.     else{
  191.         //check ifoverflow
  192.         if( ev->request_len > MAX_REQLEN-recv_num ){
  193.             close( ev->socket_fd );
  194.             clean_request( ev );
  195.         }
  196.         memcpy( ev->request + ev->request_len, in_buf, recv_num );
  197.         ev->request_len += recv_num;
  198.         if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){
  199.             parse_request(ev);
  200.         }
  201.     }
  202.     return recv_num;
  203. }
  204.  
  205. int write_hook_v1( EH ev ){
  206.     struct stat file_info;
  207.     ev->file_fd = open( ev->request, O_RDONLY );
  208.     if( ev->file_fd == ERROR ){
  209.         send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  210.         return ERROR;
  211.     }
  212.     fstat(ev->file_fd, &file_info);
  213.     int write_num;
  214.     while(1){
  215.         write_num = sendfile( ev->socket_fd, ev->file_fd, (off_t *)&ev->file_pos, 10240 );
  216.         ev->file_pos += write_num;
  217.         if( write_num == ERROR ){
  218.             if( errno == EAGAIN ){
  219.                 break;
  220.             }
  221.         }
  222.         else if( write_num == 0 ){
  223.             printf( "writed:%d\n", ev->file_pos );
  224.             //finish_request( ev );
  225.             break;
  226.         }
  227.     }
  228.     return SUCCESS;
  229. }
  230.  
  231. int main(){
  232.     int listen_fd = create_listen_fd( 3389 );
  233.     int pid = fork_process( 3 );
  234.     if( pid == 0 ){
  235.         int accept_handles = 0;
  236.         struct epoll_event ev, events[20];
  237.         int epfd = epoll_create( 256 );
  238.         int ev_s = 0;
  239.        
  240.         ev.data.fd = listen_fd;
  241.         ev.events = EPOLLIN|EPOLLET;
  242.         epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev );
  243.         struct event_handle ev_handles[256];
  244.         for( ;; ){
  245.             ev_s = epoll_wait( epfd, events, 20, 500 );
  246.             int i = 0;
  247.             for( i = 0; i<ev_s; i++ ){
  248.                 if( events[i].data.fd == listen_fd ){
  249.                     if( accept_handles < MAX_PROCESS_CONN ){
  250.                         accept_handles++;
  251.                         int accept_fd = create_accept_fd( listen_fd );
  252.                         init_evhandle(&ev_handles[accept_handles],accept_fd,epfd,read_hook_v2,write_hook_v1);
  253.                         ev.data.ptr = &ev_handles[accept_handles];
  254.                         ev.events = EPOLLIN|EPOLLET;
  255.                         epoll_ctl( epfd, EPOLL_CTL_ADD, accept_fd, &ev );
  256.                     }
  257.                 }
  258.                 else if( events[i].events&EPOLLIN ){
  259.                     EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->read_handle;
  260.                     EH current_event = ( EH )( events[i].data.ptr );
  261.                     ( *current_handle )( current_event );
  262.                 }
  263.                 else if( events[i].events&EPOLLOUT ){
  264.                     EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle;
  265.                     EH current_event = ( EH )( events[i].data.ptr );
  266.                     if( ( *current_handle )( current_event )  == 0 ){ 
  267.                         accept_handles--;
  268.                     }
  269.                 }
  270.             }
  271.         }
  272.     }
  273.     else{
  274.         //manager the process
  275.         int child_process_status;
  276.         wait( &child_process_status );
  277.     }
  278.    
  279.     return SUCCESS;
  280. }
posted on 2010-05-09 11:22 doing5552 阅读(814) 评论(0)  编辑 收藏 引用

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理