随笔 - 298  文章 - 377  trackbacks - 0
<2007年8月>
2930311234
567891011
12131415161718
19202122232425
2627282930311
2345678

常用链接

留言簿(34)

随笔分类

随笔档案

文章档案

相册

收藏夹

搜索

  •  

最新评论

阅读排行榜

评论排行榜

最近要做一个网络方面的小东东,基于C/S模式的。都说IOCP可以使系统达到最佳的性能,因此我就比划了两下,献丑了。抄书开始。
    从本质上说,完成端口模型要求创建一个windows完成端口对象,该对象通过指定数量的线程,对重叠I/O请求进行管理,以便为已经完成的重叠I/O请求提供服务。
    首先要创建一个I/O完成端口对象,用它面向任意数量的套接字句柄,管理多个I/O请求。调用以下函数创建完成端口对象:

HANDLE CreateIoCompletionPort(
  HANDLE FileHandle
,// 同IOCP关联在一起的套接字句柄
  HANDLE ExistingCompletionPort
,// IOCP句柄
  ULONG_PTR CompletionKey
,        // 完成健
  DWORD NumberOfConcurrentThreads // 在IOCP上,同时允许执行的线程数量
);

    该函数有两个作用:
    (1)创建一个完成端口对象
    (2)将一个句柄同完成端口关联到一起
    
    然后就要创建一定数量的工作者线程,以便在套接字的I/O请求投递给完成端口后,为完成端口提供服务。写文字描述很烦,还是看代码吧:

// NetServer3.cpp : Defines the entry point for the console application.
//

#include 
"stdafx.h"
#include 
"NetServer3.h"

#include 
<winsock2.h>
#pragma comment(lib, 
"ws2_32.lib")

#include 
<iostream>
using namespace std;

//////////////////////////////////////////////////////////////////////////

#ifdef _DEBUG
#define new DEBUG_NEW
#undef THIS_FILE
static char THIS_FILE[] = __FILE__;
#endif

//////////////////////////////////////////////////////////////////////////

// 单句柄数据
typedef struct tagPER_HANDLE_DATA
{
    SOCKET Socket;
    SOCKADDR_STORAGE ClientAddr;
    
// 将和这个句柄关联的其他有用信息,尽管放在这里面吧
}
PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

// 但I/O操作数据
typedef struct tagPER_IO_DATA
{
    OVERLAPPED Overlapped;
    WSABUF DataBuf;
    
char buffer[1024];
    
int BufferLen;
    
int OperationType;   // 可以作为读写的标志,为简单,我忽略了
}
PER_IO_DATA, *LPPER_IO_DATA;

DWORD WINAPI ServerWorkerThread(LPVOID lpParam);

/////////////////////////////////////////////////////////////////////////////
// The one and only application object

CWinApp theApp;

using namespace std;

