S.l.e!ep.¢%

像打了激速一样,以四倍的速度运转,开心的工作
简单、开放、平等的公司文化;尊重个性、自由与个人价值;
posts - 1098, comments - 335, trackbacks - 0, articles - 1
  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

封装了IOCP

Posted on 2009-01-31 19:47 S.l.e!ep.¢% 阅读(3278) 评论(8)  编辑 收藏 引用 所属分类: C++
1. 不使用线程池
2.  暂时只封装了 win32 的 icop
3.  暂时还未写测试用例,因为不知道怎么写 -_-!!!

// networksocket.h: interface for the CNetworkSocket class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(NETWORKSOCKET_H__A63ED8A5_7ED1_463D_A0F0_41F6E9C79441__INCLUDED_)
#define NETWORKSOCKET_H__A63ED8A5_7ED1_463D_A0F0_41F6E9C79441__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include 
<vector>

#include 
<winsock2.h>
#include 
<process.h>
#pragma  comment(lib, 
"ws2_32.lib")

#define BUFFER_SIZE 4096

enum OPERATION_TYPE
{
    OPERATION_TYPE_RECV 
= 0,
    OPERATION_TYPE_SEND 
= 1
}
;

//IO操作数据
typedef struct _PER_IO_OPERATION_DATA
{
    OVERLAPPED OverLapped;
    WSABUF DataBuf;
    
char szBuf[BUFFER_SIZE];
    OPERATION_TYPE OperationType;      
// 操作类型表示

}
PER_IO_OPERATION_DATA,*PPER_IO_OPERATION_DATA;

class CNetworkSocket  
{
public:
    CNetworkSocket();
    
virtual ~CNetworkSocket();

    
bool init();
    
bool uninit();

    
bool initNetWork(unsigned short nPort);
    
bool unitNetWork();

    
bool send(SOCKET s, const char* pBuf, int nLen);

private:
    SOCKET m_listensock;
    HANDLE m_hIocpPort;
    std::vector
<HANDLE> m_vectorThreadHandle;

    
static unsigned __stdcall _WorkerThreadProc(void* pVoid);    
    
static unsigned __stdcall _AcceptThreadProc(void* pVoid);
}
;

#endif // !defined(NETWORKSOCKET_H__A63ED8A5_7ED1_463D_A0F0_41F6E9C79441__INCLUDED_)

// networksocket.cpp: implementation of the CNetworkSocket class.
//
//////////////////////////////////////////////////////////////////////

#include 
"networksocket.h"

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CNetworkSocket::CNetworkSocket()
{
    m_listensock 
= INVALID_SOCKET;
    m_hIocpPort  
= INVALID_HANDLE_VALUE;
}


CNetworkSocket::
~CNetworkSocket()
{

}


bool CNetworkSocket::init()
{
    WORD wVersionRequested;
    WSADATA wsaData;
    
int nErrCode = 0;
    
    wVersionRequested 
= MAKEWORD(22);
    
    nErrCode 
= ::WSAStartup(wVersionRequested, &wsaData);
    
if ( nErrCode != 0 ) 
    
{
        
return false;
    }


    
if ( LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2 ) 
    
{
        
bool b = uninit();
        
return false;
    }

    
else
    
{
        
return true;
    }

}


bool CNetworkSocket::uninit()
{
    
int nErrCode = 0;
    nErrCode 
= ::WSACleanup();

    
return true;
}


