从头编写高性能服务程序1-单进程阻塞
标题有些大.其实自己也是在不断的摸索 觉得BLOG的作用应该和日记一样 把自己获取到的东西记录下来 到时候回头看能找到自己跌倒过的地方避免再次跌倒 同时别人看的时候也能避开这些地方 所以在自己开始写服务程序一段时间之后 把原来的代码整理一下一次次的发上来 从零开始完全手写一个服务,经过慢慢的演化 期间的收获还是很多的
最开始, 程序是从最简单的socket连接开始 只有一个进程,创建socket之后开始listen 然后accept连上来的链接 整个程序是阻塞的.当一个链接上来之后 其他链接是无法连入的 之后当程序处理完并close之后,后面的才能被处理 这个例子是不停的接受客户端发过来的字符并且显示在服务端的console 然后客户端也不会被close. 只是演示概念 外面一个循环是不停的去accept新的连接 里面一个是循环处理已经accept上来链接的数据 由于没有close,所以外面一个循环是没意义的.呵呵 只是大家知道概念就OK了.
- #include<sys/socket.h>
- #include<netinet/in.h>
- #include<stdio.h>
- #include<stdlib.h>
- #include<string.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
- perror("create socket error");
- exit(1);
- }
- if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
- perror("setsockopt error");
- }
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero),8);
- if ( bind( listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1 ) {
- perror("bind error");
- exit(1);
- }
- if ( listen( listen_fd,1 ) == -1 ){
- perror("listen error");
- exit(1);
- }
- for(;;){
- int addr_len = sizeof( struct sockaddr_in );
- accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr,&addr_len );
- for(;;){
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- recv( accept_fd ,&in_buf ,1024 ,0 );
- printf( "accept:%s\n", in_buf );
- }
- }
- return 0;
- }
从头编写高性能服务程序2-双进程阻塞
上一个模型的缺点就是 当链接数据没被处理完时, 整个服务是无法处理新链接的 这种比较适合单词处理速度很快 而且并发量=0的服务 也就是服务端和客户端1:1的样子 我丢给你请求,你给我数据,我再给你请求
这对于一个比较高性能的服务肯定是不可接受的 所以我们产生了一个想法 用父子进程,父进程管listen 有链接进来就fork一个子进程 子进程accept之后去处理.处理完之后就退出 注意,下面这段代码还是没close 所以上面所说的只能是做了一半 大家理解概念就好
- #include<sys/socket.h>
- #include<netinet/in.h>
- #include<stdio.h>
- #include<stdlib.h>
- #include<string.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
- perror("create socket error");
- exit(1);
- }
- if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
- perror("setsockopt error");
- }
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero),8);
- if ( bind( listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1 ) {
- perror("bind error");
- exit(1);
- }
- if ( listen( listen_fd,1 ) == -1 ){
- perror("listen error");
- exit(1);
- }
- int pid;
- pid=fork();
- for(;;){
- if( pid==0 ){
- int addr_len = sizeof( struct sockaddr_in );
- accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr,&addr_len );
- for(;;){
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- recv( accept_fd ,&in_buf ,1024 ,0 );
- printf( "accept:%s\n", in_buf );
- }
- }
- else{
- //do nothing
- }
- }
- return 0;
- }
fork产生子进程之后返回值 如果是0,那么这个进程就是子进程 如果-1,就是错误 否则就是产生子进程的pid,可以用这个pid来控制子进程,比如kill掉
大家实际用起来会发现和单进程阻塞一样 只能处理一个客户端连接.根本没好处么 呵呵.其实只是演示一下fork 然后好处还是有的.如果子进程因为处理request而挂了 主进程可以控制,然后再产生一个 所以这种样子比单进程要可靠一点 一个干活.一个监控
那么下一步就是对应每个链接产生进程
从头编写高性能服务程序3-多进程阻塞
多fork几次 那fork到底放在哪里呢? 因为阻塞的特性会中断进程的运行知道阻塞情况接触 所以我们把fork放在accept之后 这样有实际链接之后才会产生子进程 现在这个程序在多个客户端连上来之后都能有反应了
- #include<sys/socket.h>
- #include<netinet/in.h>
- #include<stdio.h>
- #include<stdlib.h>
- #include<string.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
- perror("create socket error");
- exit(1);
- }
- if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
- perror("setsockopt error");
- }
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero),8);
- if ( bind( listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1 ) {
- perror("bind error");
- exit(1);
- }
- if ( listen( listen_fd,1 ) == -1 ){
- perror("listen error");
- exit(1);
- }
- int pid;
- int addr_len = sizeof( struct sockaddr_in );
- for(;;){
- accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr,&addr_len );
- pid=fork();
- if( pid==0 ){
- for(;;){
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- recv( accept_fd ,&in_buf ,1024 ,0 );
- printf( "accept:%s\n", in_buf );
- }
- }
- else{
- //manager the process
- }
- }
- return 0;
- }
从头编写高性能服务程序4-单进程非阻塞select
前面都是阻塞的 当我们在accept或者recv的时候 文件描述符如果没有就绪 整个进程就会被挂起 浪费啊…如果只有就绪的时候才去处理,然后调用之后立刻返回 我们就可以继续做下面的工作了 所以我们引入非阻塞概念 记住,试用setsockopt是无法设置socket接口非阻塞的 只能用fd专用的fcntl去设置fd的非阻塞性质 需要include头文件fcntl.h
还有一点,select所监听的fd_set 在每次select之后就会被清掉 所以一开始我发现每次accept一个数据之后就再也获取不到数据了 然后才发现这点,而这个在我找的资料里面都没提到 所以要把FD_SET放在每次循环里面 我想这也是select性能比较低的原因之一
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <fcntl.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
- perror("create socket error");
- exit(1);
- }
- if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
- perror("setsockopt error");
- }
- int flags = fcntl(listen_fd, F_GETFL, 0);
- fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero),8);
- if ( bind( listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1 ) {
- perror("bind error");
- exit(1);
- }
- if ( listen( listen_fd,1 ) == -1 ){
- perror("listen error");
- exit(1);
- }
- fd_set fd_sets;
-
- int max_fd = listen_fd ;
- int events=0;
- int accept_fds[1024];
- int i = 0;
-
- for(;;){
- FD_ZERO( &fd_sets );
- FD_SET(listen_fd,&fd_sets);
- int k=0;
- for(k=0; k<=i; k++){
- FD_SET(accept_fds[k],&fd_sets);
- }
- events = select( max_fd + 1, &fd_sets, NULL, NULL, NULL );
- if( events ){
- printf("events:%d\n",events);
- if( FD_ISSET(listen_fd,&fd_sets) ){
- printf("listen event :1\n");
- int addr_len = sizeof( struct sockaddr_in );
- accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr,&addr_len );
- int flags = fcntl(accept_fd, F_GETFL, 0);
- fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
- accept_fds[i] = accept_fd;
- i++;
- max_fd = accept_fd ;
- printf("new accept fd:%d\n",accept_fd);
- }
- int j=0;
- for( j=0; j<=i; j++ ){
- if( FD_ISSET(accept_fds[j],&fd_sets) ){
- printf("accept event :%d\n",j);
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- recv( accept_fds[j] ,&in_buf ,1024 ,0 );
- printf( "accept:%s\n", in_buf );
- }
- }
- }
- }
- return 0;
- }
从头编写高性能服务程序5-单进程非阻塞epoll
select已经是很古老的非阻塞复用I/O方式了 Linux下最常用的IO复用是epoll了 所以我们用单进程来实验epoll epoll_wait和select一样 都是用一个fd的阻塞来代替一堆fd
下面程序有一个问题 由于使用了ET模式 所以当listen_fd有客户端连接上来时 只会被通知一次 如果是并发连接,如果只accept了一次 则第二个链接会被忽略 也就是真正高并发情况下.会出现丢客户端请求的情况
由于使用了复用IO 所以即使是单进程也能处理多客户端连接了
- #include<sys/socket.h>
- #include<netinet/in.h>
- #include<stdio.h>
- #include<stdlib.h>
- #include<string.h>
- #include <fcntl.h>
- #include<sys/epoll.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
- perror("create socket error");
- exit(1);
- }
- if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
- perror("setsockopt error");
- }
- int flags = fcntl(listen_fd, F_GETFL, 0);
- fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero),8);
- if ( bind( listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1 ) {
- perror("bind error");
- exit(1);
- }
- if ( listen( listen_fd,1 ) == -1 ){
- perror("listen error");
- exit(1);
- }
- struct epoll_event ev,events[20];
- int epfd = epoll_create(256);
- int ev_s=0;
- ev.data.fd=listen_fd;
- ev.events=EPOLLIN|EPOLLET;
- epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
- for(;;){
- ev_s = epoll_wait( epfd,events,20,500 );
- int i=0;
- for(i=0; i<ev_s; i++){
- if(events[i].data.fd==listen_fd){
- printf("listen event :1\n");
- int addr_len = sizeof( struct sockaddr_in );
- accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr,&addr_len );
- int flags = fcntl(accept_fd, F_GETFL, 0);
- fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
- ev.data.fd=accept_fd;
- ev.events=EPOLLIN|EPOLLET;
- epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
- printf("new accept fd:%d\n",accept_fd);
- }
- else if(events[i].events&EPOLLIN){
- printf("accept event :%d\n",i);
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- recv( events[i].data.fd ,&in_buf ,1024 ,0 );
- printf( "accept:%s\n", in_buf );
- }
- }
- }
- return 0;
- }
从头编写高性能服务程序6-多进程阻塞完整版
前面所有的程序都是缺少链接的后期管理的 我们在多进程的情况下测试子进程处理完后自行退出 而主进程继续监听的情况 一开始我认为只要子进程处理完之后exit就可以了 结果碰到了问题 子进程在主进程未退出的情况下先行退出 会变成僵尸进程 反之则会变成孤儿进程
僵尸进程的表现是子进程在ps下后面多个
defunct
解决方法是在主进程这里需要wait 用来处理子进程的各种状态
还有我们加上了客户端退出时候的处理 当客户端退出时 我们recv到的数据返回值会是0 而之前会被阻塞在那边 所以加上对recv的返回值0的处理就可以了
- #include<sys/socket.h>
- #include<netinet/in.h>
- #include<stdio.h>
- #include<stdlib.h>
- #include<string.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
- perror("create socket error");
- exit(1);
- }
- if (setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR
- ,(char *)&flag,sizeof(flag)) == -1){
- perror("setsockopt error");
- }
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero), 8);
- if (bind(listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1) {
- perror("bind error");
- exit(1);
- }
- if (listen(listen_fd,1) == -1){
- perror("listen error");
- exit(1);
- }
- int pid=-1;
- int addr_len = sizeof(struct sockaddr_in);
- int max_process_num = 10;
- int i;
- int child_process_status;
- for(i = 0; i< max_process_num; i++){
- if(pid != 0){
- pid = fork();
- }
- }
- if(pid == 0){
- for(;;){
- accept_fd = accept(listen_fd
- ,(struct sockaddr *)&remote_addr,&addr_len);
- int recv_num;
- for(;;){
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- recv_num = recv(accept_fd ,&in_buf ,1024 ,0);
- if(recv_num == 0){
- printf("close socket\n");
- close(accept_fd);
- break;
- }
- else{
- printf("accept:%d:%s\n", recv_num, in_buf);
- }
- }
- }
- }
- else{
- //manager the process
- wait(&child_process_status);
- }
-
- return 0;
- }
从头编写高性能服务程序7-多进程非阻塞epoll版
我们有了前面的所有经验 多进程,非阻塞,IO复用 然后我们要把这些特定揉合起来了 先是在accept上阻塞 然后当有新链接过来就会fork 当然这样一个结构并不比多进程阻塞版本好太多 因为每个进程虽然使用epoll来处理 但是一个进程内只有一个accept_fd在被处理 所以效果是一样的 在下一个版本中 我们将在一个进程使用epoll处理多个accept
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <fcntl.h>
- #include <sys/epoll.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ( (listen_fd = socket( AF_INET,SOCK_STREAM,0 )) == -1 ){
- perror("create socket error");
- exit(1);
- }
- if ( setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&flag,sizeof(flag)) == -1 ){
- perror("setsockopt error");
- }
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero),8);
- if ( bind( listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1 ) {
- perror("bind error");
- exit(1);
- }
- if ( listen( listen_fd,1 ) == -1 ){
- perror("listen error");
- exit(1);
- }
- for(;;){
- int addr_len = sizeof( struct sockaddr_in );
- accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr,&addr_len );
- int flags = fcntl(accept_fd, F_GETFL, 0);
- fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
-
- int pid = fork();
- if( pid == 0 ){
- struct epoll_event ev,events[20];
- int epfd = epoll_create(256);
- int ev_s=0;
-
- ev.data.fd=accept_fd;
- ev.events=EPOLLIN|EPOLLET;
- epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
- for(;;){
- ev_s = epoll_wait( epfd,events,20,500 );
- int i=0;
- for(i=0; i<ev_s; i++){
- if(events[i].events&EPOLLIN){
- printf("accept event :%d\n",i);
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- recv( events[i].data.fd ,&in_buf ,1024 ,0 );
- printf( "accept:%s\n", in_buf );
- }
- }
- }
- }
- else{
- //do nothing
- }
-
- }
- return 0;
- }
从头编写高性能服务程序8-多进程非阻塞epoll-prefork
上一个版本在accept的位置仍旧会被阻塞 这样当一个链接进来的时候就会产生一个新的进程 进程的不断开启和释放会造成很大的性能影响 而一般Apache和Nginx的做法就是先产生N个进程用以备用 当有实际链接的时候,就由这些进程获取并进行处理 (注:Nginx的线程模式只在Windows下有效,在Linux下是使用进程模型的) 这样我们就有两个地方需要改造
第一个是将listen端口也变为非阻塞 这样当有新链接进来的时候我们得到通知才去处理 这样accept调用就不会被阻塞在进程导致进程无法进行后续的工作
第二是进程一启动之后就fork N个进程 这些进程启动之后就自行获取各自的accept 然后自行处理各自获取的链接并管理其生命周期内的所有内容
将listen也放置到epoll中 就需要在每次获得epoll events的时候判断下 这个events是前面放进去的listen,如果是listen就要accept 如果是accept的就进行数据传输处理
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <fcntl.h>
- #include <sys/epoll.h>
- int main(){
- int listen_fd,accept_fd,flag;
- struct sockaddr_in my_addr,remote_addr;
- if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
- perror("create socket error");
- exit(1);
- }
- if (setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR
- ,(char *)&flag,sizeof(flag)) == -1){
- perror("setsockopt error");
- }
- int flags = fcntl(listen_fd, F_GETFL, 0);
- fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(3389);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero), 8);
- if (bind(listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1) {
- perror("bind error");
- exit(1);
- }
- if (listen(listen_fd,1) == -1){
- perror("listen error");
- exit(1);
- }
- int pid=-1;
- int addr_len = sizeof(struct sockaddr_in);
- int max_process_num = 3;
- int child_id;
- int i;
- int child_process_status;
- for(i = 0; i < max_process_num; i++){
- if(pid != 0){
- pid = fork();
- }
- else{
- child_id = i;
- }
- }
- if(pid == 0){
- int accept_handles = 0;
- struct epoll_event ev,events[20];
- int epfd = epoll_create(256);
- int ev_s = 0;
- ev.data.fd = listen_fd;
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
- for(;;){
- ev_s = epoll_wait( epfd,events, 20, 500 );
- int i = 0;
- for(i = 0; i<ev_s; i++){
- if(events[i].data.fd == listen_fd){
- int max_process_accept = 3;
- if(accept_handles < max_process_accept){
- accept_handles++;
- int addr_len = sizeof( struct sockaddr_in );
- accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr, &addr_len );
- int flags = fcntl(accept_fd, F_GETFL, 0);
- fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
- ev.data.fd = accept_fd;
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
- printf("Child:%d,ProcessID:%d,EPOLLIN,fd:%d,accept:%d\n", child_id, getpid(), listen_fd, accept_fd);
- }
- }
- else if(events[i].events&EPOLLIN){
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- int recv_num = recv( events[i].data.fd, &in_buf, 1024, 0 );
- if( recv_num ==0 ){
- close(events[i].data.fd);
- accept_handles--;
- printf("Child:%d,ProcessID:%d,EPOLLIN,fd:%d,closed\n", child_id, getpid(), events[i].data.fd);
- }
- else{
- printf("Child:%d,ProcessID:%d,EPOLLIN,fd:%d,recv:%s\n", child_id, getpid(), events[i].data.fd, in_buf);
- }
- }
- }
- }
- }
- else{
- //manager the process
- wait(&child_process_status);
- }
-
- return 0;
- }
这个问题就是多进程里面的群惊现象 当所有进程都阻塞在listen端口时 一旦系统获得listen端口的状态变化 会把所有的线程都唤醒,而只有一个获取后进行处理 其他的在被阻塞后继续沉睡 不过在Linux 2.4以后这个群惊的现象已经被内核给处理掉了 实际测试下来只有一个进程会被再次唤醒,而其他保持阻塞, 这样阻塞在listen端口其实效率不比阻塞在同享锁差.毕竟调度是在内核级别的.
从头编写高性能服务程序9-多进程非阻塞epoll-prefork-hook
整个基础结构已经基本确定了 接下来做一些细节工作 首先把一些函数抽取出来. 例如prefork独立出来.socket->bind->listen独立出来
这里我们引入一个新的思路 原先由统一的函数在epoll_wait之后对events里面的fd进行处理 但是每个fd可能需要处理的方式都不同. 怎么样针对不同的fd来调用特定的函数呢?
首先在epoll_event结构中有data成员 而data的定义如下
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
可见既可以在events里面放data.fd 也可以使用data.ptr来指向一个指针 当fd有消息时内核将对应的ev变量塞入events数组的时候 如果我们只是用fd来指向注册的,那么获取数据的时候只能得到对应的fd 这样使用什么函数来处理这个fd就需要另行判断
那么如果使用ptr来指向一个结构 而结构内保存了fd以及处理这个fd所使用的函数指针 那当我们得到events数组内的事件时 就可以直接调用ptr指向的函数指针了. 这就类似Nginx中的hook函数. 在Nginx中几乎任何一种事件都会绑定其处理函数 而由模块实现距离的函数,然后在hook上去.
那么下面的代码我们就模拟这个方法: 我们建立一个数据结构来保存每个fd以及对应的处理函数
struct event_handle{ int fd; int (* handle)(int fd); };
handle_hook是我们为每个fd注册的处理函数 当accept获得新的accept_fd之后 我们使用
ev_handles[accept_handles].handle = handle_hook
来将对应的函数注册到对应的events内 在fd得到通知的时候 使用
(*current_handle)(current_fd)
来进行处理
- #include <sys/socket.h>
- #include <sys/wait.h>
- #include <netinet/in.h>
- #include <sys/epoll.h>
- #include <sys/sendfile.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <strings.h>
- #include <fcntl.h>
-
- int create_listen_fd(int port){
- int listen_fd;
- struct sockaddr_in my_addr;
- if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
- perror("create socket error");
- exit(1);
- }
- int flag;
- if (setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR
- ,(char *)&flag,sizeof(flag)) == -1){
- perror("setsockopt error");
- }
- int flags = fcntl(listen_fd, F_GETFL, 0);
- fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons(port);
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero(&(my_addr.sin_zero), 8);
- if (bind(listen_fd, (struct sockaddr *)&my_addr,
- sizeof(struct sockaddr_in)) == -1) {
- perror("bind error");
- exit(1);
- }
- if (listen(listen_fd,1) == -1){
- perror("listen error");
- exit(1);
- }
- return listen_fd;
- }
-
- int create_accept_fd(int listen_fd){
- int addr_len = sizeof( struct sockaddr_in );
- struct sockaddr_in remote_addr;
- int accept_fd = accept( listen_fd,
- (struct sockaddr *)&remote_addr, &addr_len );
- int flags = fcntl(accept_fd, F_GETFL, 0);
- fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
- return accept_fd;
- }
-
- int fork_process(int process_num){
- int i;
- int pid=-1;
- for(i = 0; i < process_num; i++){
- if(pid != 0){
- pid = fork();
- }
- }
- return pid;
- }
-
- int handle_normal(int socket_fd){
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- int recv_num = recv( socket_fd, &in_buf, 1024, 0 );
- if( recv_num ==0 ){
- close(socket_fd);
- printf("ProcessID:%d,EPOLLIN,fd:%d,closed\n", getpid(), socket_fd);
- }
- else{
- printf("ProcessID:%d,EPOLLIN,fd:%d,recv:%s\n", getpid(), socket_fd, in_buf);
- }
- return recv_num;
- }
-
- int handle_hook(int socket_fd){
- char in_buf[1024];
- memset(in_buf, 0, 1024);
- int recv_num = recv( socket_fd, &in_buf, 1024, 0 );
- if( recv_num ==0 ){
- close(socket_fd);
- printf("ProcessID:%d,EPOLLIN,fd:%d,closed\n", getpid(), socket_fd);
- }
- else{
- printf("ProcessID:%d,EPOLLIN,fd:%d,recv_num:%d;recv:", getpid(), socket_fd, recv_num);
- for (int i = 0; i<recv_num; i++){
- printf("%02x ",in_buf[i]);
- }
- printf("\n");
- }
- return recv_num;
- }
-
- struct event_handle{
- int fd;
- int (* handle)(int fd);
- };
- typedef int (* EVENT_HANDLE)(int);
- typedef struct event_handle * EH;
-
- int main(){
- int listen_fd = create_listen_fd(3389);
- int pid = fork_process(3);
- if(pid == 0){
- int accept_handles = 0;
- struct epoll_event ev,events[20];
- int epfd = epoll_create(256);
- int ev_s = 0;
-
- ev.data.fd = listen_fd;
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
- struct event_handle ev_handles[256];
- for(;;){
- ev_s = epoll_wait( epfd,events, 20, 500 );
- int i = 0;
- for(i = 0; i<ev_s; i++){
- if(events[i].data.fd == listen_fd){
- int max_process_accept = 3;
- if(accept_handles < max_process_accept){
- accept_handles++;
- int accept_fd = create_accept_fd(listen_fd);
- ev_handles[accept_handles].fd = accept_fd;
- ev_handles[accept_handles].handle = handle_hook;
- ev.data.ptr = &ev_handles[accept_handles];
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
- printf("ProcessID:%d,EPOLLIN,fd:%d,accept:%d\n", getpid(), listen_fd, accept_fd);
- }
- }
- else if(events[i].events&EPOLLIN){
- EVENT_HANDLE current_handle = ((EH)(events[i].data.ptr))->handle;
- int current_fd = ((EH)(events[i].data.ptr))->fd;
- if( (*current_handle)(current_fd) == 0){
- accept_handles--;
- }
- }
- else if(events[i].events&EPOLLOUT){
- //need add write event process
- }
- }
- }
- }
- else{
- //manager the process
- int child_process_status;
- wait(&child_process_status);
- }
-
- return 0;
- }
从头编写高性能服务程序10-请求解析
最终的形态基本上在上次的结构中就定型了 当然有些细节需要完善 不过基本上用这个结构来写service已经是OK了 那么现在就是继续细化这个结构.用来写个比较靠近实际的应用
有了链接的管理 接下来就是对通讯协议的实现 由于是从头开始写 所以协议也由我们自己来实现 先是对请求的解析 从客户端telnet传送过来的数据 回行是用/r/n结尾的 所以我们不停的接受数据 然后判断数据的最后是否是/r/n 如果是的话.就把它和以前的数据一起拼接起来 然后调用请求分析来解析指令
在event_handle结构中 我们加入了command数组 用来存放每次传输过来的数据 直至遇到以/r/n结尾的数据.然后拼接起来,输出,再清空这个数组 从头再接受新的指令
由于使用了epoll和非阻塞accept_fd 所以每次接受到的数据是零散的 需要将每次recv的数据连续的拼接到一个变量中 这就是command数组存在的理由 而command_pos用来保存的是每次拼接后数组的实际存放数据的量 也可以认为是最后一个数据所在数组中的位置 便于下次拼接
- #include <sys/socket.h>
- #include <sys/wait.h>
- #include <netinet/in.h>
- #include <sys/epoll.h>
- #include <sys/sendfile.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <strings.h>
- #include <fcntl.h>
-
- typedef struct event_handle{
- int fd;
- int ( * handle )( struct event_handle * ev );
- char command[1024];
- int command_pos;
- } EV,* EH;
- typedef int ( * EVENT_HANDLE )( struct event_handle * ev );
-
-
- int create_listen_fd( int port ){
- int listen_fd;
- struct sockaddr_in my_addr;
- if ( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){
- perror( "create socket error" );
- exit( 1 );
- }
- int flag;
- if ( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR
- , ( char * )&flag, sizeof( flag ) ) == -1 ){
- perror( "setsockopt error" );
- }
- int flags = fcntl( listen_fd, F_GETFL, 0 );
- fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK );
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons( port );
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero( &( my_addr.sin_zero ), 8 );
- if ( bind( listen_fd, ( struct sockaddr * )&my_addr,
- sizeof( struct sockaddr_in ) ) == -1 ) {
- perror( "bind error" );
- exit( 1 );
- }
- if ( listen( listen_fd, 1 ) == -1 ){
- perror( "listen error" );
- exit( 1 );
- }
- return listen_fd;
- }
-
- int create_accept_fd( int listen_fd ){
- int addr_len = sizeof( struct sockaddr_in );
- struct sockaddr_in remote_addr;
- int accept_fd = accept( listen_fd,
- ( struct sockaddr * )&remote_addr, &addr_len );
- int flags = fcntl( accept_fd, F_GETFL, 0 );
- fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK );
- return accept_fd;
- }
-
- int fork_process( int process_num ){
- int i;
- int pid=-1;
- for( i = 0; i < process_num; i++ ){
- if( pid != 0 ){
- pid = fork();
- }
- }
- return pid;
- }
-
- int handle_hook_v2( EH ev ){
- char in_buf[1024];
- memset( in_buf, 0, 1024 );
- int recv_num = recv( ev->fd, &in_buf, 1024, 0 );
- if( recv_num ==0 ){
- printf( "ProcessID:%d, EPOLLIN, fd:%d, closed\n", getpid(), ev->fd );
- printf( " recved:%s\n", ev->command );
- close( ev->fd );
- }
- else{
- printf( "ProcessID:%d, EPOLLIN, fd:%d, recv_num:%d;recv:", getpid(), ev->fd, recv_num );
- int i;
- for( i = 0; i<recv_num; i++ ){
- printf( "%02x ", in_buf[i] );
- }
- printf( "\n" );
- memcpy( ev->command + ev->command_pos, in_buf, recv_num );
- ev->command_pos += recv_num;
- if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){
- printf( " recved:%s\n", ev->command );
- memset( ev->command, 0, 1024 );
- ev->command_pos = 0;
- }
- }
- return recv_num;
- }
-
-
-
- int main(){
- int listen_fd = create_listen_fd( 3389 );
- int pid = fork_process( 3 );
- if( pid == 0 ){
- int accept_handles = 0;
- struct epoll_event ev, events[20];
- int epfd = epoll_create( 256 );
- int ev_s = 0;
-
- ev.data.fd = listen_fd;
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev );
- struct event_handle ev_handles[256];
- for( ;; ){
- ev_s = epoll_wait( epfd, events, 20, 500 );
- int i = 0;
- for( i = 0; i<ev_s; i++ ){
- if( events[i].data.fd == listen_fd ){
- int max_process_accept = 3;
- if( accept_handles < max_process_accept ){
- accept_handles++;
- int accept_fd = create_accept_fd( listen_fd );
- ev_handles[accept_handles].fd = accept_fd;
- ev_handles[accept_handles].handle = handle_hook_v2;
- ev_handles[accept_handles].command_pos = 0;
- memset( ev_handles[accept_handles].command, 0, 1024 );
- ev.data.ptr = &ev_handles[accept_handles];
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl( epfd, EPOLL_CTL_ADD, accept_fd, &ev );
- printf( "ProcessID:%d, EPOLLIN, fd:%d, accept:%d\n", getpid(), listen_fd, accept_fd );
- }
- }
- else if( events[i].events&EPOLLIN ){
- EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->handle;
- EH current_event = ( EH )( events[i].data.ptr );
- if( ( *current_handle )( current_event ) == 0 ){
- accept_handles--;
- }
- }
- else if( events[i].events&EPOLLOUT ){
- //need add write event process
- }
- }
- }
- }
- else{
- //manager the process
- int child_process_status;
- wait( &child_process_status );
- }
-
- return 0;
- }
从头编写高性能服务程序11-指令处理&sendfile
实现命令的获取之后 现在是增加对command的解析以及对应的反馈 为了做些稍微有意义的事情.我把这个service做的工作定位在以下内容 1.查询文件大小 2.返回文件内容 3.删除文件
协议的格式如下 请求模式:文件名称 例如 1:new.txt
请求模式见源代码中的宏定义
这次代码也对原来的程序作了一定的修改 从丑陋的代码渐渐修改. 希望在最终程序完成的时候 能够有比较好的代码风格
这个版本中有一个bug 就是sendfile只是调用一次 而实际上如果是较大的文件 需要在判断EPLLOUT之后不停的sendfile 直到EAGAIN遇见accept_fd阻塞为止 这样直至下次EPOLLOUT发生 再从上次暂停的位置继续发送 下一个版本中将会有这个BUG的修正
- #include <sys/socket.h>
- #include <sys/wait.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #include <sys/epoll.h>
- #include <sys/sendfile.h>
- #include <sys/stat.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <strings.h>
- #include <fcntl.h>
-
- #define HANDLE_INFO 1
- #define HANDLE_SEND 2
- #define HANDLE_DEL 3
- #define HANDLE_CLOSE 4
-
- #define MAX_REQLEN 1024
- #define MAX_PROCESS_CONN 3
- #define FIN_CHAR 0x00
- #define SUCCESS 0
- #define ERROR -1
-
- typedef struct event_handle{
- int socket_fd;
- int file_fd;
- char request[MAX_REQLEN];
- int request_len;
- int ( * handle )( struct event_handle * ev );
- int handle_method;
-
- } EV,* EH;
- typedef int ( * EVENT_HANDLE )( struct event_handle * ev );
-
- int create_listen_fd( int port ){
- int listen_fd;
- struct sockaddr_in my_addr;
- if( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){
- perror( "create socket error" );
- exit( 1 );
- }
- int flag;
- int olen = sizeof(int);
- if( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR
- , (const void *)&flag, olen ) == -1 ){
- perror( "setsockopt error" );
- }
- flag = 1;
- if( setsockopt( listen_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &flag, olen ) == -1 ){
- perror( "setsockopt error" );
- }
- int flags = fcntl( listen_fd, F_GETFL, 0 );
- fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK );
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons( port );
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero( &( my_addr.sin_zero ), 8 );
- if( bind( listen_fd, ( struct sockaddr * )&my_addr,
- sizeof( struct sockaddr_in ) ) == -1 ) {
- perror( "bind error" );
- exit( 1 );
- }
- if( listen( listen_fd, 1 ) == -1 ){
- perror( "listen error" );
- exit( 1 );
- }
- return listen_fd;
- }
-
- int create_accept_fd( int listen_fd ){
- int addr_len = sizeof( struct sockaddr_in );
- struct sockaddr_in remote_addr;
- int accept_fd = accept( listen_fd,
- ( struct sockaddr * )&remote_addr, &addr_len );
- int flags = fcntl( accept_fd, F_GETFL, 0 );
- fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK );
- return accept_fd;
- }
-
- int fork_process( int process_num ){
- int i;
- int pid=-1;
- for( i = 0; i < process_num; i++ ){
- if( pid != 0 ){
- pid = fork();
- }
- }
- return pid;
- }
-
- int init_evhandle(EH ev,int socket_fd,EVENT_HANDLE handle){
- ev->socket_fd = socket_fd;
- ev->handle = handle;
- ev->request_len = 0;
- ev->handle_method = 0;
- memset( ev->request, 0, 1024 );
- }
- //accept->accept_queue->request->request_queue->output->output_queue
- //multi process sendfile
- int parse_request(EH ev){
- ev->request_len--;
- *( ev->request + ev->request_len - 1 ) = 0x00;
- int i;
- for( i=0; i<ev->request_len; i++ ){
- if( ev->request[i] == ':' ){
- ev->request_len = ev->request_len-i-1;
- char temp[MAX_REQLEN];
- memcpy( temp, ev->request, i );
- ev->handle_method = atoi( temp );
- memcpy( temp, ev->request+i+1, ev->request_len );
- memcpy( ev->request, temp, ev->request_len );
- break;
- }
- }
- handle_request( ev );
- return SUCCESS;
- }
-
- int handle_request(EH ev){
- struct stat file_info;
- switch( ev->handle_method ){
- case HANDLE_INFO:
- ev->file_fd = open( ev->request, O_RDONLY );
- if( ev->file_fd == -1 ){
- send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
- return -1;
- }
- fstat(ev->file_fd, &file_info);
- char info[MAX_REQLEN];
- sprintf(info,"file len:%d\n",file_info.st_size);
- send( ev->socket_fd, info, strlen( info ), 0 );
- break;
- case HANDLE_SEND:
- ev->file_fd = open( ev->request, O_RDONLY );
- if( ev->file_fd == -1 ){
- send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
- return -1;
- }
- fstat(ev->file_fd, &file_info);
- sendfile( ev->socket_fd, ev->file_fd, 0, file_info.st_size );
- break;
- case HANDLE_DEL:
- break;
- case HANDLE_CLOSE:
- break;
- }
- finish_request( ev );
- return SUCCESS;
- }
-
- int finish_request(EH ev){
- close(ev->socket_fd);
- close(ev->file_fd);
- ev->handle_method = -1;
- clean_request( ev );
- return SUCCESS;
- }
-
- int clean_request(EH ev){
- memset( ev->request, 0, MAX_REQLEN );
- ev->request_len = 0;
- }
-
- int handle_hook_v2( EH ev ){
- char in_buf[MAX_REQLEN];
- memset( in_buf, 0, MAX_REQLEN );
- int recv_num = recv( ev->socket_fd, &in_buf, MAX_REQLEN, 0 );
- if( recv_num ==0 ){
- close( ev->socket_fd );
- return ERROR;
- }
- else{
- //check ifoverflow
- if( ev->request_len > MAX_REQLEN-recv_num ){
- close( ev->socket_fd );
- clean_request( ev );
- }
- memcpy( ev->request + ev->request_len, in_buf, recv_num );
- ev->request_len += recv_num;
- if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){
- parse_request(ev);
- }
- }
- return recv_num;
- }
-
- int main(){
- int listen_fd = create_listen_fd( 3389 );
- int pid = fork_process( 3 );
- if( pid == 0 ){
- int accept_handles = 0;
- struct epoll_event ev, events[20];
- int epfd = epoll_create( 256 );
- int ev_s = 0;
-
- ev.data.fd = listen_fd;
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev );
- struct event_handle ev_handles[256];
- for( ;; ){
- ev_s = epoll_wait( epfd, events, 20, 500 );
- int i = 0;
- for( i = 0; i<ev_s; i++ ){
- if( events[i].data.fd == listen_fd ){
- if( accept_handles < MAX_PROCESS_CONN ){
- accept_handles++;
- int accept_fd = create_accept_fd( listen_fd );
- init_evhandle(&ev_handles[accept_handles],accept_fd,handle_hook_v2);
- ev.data.ptr = &ev_handles[accept_handles];
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl( epfd, EPOLL_CTL_ADD, accept_fd, &ev );
- }
- }
- else if( events[i].events&EPOLLIN ){
- EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->handle;
- EH current_event = ( EH )( events[i].data.ptr );
- if( ( *current_handle )( current_event ) == 0 ){
- accept_handles--;
- }
- }
- else if( events[i].events&EPOLLOUT ){
- //need add write event process
- }
- }
- }
- }
- else{
- //manager the process
- int child_process_status;
- wait( &child_process_status );
- }
-
- return SUCCESS;
- }
从头编写高性能服务程序12-区分读写事件
前一个版本很重要的BUG 就是sendfile在发送大文件的时候会发送不完整 这个bug指出了另一需求 就是需要用到EPOLLOUT事件
前面版本我们事件处理都是在EPOLLIN中进行 当有accept_fd数据进来之后 我们判断指令的内容 再直接进行数据的处理
我们更换一种方式 当accept_fd获取到数据之后 解析数据. 当需要输出的时候,将accept_fd的事件变为EPOLLOUT
这样一种fd会有两种状态 接受指令状态以及输出数据状态 慢慢的.我们会发现这个程序越来越像Nginx或者Lighttpd了 因为一个连接会有不同的状态 事件+状态机就是Nginx以及Lighttpd高效的原因 将一个链接分成不同的生命周期然后处理
经过进化 我们的event_handle结构拥有了读写两种hook钩子
typedef struct event_handle{ ... int ( * read_handle )( struct event_handle * ev ); int ( * write_handle )( struct event_handle * ev ); ... }
而我们在针对的处理过程中 初始化时会针对不同的事件挂上不同的钩子函数
int init_evhandle(...){ ... ev->read_handle = r_handle; ev->write_handle = w_handle; ... }
接着在事件发生时调用不同的钩子函数
else if( events[i].events&EPOLLIN ){ EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->read_handle; ... } else if( events[i].events&EPOLLOUT ){ EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle; ... }
当分析完指令的时候 将fd变为EPOLLOUT
int parse_request( ... ev_temp.data.ptr = ev; ev_temp.events = EPOLLOUT|EPOLLET; epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp ); ... }
加入这些处理之后 代码越来越长了 我们还需要加很多东西 比如进程管理,等等 这之后会发现一个mini的nginx骨架或者lighttpd骨架出现了
- #include <sys/socket.h>
- #include <sys/wait.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #include <sys/epoll.h>
- #include <sys/sendfile.h>
- #include <sys/stat.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <strings.h>
- #include <fcntl.h>
- #include <errno.h>
-
- #define HANDLE_INFO 1
- #define HANDLE_SEND 2
- #define HANDLE_DEL 3
- #define HANDLE_CLOSE 4
-
- #define MAX_REQLEN 1024
- #define MAX_PROCESS_CONN 3
- #define FIN_CHAR 0x00
- #define SUCCESS 0
- #define ERROR -1
-
- typedef struct event_handle{
- int socket_fd;
- int file_fd;
- int file_pos;
- int epoll_fd;
- char request[MAX_REQLEN];
- int request_len;
- int ( * read_handle )( struct event_handle * ev );
- int ( * write_handle )( struct event_handle * ev );
- int handle_method;
- } EV,* EH;
- typedef int ( * EVENT_HANDLE )( struct event_handle * ev );
-
- int create_listen_fd( int port ){
- int listen_fd;
- struct sockaddr_in my_addr;
- if( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){
- perror( "create socket error" );
- exit( 1 );
- }
- int flag;
- int olen = sizeof(int);
- if( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR
- , (const void *)&flag, olen ) == -1 ){
- perror( "setsockopt error" );
- }
- flag = 5;
- if( setsockopt( listen_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &flag, olen ) == -1 ){
- perror( "setsockopt error" );
- }
- flag = 1;
- if( setsockopt( listen_fd, IPPROTO_TCP, TCP_CORK, &flag, olen ) == -1 ){
- perror( "setsockopt error" );
- }
- int flags = fcntl( listen_fd, F_GETFL, 0 );
- fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK );
- my_addr.sin_family = AF_INET;
- my_addr.sin_port = htons( port );
- my_addr.sin_addr.s_addr = INADDR_ANY;
- bzero( &( my_addr.sin_zero ), 8 );
- if( bind( listen_fd, ( struct sockaddr * )&my_addr,
- sizeof( struct sockaddr_in ) ) == -1 ) {
- perror( "bind error" );
- exit( 1 );
- }
- if( listen( listen_fd, 1 ) == -1 ){
- perror( "listen error" );
- exit( 1 );
- }
- return listen_fd;
- }
-
- int create_accept_fd( int listen_fd ){
- int addr_len = sizeof( struct sockaddr_in );
- struct sockaddr_in remote_addr;
- int accept_fd = accept( listen_fd,
- ( struct sockaddr * )&remote_addr, &addr_len );
- int flags = fcntl( accept_fd, F_GETFL, 0 );
- fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK );
- return accept_fd;
- }
-
- int fork_process( int process_num ){
- int i;
- int pid=-1;
- for( i = 0; i < process_num; i++ ){
- if( pid != 0 ){
- pid = fork();
- }
- }
- return pid;
- }
-
- int init_evhandle(EH ev,int socket_fd,int epoll_fd,EVENT_HANDLE r_handle,EVENT_HANDLE w_handle){
- ev->epoll_fd = epoll_fd;
- ev->socket_fd = socket_fd;
- ev->read_handle = r_handle;
- ev->write_handle = w_handle;
- ev->file_pos = 0;
- ev->request_len = 0;
- ev->handle_method = 0;
- memset( ev->request, 0, 1024 );
- }
- //accept->accept_queue->request->request_queue->output->output_queue
- //multi process sendfile
- int parse_request(EH ev){
- ev->request_len--;
- *( ev->request + ev->request_len - 1 ) = 0x00;
- int i;
- for( i=0; i<ev->request_len; i++ ){
- if( ev->request[i] == ':' ){
- ev->request_len = ev->request_len-i-1;
- char temp[MAX_REQLEN];
- memcpy( temp, ev->request, i );
- ev->handle_method = atoi( temp );
- memcpy( temp, ev->request+i+1, ev->request_len );
- memcpy( ev->request, temp, ev->request_len );
- break;
- }
- }
- //handle_request( ev );
- //register to epoll EPOLLOUT
-
- struct epoll_event ev_temp;
- ev_temp.data.ptr = ev;
- ev_temp.events = EPOLLOUT|EPOLLET;
- epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp );
- return SUCCESS;
- }
-
- int handle_request(EH ev){
- struct stat file_info;
- switch( ev->handle_method ){
- case HANDLE_INFO:
- ev->file_fd = open( ev->request, O_RDONLY );
- if( ev->file_fd == -1 ){
- send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
- return -1;
- }
- fstat(ev->file_fd, &file_info);
- char info[MAX_REQLEN];
- sprintf(info,"file len:%d\n",file_info.st_size);
- send( ev->socket_fd, info, strlen( info ), 0 );
- break;
- case HANDLE_SEND:
- ev->file_fd = open( ev->request, O_RDONLY );
- if( ev->file_fd == -1 ){
- send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
- return -1;
- }
- fstat(ev->file_fd, &file_info);
- sendfile( ev->socket_fd, ev->file_fd, 0, file_info.st_size );
- break;
- case HANDLE_DEL:
- break;
- case HANDLE_CLOSE:
- break;
- }
- finish_request( ev );
- return SUCCESS;
- }
-
- int finish_request(EH ev){
- close(ev->socket_fd);
- close(ev->file_fd);
- ev->handle_method = -1;
- clean_request( ev );
- return SUCCESS;
- }
-
- int clean_request(EH ev){
- memset( ev->request, 0, MAX_REQLEN );
- ev->request_len = 0;
- }
-
- int read_hook_v2( EH ev ){
- char in_buf[MAX_REQLEN];
- memset( in_buf, 0, MAX_REQLEN );
- int recv_num = recv( ev->socket_fd, &in_buf, MAX_REQLEN, 0 );
- if( recv_num ==0 ){
- close( ev->socket_fd );
- return ERROR;
- }
- else{
- //check ifoverflow
- if( ev->request_len > MAX_REQLEN-recv_num ){
- close( ev->socket_fd );
- clean_request( ev );
- }
- memcpy( ev->request + ev->request_len, in_buf, recv_num );
- ev->request_len += recv_num;
- if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){
- parse_request(ev);
- }
- }
- return recv_num;
- }
-
- int write_hook_v1( EH ev ){
- struct stat file_info;
- ev->file_fd = open( ev->request, O_RDONLY );
- if( ev->file_fd == ERROR ){
- send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
- return ERROR;
- }
- fstat(ev->file_fd, &file_info);
- int write_num;
- while(1){
- write_num = sendfile( ev->socket_fd, ev->file_fd, (off_t *)&ev->file_pos, 10240 );
- ev->file_pos += write_num;
- if( write_num == ERROR ){
- if( errno == EAGAIN ){
- break;
- }
- }
- else if( write_num == 0 ){
- printf( "writed:%d\n", ev->file_pos );
- //finish_request( ev );
- break;
- }
- }
- return SUCCESS;
- }
-
- int main(){
- int listen_fd = create_listen_fd( 3389 );
- int pid = fork_process( 3 );
- if( pid == 0 ){
- int accept_handles = 0;
- struct epoll_event ev, events[20];
- int epfd = epoll_create( 256 );
- int ev_s = 0;
-
- ev.data.fd = listen_fd;
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev );
- struct event_handle ev_handles[256];
- for( ;; ){
- ev_s = epoll_wait( epfd, events, 20, 500 );
- int i = 0;
- for( i = 0; i<ev_s; i++ ){
- if( events[i].data.fd == listen_fd ){
- if( accept_handles < MAX_PROCESS_CONN ){
- accept_handles++;
- int accept_fd = create_accept_fd( listen_fd );
- init_evhandle(&ev_handles[accept_handles],accept_fd,epfd,read_hook_v2,write_hook_v1);
- ev.data.ptr = &ev_handles[accept_handles];
- ev.events = EPOLLIN|EPOLLET;
- epoll_ctl( epfd, EPOLL_CTL_ADD, accept_fd, &ev );
- }
- }
- else if( events[i].events&EPOLLIN ){
- EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->read_handle;
- EH current_event = ( EH )( events[i].data.ptr );
- ( *current_handle )( current_event );
- }
- else if( events[i].events&EPOLLOUT ){
- EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle;
- EH current_event = ( EH )( events[i].data.ptr );
- if( ( *current_handle )( current_event ) == 0 ){
- accept_handles--;
- }
- }
- }
- }
- }
- else{
- //manager the process
- int child_process_status;
- wait( &child_process_status );
- }
-
- return SUCCESS;
- }
|