int _tmain(int argc, TCHAR* argv[], TCHAR* envp[])
{
    
int nRetCode = 0;

    
// initialize MFC and print and error on failure
    if (!AfxWinInit(::GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0))
    
{
        
// TODO: change error code to suit your needs
        cerr << _T("Fatal Error: MFC initialization failed"<< endl;
        nRetCode 
= 1;
    }

    
else
    
{
        
// TODO: code your application's behavior here.
        CString strHello;
        strHello.LoadString(IDS_HELLO);
        cout 
<< (LPCTSTR)strHello << endl;
    }


//////////////////////////////////////////////////////////////////////////

    HANDLE CompletionPort;
    WSADATA wsd;
    SYSTEM_INFO SystemInfo;
    SOCKADDR_IN InternetAddr;
    SOCKET Listen;

    
// 加载WinSock2.2
    WSAStartup(MAKEWORD(22), &wsd);

    
// 1.创建一个I/O完成端口
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                            NULL,
                                            
0,
                                            
0);

    
// 2.确定系统中有多少个处理器
    GetSystemInfo(&SystemInfo);

    
// 3.基于系统中可用的处理器数量创建工作器线程
    for (int i = 0; i < SystemInfo.dwNumberOfProcessors; ++i)
    
{
        HANDLE ThreadHandle;

        
// 创建一个服务器的工作器线程,并将完成端口传递到该线程
        ThreadHandle = CreateThread(NULL,
                                    
0,
                                    ServerWorkerThread,
                                    CompletionPort,
                                    
0,
                                    NULL);

        CloseHandle(ThreadHandle);
    }


    
// 4.创建一个监听套接字,以下的套路都是固定的。
    Listen = WSASocket(AF_INET,
                       SOCK_STREAM,
                       
0,
                       NULL,
                       
0,
                       WSA_FLAG_OVERLAPPED);

    InternetAddr.sin_family 
= PF_INET;
    InternetAddr.sin_port 
= htons(5000);
    InternetAddr.sin_addr.s_addr 
= htonl(INADDR_ANY);

    bind(Listen, (SOCKADDR
*)&InternetAddr, sizeof(InternetAddr));

    listen(Listen, 
5);

    BOOL b 
= TRUE;

    
while (b)
    
{
        PER_HANDLE_DATA 
* PerHandleData = NULL;
        SOCKADDR_IN saRemote;
        SOCKET Accept;
        
int RemoteLen;

        
// 5.接收连接,并分配完成端口,这儿可以用AcceptEx来代替,以创
         // 建可伸缩的Winsock应用程序。

        RemoteLen = sizeof(saRemote);
        Accept 
= accept(Listen, (SOCKADDR*)&saRemote, &RemoteLen);

        
// 6.创建用来和套接字关联的单句柄数据信息结构
        PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, 
                                                       
sizeof(PER_HANDLE_DATA));

        cout 
<< "Socket number " << Accept << " connected" << endl;

        PerHandleData
->Socket = Accept;
        memcpy(
&PerHandleData->ClientAddr, &saRemote, RemoteLen);

        
// 7.将接受套接字和完成端口关联起来
        CreateIoCompletionPort((HANDLE)Accept,
                               CompletionPort,
                               (DWORD)PerHandleData,
                               
0);

        
// 开始在接受套接字上处理I/O
        
// 使用重叠I/O机制,在新建的套接字上投递一个或多个异步
         // WSARecv 或 WSASend请求。这些I/O请求完成后,工作者线程
         // 会为I/O请求提供服务,之后就可以坐享其成了

        static int const DATA_BUFSIZE = 4096; //

        DWORD RecvBytes 
= 0;
        DWORD Flags 
= 0;

        
// 单I/O操作数据
        LPPER_IO_DATA PerIoData = NULL;
        PerIoData 
= (LPPER_IO_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_DATA));
        ZeroMemory(
&(PerIoData->Overlapped), sizeof(OVERLAPPED));        

        PerIoData
->DataBuf.len = 1024;
        PerIoData
->DataBuf.buf = PerIoData->buffer;
        PerIoData
->OperationType = 0// read
        WSARecv(PerHandleData->Socket,
                
&(PerIoData->DataBuf),
                
1,
                
&RecvBytes,
                
&Flags,
                
&(PerIoData->Overlapped),
                NULL);
    }


//////////////////////////////////////////////////////////////////////////

    
return nRetCode;
}


//////////////////////////////////////////////////////////////////////////

DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort 
= (HANDLE)lpParam;
    DWORD BytesTransferred;
    LPOVERLAPPED lpOverlapped;
    LPPER_HANDLE_DATA PerHandleData 
= NULL;
    LPPER_IO_DATA PerIoData 
= NULL;
    DWORD SendBytes;
    DWORD RecvBytes;
    DWORD Flags;
    BOOL bRet 
= FALSE;

    
while (TRUE)
    
{
        bRet 
= GetQueuedCompletionStatus(CompletionPort,
                                         
&BytesTransferred,
                                         (PULONG_PTR)
  
                                           &
PerHandleData,
                                         (LPOVERLAPPED
*)
                                           &lpOverlapped,
                                         INFINITE);

        
// 检查成功的返回,这儿要注意使用这个宏CONTAINING_RECORD
        PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(lpOverlapped, 
                                                     PER_IO_DATA, 
                                                     Overlapped);

        
// 先检查一下,看看是否在套接字上已有错误发生
        if (0 == BytesTransferred) 
        
{
            closesocket(PerHandleData
->Socket);
            GlobalFree(PerHandleData);
            GlobalFree(PerIoData);

            
continue;
        }


        
// 数据处理
        
// 成功了!!!这儿就收到了来自客户端的数据
        cout << PerIoData->DataBuf.buf << endl;

        Flags 
= 0;

        
// 为下一个重叠调用建立单I/O操作数据
        ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));

        PerIoData