bool CNetworkSocket::initNetWork(unsigned short nPort)
{
    
int nErrCode = 0;

    
// 创建 socket
    m_listensock = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    
    
if( INVALID_SOCKET == m_listensock )
    
{
        nErrCode 
= ::WSAGetLastError();
        
return false;
    }


    
// 绑定端口
    sockaddr_in InternetAddr;
    InternetAddr.sin_family 
= AF_INET;
    InternetAddr.sin_addr.S_un.S_addr 
= ::htonl(INADDR_ANY);
    InternetAddr.sin_port 
= htons(nPort);

    
if( ::bind( m_listensock, (PSOCKADDR)&InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR )
    
{
        nErrCode 
= ::WSAGetLastError();
        
return false;
    }


    
// 开始监听
    if(    ::listen(m_listensock, 5== SOCKET_ERROR )
    
{
        nErrCode 
= ::WSAGetLastError();
        
return false;
    }


    
// 创建完成端口
    m_hIocpPort = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 00);
        
    
if( m_hIocpPort == INVALID_HANDLE_VALUE )
    
{
        nErrCode 
= ::WSAGetLastError();
        
return false;
    }


    
// 启动工作线程,线程数为CPU处理器数量*2+2
    SYSTEM_INFO sys_Info;
    ::GetSystemInfo(
&sys_Info);

    
for(int i = 0; i < (int(sys_Info.dwNumberOfProcessors) * 2 + 2); i++)
    
{
        HANDLE ThreadHandle 
= INVALID_HANDLE_VALUE;
        DWORD ThreadID      
= 0;
        
        unsigned 
int nThreadId  =   0 ;
        ThreadHandle 
= (HANDLE)_beginthreadex(NULL, 0, _WorkerThreadProc, (void *)this,  0&nThreadId);
        
        
if ( ThreadHandle == 0 )
            
return false;

        m_vectorThreadHandle.push_back(ThreadHandle);
    }


    
// 启动侦听线程
    HANDLE ThreadHandle = INVALID_HANDLE_VALUE;
    unsigned 
int nThreadId = 0 ;

    ThreadHandle 
= (HANDLE)_beginthreadex(NULL, 0, _AcceptThreadProc, (void *)this,  0&nThreadId);
    
    
if ( ThreadHandle == 0 )
            
return false;

    m_vectorThreadHandle.push_back(ThreadHandle);

    
return true;
}


bool CNetworkSocket::unitNetWork()
{
    
// 启动工作线程,线程数为CPU处理器数量*2+2
    SYSTEM_INFO sys_Info;
    ::GetSystemInfo(
&sys_Info);
    
    
for(int i = 0; i < (int(sys_Info.dwNumberOfProcessors) * 2 + 2); i++)
    
{
        
//寄出退出消息
        ::PostQueuedCompletionStatus(m_hIocpPort,    -1,    -1, NULL);
    }


    std::vector
<HANDLE>::iterator iter_t;
    
for ( iter_t = m_vectorThreadHandle.begin(); iter_t != m_vectorThreadHandle.end(); iter_t++ )
    

        DWORD dwRet 
= ::WaitForSingleObject(*iter_t, INFINITE);
    }


    
//关闭网络的侦听
    ::shutdown(m_listensock, 0);
    ::closesocket(m_listensock);
    
    
return true;
}


