<<3D游戏程序设计大师技巧>>这本书.
网上讨论完成端口资料很多,大宝收集的最多,有些是错误的,有些说的比较模糊.我就进可能清晰说明一下完成端口在游戏开发中一般模型.并解
决几个难点问题.
由于完成端口是多线模型(当然可以把工作线程设定为一个)所以设计到资源的时候要线程安全,所以我开始简单封装了一下几个stl的容器.
/***************************************以下代码要与后边的代码一起编译*******************************************/
//memorypool.h
//particle_allocator.h
#pragma once
#include <iostream>
#include <list>
#include <queue>
#include <windows.h>
using namespace std;
//关键区锁
class CLock
{
CRITICAL_SECTION _crisection;
public:
CLock()
{
InitializeCriticalSection( &_crisection );
}
~CLock()
{
DeleteCriticalSection( &_crisection );
}
void Lock()
{
EnterCriticalSection( &_crisection );
}
void Unlock()
{
LeaveCriticalSection( &_crisection );
}
};
//链表模板
template <class T>
class CList : public list<T>
{
public:
CList(void){};
virtual ~CList(void){};
CLock _guard;
DWORD Size()
{
DWORD dwSize = 0;
_guard.Lock();
dwSize = size();
_guard.Unlock();
return dwSize;
};
void Clear()
{
_guard.Lock();
clear();
_guard.Unlock();
}
//添加数据到链表末尾,并返回添加后的该数据的节点指针
iterator Push_Back(T lpData)
{
iterator lpNode = NULL;
_guard.Lock();
lpNode = insert(end(), lpData);
_guard.Unlock();
return lpNode;
};
//删除一个节点
void Erase(iterator lpNode)
{
_guard.Lock();
erase(lpNode);
_guard.Unlock();
};
};
template <class T>
class CQueue : private CLock
{
private:
queue<T> m_Queue;
public:
CQueue(void) {};
virtual ~CQueue(void) {};
DWORD GetSize()
{
return (DWORD)m_Queue.size();
};
virtual void Push(T lpData)
{
Lock();
m_Queue.push(lpData);
Unlock();
};
virtual T Pop()
{
T lpData = NULL;
Lock();
if(m_Queue.size())
{
lpData = m_Queue.front();
m_Queue.pop();
}
Unlock();
return lpData;
};
};
template<class ElementClass, int NumElement>
class CMemoryPool
{
enum
{
chunk_size =NumElement
};
typedef unsigned char byte;
list<byte *> chunks;//内存指针
queue<ElementClass *> free_list;//空闲块队列
CLock guard;
public:
CMemoryPool()
{
InitMemPool();
}
~CMemoryPool()
{
DestroyMemPool();
}
//初时化内存池
void InitMemPool()
{
guard.Lock();
used = 0;
free = 0;
//byte *memory;
//memory = new byte[chunk_size*sizeof(ElementClass)];
//if (!memory)
//{
// cout << "内存分配出错....."<< endl;
// return ;
//}
//chunks.push_front(memory);
//for(int i =0;i< chunk_size;i++,free++)
//{
// ElementClass *newnode = (ElementClass *)(memory + i*sizeof(ElementClass));
// free_list.push(newnode);
//}
guard.Unlock();
}
//销毁内存池
void DestroyMemPool()
{
while(!free_list.empty())
{
free_list.pop();
}
for(std::list<byte *>::iterator all_iter = chunks.begin();all_iter!=chunks.end();++all_iter)
{
delete [](*all_iter);
}
chunks.clear();
used = 0;
free= 0;
}
//从内存池里分配一块内存
ElementClass* MemPoolAlloc()
{
guard.Lock();
byte *memory;
if(free_list.empty())
{
memory = new byte[chunk_size*sizeof(ElementClass)];
if (!memory)
{
cout << "内存分配出错....."<< endl;
return NULL;
}
chunks.push_front(memory);
for(int i =0;i< chunk_size;i++,free++)
{
ElementClass *newnode = (ElementClass *)(memory + i*sizeof(ElementClass));
free_list.push(newnode);
}
}
ElementClass *redata = NULL;
redata = free_list.front();
ZeroMemory(redata,sizeof(ElementClass));
free_list.pop();
++used;
--free;
guard.Unlock();
return redata;
}
//还原内存池
void MemPoolFree(ElementClass *p)
{
guard.Lock();
ZeroMemory(p,sizeof(ElementClass));
free_list.push(p);
-- used;
++ free;
guard.Unlock();
}
//内存池信息统计.
void GetMemPoolInfo()
{
cout << "used is "
<< used
<< " ; free is "
<< free << endl;
}
private:
UINT used;
UINT free;
};
/********************************************************************************************************************/
代码都非常简单.相信有点c++基础的朋友都能看懂.我只是简单说一下内存池代码.内存池里ElementClass类型一定要结构形的.也就是那种POD
类型还是什么类型我刚刚听说这个名词,所以记不住.内存池第一次分派的时候,分派NumElement>个ElementClass类型节点,这些节点构成一个队
列free_list,这些都是通过模版参数传入的.每次用的时候就就从头找出一个节点,释放的时候又在尾部加入一个节点,很好理解.用完的时候再
分配NumElement>个ElementClass类型节点,节点到最后统一释放.
接下来我们看看程序用到的头文件
//iocpsever.cpp
#include <winsock2.h>
#include "memorypool.h"
#include <Mswsock.h>
#include <process.h>
using namespace std;
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")
using namespace std;
1.头文件有一个郁闷的问题就是winsock2.h与windows互相包含问题.不知道有什么完美解决办法,我再项目中用afxsock.h替代winsock2.h.
需要用到库两个库文件我们都通过编译器指令连接进来.
2.完成端口模型用一句话来讲就是,把所有的socket都绑定到完成端口上,通过完成端口统一来处理.绑定以后这些socket上有任何的情况,都通
过完成端口反映出来,达到简化处理逻辑的作用.完成端口确实不会让你失望,逻辑结构非常简单.
3.我们首先为socket进行分类.服务器的socket可以分为三类,一类就是用于监听的socket,监听socket绑定时机是它成功绑定到某一个端口上的
时候我们就可以把它绑定到完成端口上.所以我把它放进初始化间断.第二类客户连接进入的socket,这个只能放进当连接进入完成端口,accept
事件触发的时候我们把它绑定到完成端口上.第三类就是服务器连接出去的socket,我把它放进线程里,在线程内部进行绑定,因为这类socket相
对比较少.
4.接下来我们考虑连接问题.由于完成端口是一种异步模型,它工作机制和我们传统的同步socket连接模型不一样,我们先投递许多连接消息,当
连接成功的时候,对应的完成端口上发生accept事件.这里的accept是我们自己定义的,并且在AcceptEx的时候通过重叠结构传入的,你也可以任
意定义,但是我想你不会把accept定义成send,而send定义成 receive吧.由于我们在AcceptEx的时候传入单io数据结构,所以一定要关闭完成端
口的时候把它释放掉.关于资源的释放我们放在下边来考虑.话题不要扯远,回来连接上来,我们到底该怎么样投递呢.如果我们一下投递的过多,
就会造成资源浪费,过少如果用完该怎么办.这样客户就不能那个连接了.通用的做法就是每次投递10个,如果用完继续投机10个.怎么样才能知道
用完呢?需要在监听socket上注册一个FD_ACCEPT事件,当投递用完的时候事件触发.这就就可以继续投机,我也把它放进线程里边去做这件事.
g_hAcceptExOverEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if(!g_hAcceptExOverEvent)
{
return false;
}
//帮定事件,由于我们用AcceptEx是一个异步的投递,这样帮定之后,如果投递的AcceptEx事件全部完成
//则g_hAcceptExOverEvent事件得到通知,进而同步AcceptEx调用
if(WSAEventSelect(Listen, g_hAcceptExOverEvent, FD_ACCEPT) == SOCKET_ERROR)
{
return false;
}
//由于开始是复位,变成置位
SetEvent(g_hAcceptExOverEvent);
5.最后讨论资源释放问题,分析如下代码
closesocket(Listen);
g_session._guard.Lock();
for(CList<LPPER_HANDLE_DATA>::iterator lpNode = g_session.begin(); lpNode != g_session.end();lpNode++)
{
closesocket((*lpNode)->Socket);
printf(" close socket = %d\n",(*lpNode)->Socket);
}
g_session._guard.Unlock();
Sleep(1000);
//向IOCP发送结束线程信号
for(DWORD i = 0; i < threadcnt; i++)
PostQueuedCompletionStatus(CompletionPort, 0, NULL, NULL);
//等待工作线程结束,等待时间10秒
if(WaitForMultipleObjects(threadcnt, m_arrayIOCPThreadHandle, TRUE, 10000) != WAIT_OBJECT_0)
{
//如果10秒内没有结束所有线程,就强制结束
for(DWORD i = 0; i < threadcnt; i++)
TerminateThread(m_arrayIOCPThreadHandle[i], 0);
}
//关闭所有工作线程句柄
for(DWORD i = 0; i < threadcnt; i++)
CloseHandle(m_arrayIOCPThreadHandle[i]);
//释放线程句柄数组
delete [] m_arrayIOCPThreadHandle;
m_arrayIOCPThreadHandle = NULL;
CloseHandle(CompletionPort);
g_per_io_data.DestroyMemPool();
g_per_handle_data.DestroyMemPool();
g_session.Clear();
我们也分三类来考虑
首先第一类监听socket,监听socket关闭之后,由于连接事件与监听socket有关系.所以所有未完成的连接都会从完成端口返回来.这些连接里我
们分配了单io数据结构一定要释放.
释放逻辑如下
bRet = GetQueuedCompletionStatus(CompletionPort,
&BytesTransferred,
(PULONG_PTR)
&lpCompletionKey,
(LPOVERLAPPED*)
&lpPerIoData,
INFINITE);
if(!bRet)//没有取到任何东西
{
if(!lpPerIoData)
continue;
}
//收到线程结束信号
if(!lpCompletionKey)
return 0;
LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
if(!bRet)
{
if(BytesTransferred == 0 )//断开连接
{
if (lpPerIoData->OperationType == OP_READ)
{
printf("client socket %d disconnect\n",lpPerIoData->Socket);
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
continue;
}
}
}
switch (lpPerIoData->OperationType)
{
case
OP_ACCEPT://有连接进来 {
if (!bRet)
{
closesocket(lpPerIoData->Socket); //关闭socket
g_per_io_data.MemPoolFree(lpPerIoData); //归还结构体到内存池
printf("关闭已经投递空闲连接\n");
break;
}
.........................后边代码省略掉当关闭监听socket的时候GetQueuedCompletionStatus会返回false并且BytesTransferred等于零,
并且lpPerIoData,不为空lpPerIoData->OperationType操作符号就是我们AcceptEx传入的自定义操作类型accept,这样就释放掉了与连接相关的
资源.所以我们在关掉监听socket之后要sleep一下让资源能充分释放.
第二类就是客户socket.
分两种情况考虑.(1)服务器主动关闭,这又可以分为两种情况,第一,当你在工作线程里关掉socket的时候GetQueuedCompletionStatus是收不到
任何消息的,所以直接释放资源就是了.这样产生的一个问题就是如果你把拆包逻辑放进完成端口工作线程的时候,当数据包发生错误的时候,你
想关闭socket,一定要记住连释放掉与这个socket相关的资源.
if(WSARecv(lpPerIoData->Socket,
&(lpPerIoData->DataBuf),
1,
&lpPerIoData->RecvBytes,
&lpPerIoData->Flags,
&(lpPerIoData->Overlapped),
NULL)== SOCKET_ERROR)
{
if(WSAGetLastError() != ERROR_IO_PENDING)//读操作失败
{
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
}
}
第二,就是当在其他线程里关闭的时候,GetQueuedCompletionStatus就会收到消息.由于我们每次收到自定义recv事件的时候,接着就投递了一个
recv事件.所以这个事件失败消息肯定会从完成端口返回.处理如下
bRet = GetQueuedCompletionStatus(CompletionPort,
&BytesTransferred,
(PULONG_PTR)
&lpCompletionKey,
(LPOVERLAPPED*)
&lpPerIoData,
INFINITE);
if(!bRet)//没有取到任何东西
{
if(!lpPerIoData)
continue;
}
//收到线程结束信号
if(!lpCompletionKey)
return 0;
LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
if(!bRet)
{
if(BytesTransferred == 0 )//断开连接
{
if (lpPerIoData->OperationType == OP_READ)
{
printf("client socket %d disconnect\n",lpPerIoData->Socket);
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
continue;
}
}
}
返回代码为false并且BytesTransferred为零且lpPerIoData->OperationType == OP_READ.
(2)用户断开的处理逻辑与服务器断开第二种情况是一样的.有些资料讲服务器主动断开的时候GetQueuedCompletionStatus会返回true是错误的
.
6.ERROR_IO_PENDING消息处理这个消息说明现在这个重叠操作还没有完成,我们必须进行等待,所以必须在错误判断的时候忽略掉这个消息.
if(WSAGetLastError() != ERROR_IO_PENDING)//读操作失败
{
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
}
7.GetQueuedCompletionStatus返回之后的逻辑判断按下边的代码进行,我们必须知道为什么这样做,理由就是上边六条再加上msdn里
GetQueuedCompletionStatus这个函数的说明.
DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
{
DWORD BytesTransferred;
LPPER_IO_DATA lpPerIoData = NULL;
LPVOID lpCompletionKey = NULL;
BOOL bRet = FALSE;
while (1)
{
bRet = GetQueuedCompletionStatus(CompletionPort,
&BytesTransferred,
(PULONG_PTR)
&lpCompletionKey,
(LPOVERLAPPED*)
&lpPerIoData,
INFINITE);
if(!bRet)//没有取到任何东西
{
if(!lpPerIoData)
continue;
}
//收到线程结束信号
if(!lpCompletionKey)
return 0;
LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
if(!bRet)
{
if(BytesTransferred == 0 )//断开连接
{
if (lpPerIoData->OperationType == OP_READ)
{
printf("client socket %d disconnect\n",lpPerIoData->Socket);
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
continue;
}
}
}
switch (lpPerIoData->OperationType)
{
case
OP_ACCEPT://有连接进来 {
if (!bRet)
{
closesocket(lpPerIoData->Socket); //关闭socket
g_per_io_data.MemPoolFree(lpPerIoData); //归还结构体到内存池
printf("关闭已经投递空闲连接\n");
break;
}
LPPER_HANDLE_DATA lpCurSession;
lpCurSession=g_per_handle_data.MemPoolAlloc();
printf(" connect socket = %d\n",lpPerIoData->Socket);
lpCurSession->Socket = lpPerIoData->Socket;//就是我们AcceptEx时传入的socket
lpCurSession->node = g_session.Push_Back(lpCurSession);
lpPerIoData->lpSession = lpCurSession;
if(!CreateIoCompletionPort(
(HANDLE)lpPerIoData->Socket,
CompletionPort,
(ULONG_PTR)lpCurSession,
0))
{
closesocket(lpPerIoData->Socket); //关闭socket
g_per_handle_data.MemPoolFree(lpSession);
}
else
{
//g_per_io_data与每次读写相关联
lpPerIoData->OperationType = OP_READ;
ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
lpPerIoData->Flags = 0;
lpPerIoData->DataBuf.len = 1024;
lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
lpPerIoData->RecvBytes =0;
lpPerIoData->lpSession = lpSession;
ZeroMemory(lpPerIoData->buffer,1024);
if(WSARecv(lpPerIoData->Socket,
&(lpPerIoData->DataBuf),
1,
&lpPerIoData->RecvBytes,
&lpPerIoData->Flags,
&(lpPerIoData->Overlapped),
NULL)== SOCKET_ERROR)
{
if(WSAGetLastError() != ERROR_IO_PENDING)//读操作失败
{
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
}
}
}
}
break;
case OP_READ:
{
//cout << lpPerIoData->DataBuf.buf << endl;
send(lpPerIoData->Socket,"9876543210",lstrlen("9876543210")+1,0);
lpPerIoData->OperationType = OP_READ;
ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
lpPerIoData->Flags = 0;
lpPerIoData->DataBuf.len = 1024;
lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
lpPerIoData->RecvBytes =0;
lpPerIoData->lpSession = lpSession;
ZeroMemory(lpPerIoData->buffer,1024);
if(WSARecv(lpPerIoData->Socket,
&(lpPerIoData->DataBuf),
1,
&lpPerIoData->RecvBytes,
&lpPerIoData->Flags,
&(lpPerIoData->Overlapped),
NULL)== SOCKET_ERROR)
{
if(WSAGetLastError() != ERROR_IO_PENDING)
{
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
}
}
}
break;
case OP_WRITE:
break;
default:
break;
}
}
}
下边就是整个控制台代码.
/********************************以下代码要与前边的容器头文件一起进行编译***********************************************/
//iocpsever.cpp
#include <winsock2.h>
#include "memorypool.h"
#include <Mswsock.h>
#include <process.h>
using namespace std;
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")
using namespace std;
// 单句柄数据,每个连接(客户端)对应一个这样的结构。
//只有连接断开,或者服务器关闭的时候才释放
struct tagPER_HANDLE_DATA;
typedef unsigned (WINAPI *PBEGINTHREADX_THREADFUN)(LPVOID lpThreadParameter); //线程函数原型
typedef struct tagPER_HANDLE_DATA *LPPER_HANDLE_DATA;
typedef CList<LPPER_HANDLE_DATA>::iterator LISTSESSIONNODE;
typedef struct tagPER_HANDLE_DATA
{
SOCKET Socket;
LISTSESSIONNODE node;
// 将和这个句柄关联的其他有用信息,尽管放在这里面吧
}PER_HANDLE_DATA;
// 单I/O操作数据。每次收发数据的时候
//收发数据操作完成数之后释放。
//需要注意的是OVERLAPPED Overlapped一定要放在最前
typedef struct tagPER_IO_DATA
{
OVERLAPPED Overlapped;
WSABUF DataBuf;
char buffer[1024];
DWORD RecvBytes;
DWORD Flags;
int OperationType;
SOCKET Socket;
LPPER_HANDLE_DATA lpSession;
}PER_IO_DATA,*LPPER_IO_DATA;
//操作标志
#define OP_READ 0
#define OP_WRITE 1
#define OP_ACCEPT 2
//连接数据内存池
CMemoryPool<PER_IO_DATA,1024> g_per_io_data;
//数据收发内存池
CMemoryPool<PER_HANDLE_DATA,1024> g_per_handle_data;
//完成线程
//保存当前连接数据指针
CList<LPPER_HANDLE_DATA> g_session;
SOCKET Listen;
HANDLE CompletionPort;
HANDLE *m_arrayIOCPThreadHandle;
DWORD threadcnt;
DWORD WINAPI ServerWorkerThread(LPVOID lpParam);
BOOL b;
HANDLE g_hAcceptExOverEvent;
HANDLE g_hAcceptExThread;
DWORD WINAPI WinSockAcceptEXThread(LPVOID lpParam);
bool StartUpWinSockAcceptEXThread();
void CloseWinSockAcceptEXThread();
void InitCompletionPort();
void CloseCompletionPort();
int main(void)
{
InitCompletionPort();
StartUpWinSockAcceptEXThread();
{
HANDLE hWaitEvent;
hWaitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
WaitForSingleObject(hWaitEvent, 30000/*INFINITE*/);
CloseHandle(hWaitEvent);
}
CloseCompletionPort();
return 0;
}
DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
{
DWORD BytesTransferred;
LPPER_IO_DATA lpPerIoData = NULL;
LPVOID lpCompletionKey = NULL;
BOOL bRet = FALSE;
while (1)
{
bRet = GetQueuedCompletionStatus(CompletionPort,
&BytesTransferred,
(PULONG_PTR)
&lpCompletionKey,
(LPOVERLAPPED*)
&lpPerIoData,
INFINITE);
if(!bRet)//没有取到任何东西
{
if(!lpPerIoData)
continue;
}
//收到线程结束信号
if(!lpCompletionKey)
return 0;
LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
if(!bRet)
{
if(BytesTransferred == 0 )//断开连接
{
if (lpPerIoData->OperationType == OP_READ)
{
printf("client socket %d disconnect\n",lpPerIoData->Socket);
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
continue;
}
}
}
switch (lpPerIoData->OperationType)
{
case
OP_ACCEPT://有连接进来 {
if (!bRet)
{
closesocket(lpPerIoData->Socket); //关闭socket
g_per_io_data.MemPoolFree(lpPerIoData); //归还结构体到内存池
printf("关闭已经投递空闲连接\n");
break;
}
LPPER_HANDLE_DATA lpCurSession;
lpCurSession=g_per_handle_data.MemPoolAlloc();
printf(" connect socket = %d\n",lpPerIoData->Socket);
lpCurSession->Socket = lpPerIoData->Socket;//就是我们AcceptEx时传入的socket
lpCurSession->node = g_session.Push_Back(lpCurSession);
lpPerIoData->lpSession = lpCurSession;
if(!CreateIoCompletionPort(
(HANDLE)lpPerIoData->Socket,
CompletionPort,
(ULONG_PTR)lpCurSession,
0))
{
closesocket(lpPerIoData->Socket); //关闭socket
g_per_handle_data.MemPoolFree(lpSession);
}
else
{
//g_per_io_data与每次读写相关联
lpPerIoData->OperationType = OP_READ;
ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
lpPerIoData->Flags = 0;
lpPerIoData->DataBuf.len = 1024;
lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
lpPerIoData->RecvBytes =0;
lpPerIoData->lpSession = lpSession;
ZeroMemory(lpPerIoData->buffer,1024);
if(WSARecv(lpPerIoData->Socket,
&(lpPerIoData->DataBuf),
1,
&lpPerIoData->RecvBytes,
&lpPerIoData->Flags,
&(lpPerIoData->Overlapped),
NULL)== SOCKET_ERROR)
{
if(WSAGetLastError() != ERROR_IO_PENDING)//读操作失败
{
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
}
}
}
}
break;
case OP_READ:
{
//cout << lpPerIoData->DataBuf.buf << endl;
send(lpPerIoData->Socket,"9876543210",lstrlen("9876543210")+1,0);
lpPerIoData->OperationType = OP_READ;
ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
lpPerIoData->Flags = 0;
lpPerIoData->DataBuf.len = 1024;
lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
lpPerIoData->RecvBytes =0;
lpPerIoData->lpSession = lpSession;
ZeroMemory(lpPerIoData->buffer,1024);
if(WSARecv(lpPerIoData->Socket,
&(lpPerIoData->DataBuf),
1,
&lpPerIoData->RecvBytes,
&lpPerIoData->Flags,
&(lpPerIoData->Overlapped),
NULL)== SOCKET_ERROR)
{
if(WSAGetLastError() != ERROR_IO_PENDING)
{
closesocket(lpPerIoData->Socket); //关闭socket
g_session.Erase(lpSession->node); //从Session链表中删除
g_per_handle_data.MemPoolFree(lpSession); //将Session归还到池
g_per_io_data.MemPoolFree(lpPerIoData);
}
}
}
break;
case OP_WRITE:
break;
default:
break;
}
}
}
DWORD WINAPI WinSockAcceptEXThread(LPVOID lpParam)
{
LINGER lingerStruct = { 0x01, 0x00 };
BOOL bNodelay = TRUE;
//创建事件
while (b)
{
//每次投递10次,进入等待状态,当AcceptEx全部完成之后,继续投递
if(WaitForSingleObject(g_hAcceptExOverEvent, INFINITE) == WAIT_FAILED)
continue;
for(int i =0;i<10 && b;i++)
{
int zero =0;
PER_IO_DATA * pper_io_data = NULL;
DWORD dwAddrLen = sizeof(sockaddr_in)+16;
pper_io_data = g_per_io_data.MemPoolAlloc();
pper_io_data ->Socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if(pper_io_data->Socket == INVALID_SOCKET)
{
g_per_io_data.MemPoolFree(pper_io_data);
continue;
}
pper_io_data->OperationType = OP_ACCEPT;
pper_io_data->lpSession=NULL;
ZeroMemory(&pper_io_data->Overlapped,sizeof(OVERLAPPED));//一定要注意清零
pper_io_data->RecvBytes =0;
pper_io_data->Flags =0;
ZeroMemory(pper_io_data->buffer,1024);
setsockopt(pper_io_data->Socket, IPPROTO_TCP, TCP_NODELAY, (char*)&bNodelay, sizeof(BOOL));
setsockopt(pper_io_data->Socket, SOL_SOCKET, SO_LINGER, (char*)&lingerStruct, sizeof(LINGER));
if(!AcceptEx(Listen,pper_io_data ->Socket,pper_io_data ->buffer,0,dwAddrLen,dwAddrLen,&pper_io_data -
>RecvBytes,&pper_io_data->Overlapped))
{
if(WSAGetLastError() != ERROR_IO_PENDING)//对于AcceptEx,WSARecv,WSASend一定要有这样的判断,因为是异步的所以
不会立即完成
{
closesocket(pper_io_data->Socket);
g_per_io_data.MemPoolFree(pper_io_data); //归还结构体到内存池
continue;
}
}
}
}
return 0;
}
bool StartUpWinSockAcceptEXThread()
{
g_hAcceptExOverEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if(!g_hAcceptExOverEvent)
{
return false;
}
//帮定事件,由于我们用AcceptEx是一个异步的投递,这样帮定之后,如果投递的AcceptEx事件全部完成
//则g_hAcceptExOverEvent事件得到通知,进而同步AcceptEx调用
if(WSAEventSelect(Listen, g_hAcceptExOverEvent, FD_ACCEPT) == SOCKET_ERROR)
{
return false;
}
//由于开始是复位,变成置位
SetEvent(g_hAcceptExOverEvent);
b = TRUE;
g_hAcceptExThread = (HANDLE)_beginthreadex(NULL, 0, (PBEGINTHREADX_THREADFUN)WinSockAcceptEXThread, NULL, 0, NULL);
if(!g_hAcceptExThread)
return false;
return true;
}
void CloseWinSockAcceptEXThread()
{
b=false;
SetEvent(g_hAcceptExOverEvent);
if(WaitForSingleObject(g_hAcceptExThread,10000)!= WAIT_OBJECT_0)
TerminateThread(g_hAcceptExThread, 0);
CloseHandle(g_hAcceptExThread);
CloseHandle(g_hAcceptExOverEvent);
};
void CloseCompletionPort()
{
closesocket(Listen);
g_session._guard.Lock();
for(CList<LPPER_HANDLE_DATA>::iterator lpNode = g_session.begin(); lpNode != g_session.end();lpNode++)
{
closesocket((*lpNode)->Socket);
printf(" close socket = %d\n",(*lpNode)->Socket);
}
g_session._guard.Unlock();
Sleep(1000);
//向IOCP发送结束线程信号
for(DWORD i = 0; i < threadcnt; i++)
PostQueuedCompletionStatus(CompletionPort, 0, NULL, NULL);
//等待工作线程结束,等待时间10秒
if(WaitForMultipleObjects(threadcnt, m_arrayIOCPThreadHandle, TRUE, 10000) != WAIT_OBJECT_0)
{
//如果10秒内没有结束所有线程,就强制结束
for(DWORD i = 0; i < threadcnt; i++)
TerminateThread(m_arrayIOCPThreadHandle[i], 0);
}
//关闭所有工作线程句柄
for(DWORD i = 0; i < threadcnt; i++)
CloseHandle(m_arrayIOCPThreadHandle[i]);
//释放线程句柄数组
delete [] m_arrayIOCPThreadHandle;
m_arrayIOCPThreadHandle = NULL;
CloseHandle(CompletionPort);
g_per_io_data.DestroyMemPool();
g_per_handle_data.DestroyMemPool();
g_session.Clear();
}
void InitCompletionPort()
{
WSADATA wsd;
SYSTEM_INFO SystemInfo;
SOCKADDR_IN InternetAddr;
WSAStartup(MAKEWORD(2, 2), &wsd);
//创建完成端口
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL,
0,
threadcnt);
//得到处理器数量
GetSystemInfo(&SystemInfo);
//经验公式:一般按公式创建工作线程
threadcnt = 2*SystemInfo.dwNumberOfProcessors+2;
m_arrayIOCPThreadHandle = new HANDLE[threadcnt];
for(DWORD i = 0; i < threadcnt; i++)
{
m_arrayIOCPThreadHandle[i] = (HANDLE)_beginthreadex(NULL, 0, (PBEGINTHREADX_THREADFUN)ServerWorkerThread, NULL, 0,
NULL);
if(!m_arrayIOCPThreadHandle[i])
return ;
}
//创建监听socket
Listen = WSASocket(AF_INET,
SOCK_STREAM,
0,
NULL,
0,
WSA_FLAG_OVERLAPPED);
InternetAddr.sin_family = PF_INET;
InternetAddr.sin_port = htons(20000);
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
//帮定到指定端口
bind(Listen, (SOCKADDR*)&InternetAddr, sizeof(InternetAddr));
//开始监听
listen(Listen, SOMAXCONN);
//完成端口帮定到监听socket
printf(" listen socket = %d\n",Listen);
if (CreateIoCompletionPort((HANDLE) Listen, CompletionPort, (ULONG_PTR)&Listen, threadcnt) == NULL)
{
printf("CreateIoCompletionPort failed with error %d\n", GetLastError());
return ;
}
}
FROM:http://www.vchelp.net/cndevforum/subject_view.asp?subject_id=176818&forum_id=55
作者:
lustskyboy沟通无限
posted on 2007-01-31 14:10
我风 阅读(1122)
评论(2) 编辑 收藏 引用