->DataBuf.len = 1024;
        PerIoData
->DataBuf.buf = PerIoData->buffer;
        PerIoData
->OperationType = 0// read
        WSARecv(PerHandleData->Socket,
                
&(PerIoData->DataBuf),
                
1,
                
&RecvBytes,
                
&Flags,
                
&(PerIoData->Overlapped),
                NULL);
    }


    
return 0;
}


//////////////////////////////////////////////////////////////////////////




   当然为了测试,各种异常处理都没有写,大家不要学我哦。
posted on 2007-08-17 13:57 聂文龙 阅读(6646) 评论(5)  编辑 收藏 引用 所属分类: net work

FeedBack:
# re: 完成端口模型代码 2007-08-17 14:12 聂文龙
一起来分析一下最简单的完成端口代码

#include <winsock2.h>
#include <windows.h>
#include <stdio.h>
#include <assert.h>


#include "Socket5.h"

void main(void)
{
//变量声明
WSADATA wsaData;
DWORD Ret;
HANDLE CompletionPort;
SYSTEM_INFO SystemInfo;
DWORD i;
DWORD ThreadID;
SOCKET Listen;
SOCKADDR_IN InternetAddr;
SOCKET Accept;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
DWORD Flags;
DWORD RecvBytes;


//初始化WinSock2
if ((Ret = WSAStartup(0x0202, &wsaData)) != 0)
{
printf("WSAStartup failed with error %u\n", Ret);
return;
}
//创建完成端口对象
if((CompletionPort=CreateIoCompletionPort(INVALID_HANDLE_value,NULL,0,0))==NULL)
{
printf( "CreateIoCompletionPort failed with error: %u\n", GetLastError());
return;
}
//判断CPU数量
GetSystemInfo(&SystemInfo);
printf("您的机器有%d个CPU\n",SystemInfo.dwNumberOfProcessors);
//为每个CPU建立一个工作线程
for(i=0;i<SystemInfo.dwNumberOfProcessors;i++)
{
HANDLE ThreadHandle;
if((ThreadHandle=CreateThread(NULL,0,ServerWorkerThread,CompletionPort,0,&ThreadID)) == NULL)
{
printf("CreateThread() failed with error %u\n", GetLastError());
return;
}
CloseHandle(ThreadHandle);
}
//创建一个Socket用于监听服务端口
if((Listen=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED))==INVALID_SOCKET)
{
printf("WSASocket() failed with error %d\n", WSAGetLastError());
return;
}
//绑定监听端口
InternetAddr.sin_family=AF_INET;
InternetAddr.sin_addr.s_addr=htonl(INADDR_ANY);
InternetAddr.sin_port = htons(SOCKS_PORT);
if(bind(Listen,(PSOCKADDR)&InternetAddr,sizeof(InternetAddr))==SOCKET_ERROR)
{
printf("bind() failed with error %d\n", WSAGetLastError());
return;
}
//监听
if(listen(Listen,5)==SOCKET_ERROR)
{
printf("listen() failed with error %d\n", WSAGetLastError());
return;
}

printf("Server started at port : %d\n",SOCKS_PORT);

//接受每一个连接并将其关联到完成端口上
while(TRUE)
{
//接受连接
if((Accept=WSAAccept(Listen,NULL,NULL,NULL,0))==SOCKET_ERROR)
{
printf("WSAAccept() failed with error %d\n", WSAGetLastError());
return;
}
printf(" Client Socket number %d connected\n", Accept);

//创建包含接受的Socket信息的单句柄数据结构体
if((PerHandleData=(LPPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA)))==NULL)
{
printf("GlobalAlloc() failed with error %u\n", GetLastError());
return;
}

//将Accept关联到完成端口

if(CreateIoCompletionPort((HANDLE)Accept,CompletionPort,(DWORD)PerHandleData,0)==NULL)
{
printf("CreateIoCompletionPort failed with error %u\n", GetLastError());
return;
}
//创建单I/O操作数据
if((PerIoData=(LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA)))==NULL)
{
printf("GlobalAlloc() failed with error %u\n", GetLastError());
return;
}