unsigned __stdcall CNetworkSocket::_WorkerThreadProc(
void* pVoid)
{
    CNetworkSocket
* pThis = (CNetworkSocket*)pVoid;
    DWORD dwByteTransferred 
= 0;
    unsigned 
long nFlag = 0;
    DWORD RecvByte 
= 0;

    SOCKET ClientSock 
= INVALID_SOCKET;
     PPER_IO_OPERATION_DATA PerIoData;

     
while ( true )
     
{
        BOOL bSuccess 
= ::GetQueuedCompletionStatus(pThis->m_hIocpPort,
                                                       
&dwByteTransferred,
                                                     (LPDWORD)
&ClientSock,
                                                     (LPOVERLAPPED
*)&PerIoData,
                                                     INFINITE);

        
//退出信号到达,退出线程
        if( dwByteTransferred == -1 && PerIoData == NULL )
            
return 1;

        
//客户机已经断开连接或者连接出现错误
        if( dwByteTransferred == 0 && (PerIoData->OperationType == OPERATION_TYPE_RECV || PerIoData->OperationType == OPERATION_TYPE_SEND ) )
        
{
            ::closesocket(ClientSock);
            ::GlobalFree(PerIoData);
            
continue;
        }


        
// 接收完成
        if ( PerIoData->OperationType == OPERATION_TYPE_RECV )
        
{
            
// 处理接收数据
            
// pThis->OnRecv(ClientSock, PerIoData->szBuf, dwByteTransferred);

            
//将源数据置空
            ::memset(PerIoData->szBuf, 0, BUFFER_SIZE);
            dwByteTransferred
=0;

            
//重置IO操作数据
            unsigned long Flag=0;
            ::ZeroMemory(
&(PerIoData->OverLapped), sizeof(OVERLAPPED));
            
            PerIoData
->DataBuf.buf    = PerIoData->szBuf;
            PerIoData
->DataBuf.len    = BUFFER_SIZE;
            PerIoData
->OperationType = OPERATION_TYPE_RECV;

            
// 再投递另一个Recv请求
            ::WSARecv(ClientSock, &(PerIoData->DataBuf), 1&RecvByte, &Flag, &(PerIoData->OverLapped),    NULL);
        }


        
// 发送完成,置空缓冲区,释放缓冲区
        if( PerIoData->OperationType == OPERATION_TYPE_SEND )
        
{
            ::memset(PerIoData, 
0sizeof(PER_IO_OPERATION_DATA));
            ::GlobalFree(PerIoData);
            dwByteTransferred 
= 0;
        }

 
     }
// while ( true )

    
return 0;
}


unsigned __stdcall CNetworkSocket::_AcceptThreadProc(
void* pVoid)
{
    CNetworkSocket
* pThis = (CNetworkSocket*)pVoid;

    SOCKET AcceptSock 
= INVALID_SOCKET;

    
while ( true )
    
{
        AcceptSock 
= ::WSAAccept(pThis->m_listensock, NULL, NULL, NULL, 0);

        
//关联客户端口到完成端口,句柄数据在此时被绑定到完成端口
        ::CreateIoCompletionPort((HANDLE)AcceptSock, pThis->m_hIocpPort, (DWORD)AcceptSock, 0);

        PPER_IO_OPERATION_DATA PerIoData 
= (PPER_IO_OPERATION_DATA)::GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));
        unsigned 
long nFlag = 0;
        DWORD RecvByte 
= 0;
        ::ZeroMemory(
&(PerIoData->OverLapped),sizeof(OVERLAPPED));
        
        PerIoData
->DataBuf.buf     = PerIoData->szBuf;
        PerIoData
->DataBuf.len     = BUFFER_SIZE;
        PerIoData
->OperationType  = OPERATION_TYPE_RECV;

        
//提交首个接收数据请求
        
//这时
        
//如果客户端断开连接
        
//则也可以以接收数据时得到通知    
        ::WSARecv(AcceptSock, &(PerIoData->DataBuf), 1&RecvByte, &nFlag, &(PerIoData->OverLapped), NULL);

    }
// while ( true )

    
return 0;
}


bool CNetworkSocket::send(SOCKET s, const char* pBuf, int nLen)
{
    
if( s == INVALID_SOCKET || pBuf == NULL || nLen == 0 )
        
return false;
    
    PPER_IO_OPERATION_DATA PerIoData 
= (PPER_IO_OPERATION_DATA)::GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));
    
    unsigned 
long nFlag = 0;
    DWORD dwSendByte 
= 0;
    ::ZeroMemory(
&(PerIoData->OverLapped), sizeof(OVERLAPPED));
    ::memcpy(PerIoData
->szBuf, pBuf, nLen);

    PerIoData
->DataBuf.buf = PerIoData->szBuf;
    PerIoData
->DataBuf.len = nLen;
    PerIoData
->OperationType  = OPERATION_TYPE_SEND;

    
int nRet = ::WSASend(s, &(PerIoData->DataBuf), 1&dwSendByte, nFlag, &(PerIoData->OverLapped), NULL);

    
if( nRet==SOCKET_ERROR && GetLastError() != WSA_IO_PENDING )
    