PerHandleData->self=Accept;
PerHandleData->isClient=TRUE;
PerHandleData->SelfPerHandleData=PerHandleData;
PerHandleData->SelfPerIoData=PerIoData;

//接收客户端的版本信息
ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
PerIoData->Operation=OP_READ;
PerIoData->DataBuf.len = DATA_BUFSIZE;
PerIoData->DataBuf.buf = PerIoData->Buffer;

Flags = 0;
if((WSARecv(Accept,&(PerIoData->DataBuf),1,&RecvBytes,&Flags,&(PerIoData->Overlapped),NULL))==SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return;
}
}
}
}

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
{
HANDLE CompletionPort=(HANDLE)CompletionPortID;
DWORD BytesTransferred;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
DWORD Flags;
DWORD RecvBytes;

while(TRUE)
{
//使用GetQueuedCompletionStatus查询
if(GetQueuedCompletionStatus(CompletionPort,&BytesTransferred,(LPDWORD)&PerHandleData,(LPOVERLAPPED *)&PerIoData,INFINITE)==0)
{
int iError=GetLastError();
printf("GetQueuedCompletionStatus failed with error %u\n", iError);
if(iError==64)
{
continue;
}
else
{
printf("return 0\n");
return 0;
}
}
if(BytesTransferred==0)
{
//远端断开连接,关闭本机SOCKET
printf("Closing ");
if(PerHandleData->isClient==TRUE)
printf("Client");
else
printf("Dest");
printf(" socket %u\n", PerHandleData->self);

closesocket(PerHandleData->self);

//在此关闭和此SOCKET相关联的SOCKET
printf("Closing ");
if(PerHandleData->isClient==TRUE)
printf("Dest");
else
printf("Client");
printf(" socket %u\n", PerHandleData->other);
closesocket(PerHandleData->other);

GlobalFree(PerHandleData->OtherPerHandleData->SelfPerIoData);
GlobalFree(PerHandleData->OtherPerHandleData);

GlobalFree(PerHandleData);
GlobalFree(PerIoData);
continue;
}

switch(PerIoData->Operation)
{
case OP_READ:

WSARecv(PerHandleData->self,&PerHandleData->SelfPerIoData->DataBuf ,1,&RecvBytes,&Flags,&(PerHandleData->SelfPerIoData->Overlapped),NULL);
printf("%s\n",PerHandleData->SelfPerIoData->DataBuf.buf); //显示收到的数据
break;

case OP_WRITE:
printf("write...");
break;

case OP_ACCEPT:
printf("accept...");
break;
}//end of switch

}//end of while
return 0;
}//end of function

为什么我客户连接好以后 只能显示第一次发送的数据。。。以后发送数据就显示不出来了????

  回复  更多评论
  
# re: 完成端口模型代码 2007-08-17 14:13 聂文龙
//采用完成端口的代理服务器原型代码
http://www.vckbase.com/code/listcode.asp?mclsid=9&sclsid=901

---------------------------------------------------------------

http://www.vctop.com/View.Asp?ID=484&CateID=1
---------------------------------------------------------------

服务器程序:
http://www.cnxbb.com/bcb/xbb_server_iocp.rar

模拟多客户端程序
http://www.cnxbb.com/bcb/EchoClient.rar
---------------------------------------------------------------

// Compile:
//
// cl -o callback callback.cpp ws2_32.lib
//
// Command Line Options:
//
// callback.exe
//
// Note: There are no command line options for this sample.

//
// 阻塞模式+重叠模型+完成例程机制,王天平,2003-06-20
//

#include <winsock2.h>
#include <windows.h>
#include <stdio.h>

#define PORT 5150
#define DATA_BUFSIZE 8192

//套接字信息数组单元结构
typedef struct _SOCKET_INFORMATION {
OVERLAPPED Overlapped;//重叠结构
SOCKET Socket;//套接字
CHAR Buffer[DATA_BUFSIZE];//WSARecv/WSASend 数据缓冲区指针
WSABUF DataBuf;//WSARecv/WSASend 数据缓冲区
DWORD BytesSEND;//发送字节数
DWORD BytesRECV;//接收字节数
} SOCKET_INFORMATION, * LPSOCKET_INFORMATION;

void CALLBACK WorkerRoutine(DWORD Error, DWORD BytesTransferred,
LPWSAOVERLAPPED Overlapped, DWORD InFlags);

DWORD WINAPI WorkerThread(LPVOID lpParameter);

SOCKET AcceptSocket;

void main(void)
{
WSADATA wsaData;
SOCKET ListenSocket;
SOCKADDR_IN InternetAddr;
INT Ret;
HANDLE ThreadHandle;
DWORD ThreadId;
WSAEVENT AcceptEvent;

if ((Ret = WSAStartup(0x0202,&wsaData)) != 0)
{
printf("WSAStartup failed with error %d\n", Ret);
WSACleanup();
return;
}

if ((ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)
{
printf("Failed to get a socket %d\n", WSAGetLastError());
return;
}

InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(PORT);

if (bind(ListenSocket, (PSOCKADDR) &InternetAddr,
sizeof(InternetAddr)) == SOCKET_ERROR)
{
printf("bind() failed with error %d\n", WSAGetLastError());
return;
}

if (listen(ListenSocket, 5))
{
printf("listen() failed with error %d\n", WSAGetLastError());
return;
}

if ((AcceptEvent = WSACreateEvent()) == WSA_INVALID_EVENT)
{
printf("WSACreateEvent() failed with error %d\n", WSAGetLastError());
return;
}

// Create a worker thread to service completed I/O requests.

if ((ThreadHandle = CreateThread(NULL, 0, WorkerThread, (LPVOID) AcceptEvent, 0, &ThreadId)) == NULL)
{
printf("CreateThread failed with error %d\n", GetLastError());
return;
}

while(TRUE)
{
AcceptSocket = accept(ListenSocket, NULL, NULL);

if (WSASetEvent(AcceptEvent) == FALSE)
{
printf("WSASetEvent failed with error %d\n", WSAGetLastError());
return;
}
}
}

DWORD WINAPI WorkerThread(LPVOID lpParameter)
{
DWORD Flags;
LPSOCKET_INFORMATION SocketInfo;
WSAEVENT EventArray[1];
DWORD Index;
DWORD RecvBytes;

// Save the accept event in the event array.

EventArray[0] = (WSAEVENT) lpParameter;

while(TRUE)
{
// Wait for accept() to signal an event and also process WorkerRoutine() returns.

while(TRUE)
{
Index = WSAWaitForMultipleEvents(1, EventArray, FALSE, WSA_INFINITE, TRUE);

if (Index == WSA_WAIT_FAILED)
{
printf("WSAWaitForMultipleEvents failed with error %d\n", WSAGetLastError());
return FALSE;
}

if (Index != WAIT_IO_COMPLETION)
{
// An accept() call event is ready - break the wait loop
break;
}
}

WSAResetEvent(EventArray[Index - WSA_WAIT_EVENT_0]);

// Create a socket information structure to associate with the accepted socket.

if ((SocketInfo = (LPSOCKET_INFORMATION) GlobalAlloc(GPTR,
sizeof(SOCKET_INFORMATION))) == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return FALSE;
}

// Fill in the details of our accepted socket.

SocketInfo->Socket = AcceptSocket;
ZeroMemory(&(SocketInfo->Overlapped), sizeof(WSAOVERLAPPED));
SocketInfo->BytesSEND = 0;
SocketInfo->BytesRECV = 0;
SocketInfo->DataBuf.len = DATA_BUFSIZE;
SocketInfo->DataBuf.buf = SocketInfo->Buffer;

Flags = 0;
if (WSARecv(SocketInfo->Socket, &(SocketInfo->DataBuf), 1, &RecvBytes, &Flags,
&(SocketInfo->Overlapped), WorkerRoutine) == SOCKET_ERROR)
{
if (WSAGetLastError() != WSA_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return FALSE;
}
}

printf("Socket %d connected\n", AcceptSocket);
}

return TRUE;
}

void CALLBACK WorkerRoutine(DWORD Error, DWORD BytesTransferred,
LPWSAOVERLAPPED Overlapped, DWORD InFlags)
{
DWORD SendBytes, RecvBytes;
DWORD Flags;

// Reference the WSAOVERLAPPED structure as a SOCKET_INFORMATION structure
LPSOCKET_INFORMATION SI = (LPSOCKET_INFORMATION) Overlapped;

if (Error != 0)
{
printf("I/O operation failed with error %d\n", Error);
}

if (BytesTransferred == 0)
{
printf("Closing socket %d\n", SI->Socket);
}

if (Error != 0 ¦ ¦ BytesTransferred == 0)
{
closesocket(SI->Socket);
GlobalFree(SI);
return;
}

// Check to see if the BytesRECV field equals zero. If this is so, then
// this means a WSARecv call just completed so update the BytesRECV field
// with the BytesTransferred value from the completed WSARecv() call.

if (SI->BytesRECV == 0)
{
SI->BytesRECV = BytesTransferred;
SI->BytesSEND = 0;
}
else
{
SI->BytesSEND += BytesTransferred;
}

if (SI->BytesRECV > SI->BytesSEND)
{

// Post another WSASend() request.
// Since WSASend() is not gauranteed to send all of the bytes requested,
// continue posting WSASend() calls until all received bytes are sent.

ZeroMemory(&(SI->Overlapped), sizeof(WSAOVERLAPPED));

SI->DataBuf.buf = SI->Buffer + SI->BytesSEND;
SI->DataBuf.len = SI->BytesRECV - SI->BytesSEND;

if (WSASend(SI->Socket, &(SI->DataBuf), 1, &SendBytes, 0,
&(SI->Overlapped), WorkerRoutine) == SOCKET_ERROR)
{
if (WSAGetLastError() != WSA_IO_PENDING)
{
printf("WSASend() failed with error %d\n", WSAGetLastError());
return;
}
}
}
else
{
SI->BytesRECV = 0;

// Now that there are no more bytes to send post another WSARecv() request.

Flags = 0;
ZeroMemory(&(SI->Overlapped), sizeof(WSAOVERLAPPED));

SI->DataBuf.len = DATA_BUFSIZE;
SI->DataBuf.buf = SI->Buffer;

if (WSARecv(SI->Socket, &(SI->DataBuf), 1, &RecvBytes, &Flags,
&(SI->Overlapped), WorkerRoutine) == SOCKET_ERROR)
{
if (WSAGetLastError() != WSA_IO_PENDING )
{
printf("WSARecv() failed with error %d\n", WSAGetLastError());
return;
}
}
}
}

  回复  更多评论
  
# re: 完成端口模型代码 2007-09-24 22:02 聂文龙
众所皆知,完成端口是在WINDOWS平台下效率最高,扩展性最好的IO模型,特别针对于WINSOCK的海量连接时,更能显示出其威力。其实建立一个完成端口的服务器也很简单,只要注意几个函数,了解一下关键的步骤也就行了。

这是篇完成端口入门级的文章,分为以下几步来说明完成端口:

函数
常见问题以及解答
步骤
例程
1、函数:

我们在完成端口模型下会使用到的最重要的两个函数是:
CreateIoCompletionPort、GetQueuedCompletionStatus

CreateIoCompletionPort 的作用是创建一个完成端口和把一个IO句柄和完成端口关联起来:

// 创建完成端口
HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

// 把一个IO句柄和完成端口关联起来,这里的句柄是一个socket 句柄
CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)PerHandleData, 0);