{
        
return false;
    }

    
else
    
{
        
return true;
    }

}

Feedback

# re: 封装了IOCP  回复  更多评论   

2009-01-31 19:56 by nn
在send时,重新分配了一个IO对象,这个对象也许不会在你想象中那样被释放了。

在GET中的释放的地方,好像没问题,但如果达到上千个连接,而且每个连接的数据非量密的时候,会发现GET时总是在判断Recv的情况而没有判断Send的情况。

# re: 封装了IOCP  回复  更多评论   

2009-01-31 19:56 by nn
这种情况,send时分配的对象就堆在服务器了。

# re: 封装了IOCP  回复  更多评论   

2009-01-31 19:59 by 苦丁茶
现在我一直在怀疑WSASend后,GET一直没有取得完成的事件。
虽然都说肯定会有完成事件的。

总这,你这个类可以应付小型的网络应用,但无法应付上规模的应用。

# re: 封装了IOCP  回复  更多评论   

2009-01-31 20:37 by 苦丁茶
这个类还有一个问题,
就是当连接断开后,没有清理资源。
还有没有主动检测无效连接的功能。

# re: 封装了IOCP  回复  更多评论   

2009-01-31 20:42 by 85093103
在WinSock上使用IOCP
本文章假设你已经理解WindowsNT的I/O模型以及I/O完成端口(IOCP),并且比较熟悉将要用到的API,如果你打算学习IOCP,请参考Jeffery Richter的Advanced Windows(第三版),第15章I/O设备,里面有极好的关于完成端口的讨论以及对即将使用API的说明。
IOCP提供了一个用于开发高效率和易扩展程序的模型。Winsock2提供了对IOCP的支持,并在WindowsNT平台得到了完整的实现。然而IOCP是所有WindowsNT I/O模型中最难理解和实现的,为了帮助你使用IOCP设计一个更好的Socket服务,本文提供了一些诀窍。
Tip 1:使用Winsock2 IOCP函数例如WSASend和WSARecv,如同Win32文件I/O函数,例如WriteFile和ReadFile。
微软提供的Socket句柄是一个可安装文件系统(IFS)句柄,因此你可以使用Win32的文件I/O函数调用这个句柄,然而,将Socket句柄和文件系统联系起来,你不得不陷入很多的Kernal/User模式转换的问题中,例如线程的上下文转换,花费的代价还包括参数的重新排列导致的性能降低。
因此你应该使用只被Winsock2中IOCP允许的函数来使用IOCP。在ReadFile和WriteFile中会发生的额外的参数重整以及模式转换只会发生在一种情况下,那就是如果句柄的提供者并没有将自己的WSAPROTOCOL_INFO结构中的DwServiceFlags1设置为XP1_IFS_HANDLES。
注解:即使使用WSASend和WSARecv,这些提供者仍然具有不可避免的额外的模式转换,当然ReadFile和WriteFile需要更多的转换。
TIP 2: 确定并发工作线程数量和产生的工作线程总量。
并发工作线程的数量和工作线程的数量并不是同一概念。你可以决定IOCP使用最多2个的并发线程以及包括10个工作线程的线程池。工作线程池拥有的线程多于或者等于并发线程的数量时,工作线程处理队列中一个封包的时候可以调用win32的Wait函数,这样可以无延迟的处理队列中另外的封包。
如果队列中有正在等待被处理的封包,系统将会唤醒一个工作线程处理他,最后,第一个线程确认正在休眠并且可以被再次调用,此时,可调用线程数量会多于IOCP允许的并发线程数量(例如,NumberOFConcurrentThreads)。然而,当下一个线程调用GetQueueCompletionStatus并且进入等待状态,系统不会唤醒他。一般来说,系统会试图保持你设定的并发工作线程数量。
一般来讲,每拥有一个CPU,在IOCP中你可以使用一个并发工作线程,要做到这点,当你第一次初始化IOCP的时候,可以在调用CreateIOCompletionPort的时候将NumberOfConcurrentThreads设置为0。
TIP 3:将一个提交的I/O操作和完成封包的出列联系起来。
当对一个封包进行出列,可以调用GetQueuedCompletionStatus返回一个完成Key和一个复合的结构体给I/O。你可以分别的使用这两个结构体来返回一个句柄和一个I/O操作信息,当你将IOCP提供的句柄信息注册给Socket,那么你可以将注册的Socket句柄当做一个完成Key来使用。为每一个I/O的"extend"操作提供一个包含你的应用程序IO状态信息的复合结构体。当然,必须确定你为每个的I/O提供的是唯一的复合结构体。当I/O完成的时候,会返回一个指向结构体的指针。
TIP 4:I/O完成封包队列的行为
IOCP中完成封包队列的等待次序并不决定于Winsock2 I/O调用产生的顺序。如果一个Winsock2的I/O调用返回了SUCCESS或者IO_PENDING,那么他保证当I/O操作完成后,完成封包会进入IOCP的等待队列,而不管Socket句柄是否已经关闭。如果你关闭了socket句柄,那么将来调用WSASend,WSASendTo,WSARecv和WSARecvFrom会失败并返回一个不同于SUCCES或者IO_PENDING的代码,这时将不会产生一个完成封包。而在这种情况下,前一次使用GetQueuedCompletionStatus提交的I/O操作所得到的完成封包,会显示一个失败的信息。
如果你删除了IOCP本身,那么不会有任何I/O请求发送给IOCP,因为IOCP的句柄已经不可用,尽管系统底层的IOCP核心结构并不会在所有已提交I/O请求完成之前被移除。
TIP5:IOCP的清除
很重要的一件事是使用复合I/O时候的IOCP清除:如果一个I/O操作尚未完成,那么千万不要释放该操作创建的复合结构体。HasOverlappedIoCompleted函数可以帮助你检查一个I/O操作是否已经完成。
关闭服务一般有两种情况,第一种你并不关心尚未结束的I/O操作的完成状态,你只希望尽可能快的关闭他。第二种,你打算关闭服务,但是你需要获知未结束I/O操作的完成状态。
第一种情况你可以调用PostQueueCompletionStatus(N次,N等于你的工作线程数量)来提交一个特殊的完成封包,他通知所有的工作线程立即退出,关闭所有socket句柄和他们关联的复合结构体,然后关闭完成端口(IOCP)。在关闭复合结构体之前使用HasOverlappedIOCompleted检查他的完成状态。如果一个socket关闭了,所有基于他的未结束的I/O操作会很快的完成。
在第二种情况,你可以延迟工作线程的退出来保证所有的完成封包可以被适当的出列。你可以首先关闭所有的socket句柄和IOCP。可是,你需要维护一个未完成I/O的数字,以便你的线程可以知道可以安全退出的时间。尽管当队列中有很多完成封包在等待的时候,活动的工作线程不能立即退出,但是在IOCP服务中使用全局I/O计数器并且使用临界区保护他的代价并不会象你想象的那样昂贵。

# re: 封装了IOCP  回复  更多评论   

2009-01-31 20:42 by 85093103
看看iocp的清除。

# re: 封装了IOCP  回复  更多评论   

2009-01-31 21:00 by 苦丁茶
因为我发现当大理recv后,就没机会处理send的完成事件了。
如果在recv有同步或有复杂的逻辑,比如recv后需要send几个,此时可能产生死锁,以致永远没机会get到send的完成事件。
而如果IOCP本身就能将这两个事件用两个不同的GET分开,那业务逻辑就也好处理一些。

# re: 封装了IOCP  回复  更多评论   

2009-04-12 15:30 by cbm
@苦丁茶
我也在写iocp,对于你的评论,我认为非常正确,但是到现在为止,还没有找到一个好的办法处理send的完成事件。你找到了吗

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理