其中第一个参数是句柄,可以是文件句柄、SOCKET句柄。
第二个就是我们上面创建出来的完成端口,这里就把两个东西关联在一起了。
第三个参数很关键,叫做PerHandleData,就是对应于每个句柄的数据块。我们可以使用这个参数在后面取到与这个SOCKET对应的数据。
最后一个参数给0,意思就是根据CPU的个数,允许尽可能多的线程并发执行。

GetQueuedCompletionStatus 的作用就是取得完成端口的结果:

// 从完成端口中取得结果
GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE)

第一个参数是完成端口
第二个参数是表明这次的操作传递了多少个字节的数据
第三个参数是OUT类型的参数,就是前面CreateIoCompletionPort传进去的单句柄数据,这里就是前面的SOCKET句柄以及与之相对应的数据,这里操作系统给我们返回,让我们不用自己去做列表查询等操作了。
第四个参数就是进行IO操作的结果,是我们在投递 WSARecv / WSASend 等操作时传递进去的,这里操作系统做好准备后,给我们返回了。非常省事!!

个人感觉完成端口就是操作系统为我们包装了很多重叠IO的不爽的地方,让我们可以更方便的去使用,下篇我将会尝试去讲述完成端口的原理。

2、常见问题和解答

a、什么是单句柄数据(PerHandle)和单IO数据(PerIO)

单句柄数据就是和句柄对应的数据,像socket句柄,文件句柄这种东西。

单IO数据,就是对应于每次的IO操作的数据。例如每次的WSARecv/WSASend等等

其实我觉得PER是每次的意思,翻译成每个句柄数据和每次IO数据还比较清晰一点。

在完成端口中,单句柄数据直接通过GetQueuedCompletionStatus 返回,省去了我们自己做容器去管理。单IO数据也容许我们自己扩展OVERLAPPED结构,所以,在这里所有与应用逻辑有关的东西都可以在此扩展。

b、如何判断客户端的断开

我们要处理几种情况

1) 如果客户端调用了closesocket,我们就可以这样判断他的断开:

if(0 == GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, 。。。)
{
}
if(BytesTransferred == 0)
{
// 客户端断开,释放资源
}

2) 如果是客户端直接退出,那就会出现64错误,指定的网络名不可再用。这种情况我们也要处理的:

if(0 == GetQueuedCompletionStatus(。。。))
{
if( (GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
{
// 客户端断开,释放资源
}
}

3、步骤

编写完成端口服务程序,无非就是以下几个步骤:

1、创建一个完成端口
2、根据CPU个数创建工作者线程,把完成端口传进去线程里
3、创建侦听SOCKET,把SOCKET和完成端口关联起来
4、创建PerIOData,向连接进来的SOCKET投递WSARecv操作

5、线程里所做的事情:
a、GetQueuedCompletionStatus,在退出的时候就可以使用PostQueudCompletionStatus使线程退出
b、取得数据并处理

4、例程

下面是服务端的例程,可以使用《WinSocket模型的探讨——Overlapped模型(一)》中的客户端程序来测试次服务端。稍微研究一下,也就会对完成端口模型有个大概的了解了。

/*

完成端口服务器

接收到客户端的信息,直接显示出来

*/

#include "winerror.h"
#include "Winsock2.h"
#pragma comment(lib, "ws2_32")

#include "windows.h"


#include <iostream>
using namespace std;


/// 宏定义
#define PORT 5050
#define DATA_BUFSIZE 8192

#define OutErr(a) cout << (a) << endl \
<< "出错代码:" << WSAGetLastError() << endl \
<< "出错文件:" << __FILE__ << endl \
<< "出错行数:" << __LINE__ << endl \

#define OutMsg(a) cout << (a) << endl;


/// 全局函数定义


///////////////////////////////////////////////////////////////////////
//
// 函数名 : InitWinsock
// 功能描述 : 初始化WINSOCK
// 返回值 : void
//
///////////////////////////////////////////////////////////////////////
void InitWinsock()
{
// 初始化WINSOCK
WSADATA wsd;
if( WSAStartup(MAKEWORD(2, 2), &wsd) != 0)
{
OutErr("WSAStartup()");
}
}

///////////////////////////////////////////////////////////////////////
//
// 函数名 : BindServerOverlapped
// 功能描述 : 绑定端口,并返回一个 Overlapped 的Listen Socket
// 参数 : int nPort
// 返回值 : SOCKET
//
///////////////////////////////////////////////////////////////////////
SOCKET BindServerOverlapped(int nPort)
{
// 创建socket
SOCKET sServer = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

// 绑定端口
struct sockaddr_in servAddr;
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(nPort);
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);

if(bind(sServer, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
{
OutErr("bind Failed!");
return NULL;
}

// 设置监听队列为200
if(listen(sServer, 200) != 0)
{
OutErr("listen Failed!");
return NULL;
}
return sServer;
}


/// 结构体定义
typedef struct
{
OVERLAPPED Overlapped;
WSABUF DataBuf;
CHAR Buffer[DATA_BUFSIZE];
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;


typedef struct
{
SOCKET Socket;
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;


DWORD WINAPI ProcessIO(LPVOID lpParam)
{
HANDLE CompletionPort = (HANDLE)lpParam;
DWORD BytesTransferred;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;

while(true)
{

if(0 == GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE))
{
if( (GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
{
cout << "closing socket" << PerHandleData->Socket << endl;

closesocket(PerHandleData->Socket);

delete PerIoData;
delete PerHandleData;
continue;
}
else
{
OutErr("GetQueuedCompletionStatus failed!");
}
return 0;
}

// 说明客户端已经退出
if(BytesTransferred == 0)
{
cout << "closing socket" << PerHandleData->Socket << endl;
closesocket(PerHandleData->Socket);
delete PerIoData;
delete PerHandleData;
continue;
}

// 取得数据并处理
cout << PerHandleData->Socket << "发送过来的消息:" << PerIoData->Buffer << endl;

// 继续向 socket 投递WSARecv操作
DWORD Flags = 0;
DWORD dwRecv = 0;
ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->DataBuf.len = DATA_BUFSIZE;
WSARecv(PerHandleData->Socket, &PerIoData->DataBuf, 1, &dwRecv, &Flags, &PerIoData->Overlapped, NULL);
}

return 0;
}

void main()
{
InitWinsock();

HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

// 根据系统的CPU来创建工作者线程
SYSTEM_INFO SystemInfo;
GetSystemInfo(&SystemInfo);

for(int i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++)
{
HANDLE hProcessIO = CreateThread(NULL, 0, ProcessIO, CompletionPort, 0, NULL);
if(hProcessIO)
{
CloseHandle(hProcessIO);
}
}

// 创建侦听SOCKET
SOCKET sListen = BindServerOverlapped(PORT);


SOCKET sClient;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_OPERATION_DATA PerIoData;
while(true)
{
// 等待客户端接入
//sClient = WSAAccept(sListen, NULL, NULL, NULL, 0);
sClient = accept(sListen, 0, 0);

cout << "Socket " << sClient << "连接进来" << endl;

PerHandleData = new PER_HANDLE_DATA();
PerHandleData->Socket = sClient;

// 将接入的客户端和完成端口联系起来
CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)PerHandleData, 0);

// 建立一个Overlapped,并使用这个Overlapped结构对socket投递操作
PerIoData = new PER_IO_OPERATION_DATA();

ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->DataBuf.len = DATA_BUFSIZE;

// 投递一个WSARecv操作
DWORD Flags = 0;
DWORD dwRecv = 0;
WSARecv(sClient, &PerIoData->DataBuf, 1, &dwRecv, &Flags, &PerIoData->Overlapped, NULL);
}

DWORD dwByteTrans;
PostQueuedCompletionStatus(CompletionPort, dwByteTrans, 0, 0);
closesocket(sListen);
}

好了,本篇文章就到此为止,
  回复  更多评论
  
# re: 完成端口模型代码 2009-04-02 07:05 xxq57@163.com
个人的几点理解:这里共享下:
1.疑问:if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,(LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)。语句中参数PerIoData怎么能代用啊?这里参数类型不是(LPOVERLAPPED *) 吗
解惑:其实这里只是一个指针。
在ACCEPT后,有个WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,&(PerIoData->Overlapped), NULL)异步接收。注意是异步接收,所以函数执行后,先返回,但他会接收一次数据,放在PerIoData->DataBuf。其中参数&(PerIoData->Overlapped)地址值就是后面(LPOVERLAPPED *) 要传出的地址值,二者刚好重合,其实就是指向同一内存地址。所以GetQueuedCompletionStatus()后PerIoData,和WSARecv()的PerIoData是同一个地址。所以PerIoData->DataBuf就有了。
  回复  更多评论
  
# re: 完成端口模型代码 2009-04-02 07:09 xxq57@163.com
指针是个很巧妙的东西,上解答,如果PerIoData->Overlapped的Overlapped成员不是放在第一个位置,那结果就不是那么回事了。不信的读者可以试试看。  回复  更多评论
  

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理