select模型
利用select函数,判断套接字上是否存在数据,或者能否向一个套接字写入数据。
目的是防止应用程序在套接字处于锁定模式时,调用recv(或send)从没有数据的套接字上接收数据,被迫进入阻塞状态。
select参数和返回值意义如下:
int select (
IN int nfds, //0,无意义
IN OUT fd_set* readfds, //检查可读性
IN OUT fd_set* writefds, //检查可写性
IN OUT fd_set* exceptfds, //例外数据
IN const struct timeval* timeout); //函数的返回时间
struct timeval {
long tv_sec; //秒
long tv_usec; //毫秒
};
select返回fd_set中可用的套接字个数。
fd_set是一个SOCKET队列,以下宏可以对该队列进行操作:
FD_CLR( s, *set) 从队列set删除句柄s;
FD_ISSET( s, *set) 检查句柄s是否存在与队列set中;
FD_SET( s, *set )把句柄s添加到队列set中;
FD_ZERO( *set ) 把set队列初始化成空队列.
如果想要检查一个套接字是否有数据需要接收,可以用FD_SET宏把套接接字句柄加入可读性检查队列中,然后调用select。
如果该套接字没有数据需要接收,select函数会把该套接字从可读性检查队列中删除掉,然后利用FD_ISSET宏只要检查该套接字句柄是否还存在于可读性队列中,就可以知道到底有没有数据需要接收了。
WSADATA wd;
::WSAStartup(MAKEWORD(2,2), &wd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(5000);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
SOCKET s = ::socket(AF_INET, SOCK_DGRAM, 0);
::bind(s, (const sockaddr*)&addr, sizeof(addr));
sockaddr_in client_addr;
int fromlen = sizeof(sockaddr_in);
fd_set fs; //定义fd_set队列
timeval tm; //timeval时间结构体
tm.tv_sec = 3; //3秒
tm.tv_usec = 0;
//收数据
while(1)
{
FD_ZERO(&fs); //清空队列
FD_SET(s, &fs); //将套接字放入队列
int i = ::select(0, &fs, NULL, NULL, &tm);
cout<<i<<endl; //输出可用套接字个数
if(0 == i)
continue; //无可读数据的套接字,继续循环
char cBuffer[1024] = {0};
::recvfrom(s, cBuffer, 1024, 0, (sockaddr*)&client_addr, &fromlen);
cout<<inet_ntoa(client_addr.sin_addr)<<endl<<cBuffer<<endl;
}
::closesocket(s);
::WSACleanup();
WSAAsyncSelect模型
通过WSAAsyncSelect函数设定当指定套接字收到请求时,向指定窗口发送指定的消息,自动将套接口设置为非阻塞模式,需要创建窗口接收消息。
int WSAAsyncSelect ( SOCKET s, HWND hWnd, unsigned int wMsg, long lEvent );
s 标识一个需要事件通知的套接口的描述符.
hWnd 标识一个在网络事件发生时需要接收消息的窗口句柄.
wMsg 在网络事件发生时要接收的消息.
lEvent 位屏蔽码,用于指明应用程序感兴趣的网络事件集合.
FD_READ 欲接收读准备好的通知.
FD_WRITE 欲接收写准备好的通知.
FD_OOB 欲接收带边数据到达的通知.
FD_ACCEPT 欲接收将要连接的通知.
FD_CONNECT 欲接收已连接好的通知.
FD_CLOSE 欲接收套接口关闭的通知.
#include <Windows.h>
#include <iostream>
#include "tchar.h"
#include <process.h>
#include <iostream>
#pragma comment (lib, "Ws2_32.lib")
using namespace std;
//定义消息
#define WM_MYSOCK WM_USER+1
LRESULT CALLBACK WndProc(HWND, UINT, WPARAM, LPARAM);
SOCKET NonConnect(HWND hwnd)
{
WSADATA wd;
::WSAStartup(MAKEWORD(2,2), &wd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(5000);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
SOCKET s = ::socket(AF_INET, SOCK_DGRAM, 0);
::bind(s, (const sockaddr*)&addr, sizeof(addr));
sockaddr_in client_addr;
int fromlen = sizeof(sockaddr_in);
//当s中有可读数据时,向hwnd发送消息WM_MYSOCK
::WSAAsyncSelect(s, hwnd, WM_MYSOCK, FD_READ);
return s;
}
int WINAPI WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nShowCmd)
{
const TCHAR wcAppName[] = L"CharConversion";
WNDCLASS wndclass;
wndclass.style = CS_HREDRAW | CS_VREDRAW ;
wndclass.lpfnWndProc = WndProc ;
wndclass.cbClsExtra = 0 ;
wndclass.cbWndExtra = 0 ;
wndclass.hInstance = hInstance ;
wndclass.hIcon = LoadIcon (NULL, IDI_APPLICATION) ;
wndclass.hCursor = LoadCursor (NULL, IDC_ARROW) ;
wndclass.hbrBackground = (HBRUSH) GetStockObject (WHITE_BRUSH) ;
wndclass.lpszMenuName = NULL ;
wndclass.lpszClassName = wcAppName ;
if (!RegisterClass (&wndclass))
{
MessageBox ( NULL, TEXT ("This program requires Windows NT!"),
wcAppName, MB_ICONERROR) ;
return 0 ;
}
HWND hwnd;
hwnd = CreateWindow (wcAppName, L"TestCharacters",
WS_OVERLAPPEDWINDOW,
CW_USEDEFAULT, CW_USEDEFAULT,
300, 200,
NULL, NULL, hInstance, NULL) ;
ShowWindow (hwnd, nShowCmd) ;
UpdateWindow (hwnd) ;
//创建socket
SOCKET s = NonConnect(hwnd);
MSG msg;
while (GetMessage (&msg, NULL, 0, 0))
{
TranslateMessage (&msg) ;
DispatchMessage (&msg) ;
}
//收尾工作
::closesocket(s);
::WSACleanup();
return (int)msg.wParam ;
}
LRESULT CALLBACK WndProc(HWND hwnd, UINT msg, WPARAM wParam, LPARAM lParam)
{
switch(msg)
{
case WM_CREATE:
return 0;
case WM_PAINT:
LPPAINTSTRUCT ps;
BeginPaint (hwnd, ps) ;
EndPaint (hwnd, ps) ;
return 0;
case WM_MYSOCK:
//处理消息
PostQuitMessage(0) ;
return 0;
case WM_DESTROY:
PostQuitMessage(0) ;
return 0 ;
}
return DefWindowProc (hwnd, msg, wParam, lParam) ;
}
WSAEventSelect模型
与WSAAsyncSelect模型类似,因为它也接收FD_XXX类型的网络事件。
1、利用WSACreateEvent创建event。
2、利用WSAEventSelect关联套接字和event,并指定接收的网络事件类型。
3、将event和套接字放入分别放入数组中,利用WSAWaitForMultipleEvents等待event数组中的event受信,返回受信event索引。
4、利用WSAEnumNetworkEvents读取对应socket信息,并重置对应event。
5、从上一步的信息中判断网络事件,进行操作。
WSACreateEvent创建的event,初始为非受信,人工重置。
数组最多可容纳WSA_MAXIMUM_WAIT_EVENTS(64)个socket或event。
WSAWaitForMultipleEvents(
IN DWORD cEvents, //数组元素个数
IN const WSAEVENT FAR * lphEvents, //event数组
IN BOOL fWaitAll, //FALSE:有一个event受信就返回,TRUE:全部event受信才返回
IN DWORD dwTimeout, //timeout,单位为毫秒,INFINITE表示无限
IN BOOL fAlertable //FALSE
);
WSAEnumNetworkEvents第二个参数可重置event至非受信状态,也可不传入,用WSAResetEvent设置。
WSANETWORKEVENTS结构体中,lNetworkEvents表示网络事件类型,iErrorCode是一个错误代码数组,例如FD_READ对应的错误代码是iErrorCode[FD_READ_BIT],若该值为非0,则FD_READ出错。
WSADATA wd;
::WSAStartup(MAKEWORD(2,2), &wd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(5000);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
SOCKET s = ::socket(AF_INET, SOCK_DGRAM, 0);
::bind(s, (const sockaddr*)&addr, sizeof(addr));
sockaddr_in client_addr;
int fromlen = sizeof(sockaddr_in);
//创建Event,类型为人工重置,初始状态为未受信
WSAEVENT wEvent = ::WSACreateEvent();
//创建套接字数组和event数组,并将以创建的套接字和event放入数组
SOCKET socketArray[WSA_MAXIMUM_WAIT_EVENTS] = {NULL};
WSAEVENT eventArray[WSA_MAXIMUM_WAIT_EVENTS] = {NULL};
DWORD dwSocketNum = 0;
socketArray[dwSocketNum] = s;
eventArray[dwSocketNum] = wEvent;
//关联套接字和event
::WSAEventSelect(s, wEvent, FD_READ);
++dwSocketNum;
while(1)
{
//等待event数组中的event受信
DWORD dwIndex = ::WSAWaitForMultipleEvents(dwSocketNum, eventArray, FALSE, INFINITE, FALSE);
//受信后读取对应socket的信息,并重置对应event
WSANETWORKEVENTS wNetworkEvent;
::WSAEnumNetworkEvents(socketArray[dwIndex], eventArray[dwIndex], &wNetworkEvent);
//若网络事件匹配,则收数据
if((FD_READ == wNetworkEvent.lNetworkEvents) && (0 == wNetworkEvent.iErrorCode[FD_READ_BIT]))
{
char cBuffer[1024] = {0};
::recvfrom(s, cBuffer, 1024, 0, (sockaddr*)&client_addr, &fromlen);
cout<<inet_ntoa(client_addr.sin_addr)<<endl<<cBuffer<<endl;
}
}
//关闭event句柄
::WSACloseEvent(wEvent);
::closesocket(s);
::WSACleanup();
OverLapped事件模型
1、创建SOCKET.
2、创建event,与WSAOVERLAPPED结构体成员关联。
3、创建buffer,与WSABUF关联。
4、使用类似WSARecvFrom这样的函数,传入WSAOVERLAPPED结构体,函数立即返回。若IO完成,则指定WSAOVERLAPPED中的event状态变为受信。
5、使用WSAWaitForMultipleEvents等待event状态,若受信则表示IO完成,用WSAResetEvent手动设为非受信。
6、使用WSAGetOverlappedResult判断IO完成状态。
为了使用重叠模型,用WSASocket创建SOCKET的时候需要指定WSA_FLAG_OVERLAPPED,用socket()创建会默认指定该标志。
使用重叠模型时,需要使用此类函数,例如WSASend 、WSARecv ,传入WSAOVERLAPPED结构体参数,并且这些函数会立即返回,即使SOCKET为阻塞类型也不会阻塞。如果是OverLapped事件模型,最后一个参数为NULL。
WSAOVERLAPPED结构体的hEvent成员需要与event关联
WSABUF结构体的buf成员为数据buffer,len成员为buffer大小。
WSAGetOverlappedResult(
IN SOCKET s, //SOCKET
IN LPWSAOVERLAPPED lpOverlapped, //WSAOVERLAPPED
OUT LPDWORD lpcbTransfer, //传输字节数,若为0,在面向连接的socket中,表示对方已断开连接
IN BOOL fWait, //TRUE:除非重叠操作完成,否则函数不会返回;FALSE:如果IO操作未结束返回FALSE,否则返回TRUE
OUT LPDWORD lpdwFlags
);
重叠I/O模型也允许应用程序以一种重叠方式,实现对连接的接受。具体的做法是在监听套接字上调用AcceptEx函数。
//面向连接的Socket
WSADATA wd;
::WSAStartup(MAKEWORD(2,2), &wd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(1555);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
SOCKET s = ::socket(AF_INET, SOCK_STREAM, 0);
::bind(s, (const sockaddr*)&addr, sizeof(addr));
::listen(s, 2);
sockaddr_in client_addr;
SOCKET ns = ::accept(s, (sockaddr*)&client_addr, NULL);
int fromlen = sizeof(sockaddr_in);
//创建event数组,将event放入数组
WSAEVENT wEvent = ::WSACreateEvent();
WSAEVENT eventArray[WSA_MAXIMUM_WAIT_EVENTS] = {NULL};
DWORD dwEventNum = 0;
eventArray[dwEventNum] = wEvent;
++dwEventNum;
//创建WSAOVERLAPPED结构体,与event关联
WSAOVERLAPPED wOverLapped;
ZeroMemory(&wOverLapped, sizeof(WSAOVERLAPPED));
wOverLapped.hEvent = wEvent;
//提供接收数据的buffer
WSABUF wBuffer;
char cBuffer[1024] = {0};
wBuffer.buf = cBuffer;
wBuffer.len = 1024;
DWORD dwRecv = 0; //接收的数据字节
DWORD dwFlag = 0; //设为0的参数
while(1)
{
//指定SOCKET,WSABUF,WSAOVERLAPPED,立即反回;若IO完成,则指定WSAOVERLAPPED中的event受信
::WSARecvFrom(ns, &wBuffer, 1, &dwRecv, &dwFlag, (sockaddr*)&client_addr, &fromlen, &wOverLapped, NULL);
//等待event数组中的event受信
DWORD dwIndex = ::WSAWaitForMultipleEvents(dwEventNum, eventArray, FALSE, INFINITE, FALSE);
//将受信的event手动设为非受信
::WSAResetEvent(eventArray[dwIndex]);
DWORD dwResult = 0; //传送的字节数
::WSAGetOverlappedResult(ns, &wOverLapped, &dwResult, FALSE, &dwFlag);
if(0 == dwResult)
{
//若为0,在面向连接的socket中,表示对方已断开连接
break;
}
//输出收到的数据
cout<<inet_ntoa(client_addr.sin_addr)<<endl<<cBuffer<<endl;
}
//关闭event句柄
::WSACloseEvent(wEvent);
::closesocket(ns);
::closesocket(s);
::WSACleanup();
OverLapped完成例程模型
与重叠事件模型不同的是,事件模型在IO结束后使event变为受信,而这里是调用一个回调函数。利用SleepEx等待回调函数结束。
1、创建SOCKET.
2、创建WSAOVERLAPPED结构体。
3、创建buffer,与WSABUF关联。
4、定义WSAOVERLAPPED_COMPLETION_ROUTINE回调函数。
5、使用类似WSARecvFrom这样的函数,传入WSAOVERLAPPED结构体和回调函数,函数立即返回。若IO完成,则指定的回调函数被调用。
6、使用SleepEx等待回调函数结束。
typedef
void
(CALLBACK * LPWSAOVERLAPPED_COMPLETION_ROUTINE)(
IN DWORD dwError, //错误代码,非0即出错
IN DWORD cbTransferred, //传送字节数,面向连接时为0表示对方断开连接
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags
);
利用SleepEx等待回调函数结束时,第二个参数为TRUE;也可用WSAWaitForMultipleEvents等待,将最后一个参数设为TRUE,需要创建event对象,只作为参数输入,没有实际意义。
//回调函数
void CALLBACK MyCompletionRoutine(
IN DWORD dwError, //错误代码
IN DWORD cbTransferred, //传输的字节
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags)
{
if((0 != dwError) || (0 == cbTransferred))
{
cout<<"disconnect"<<endl;
exit(1);
}
}
//面向连接的Socket
WSADATA wd;
::WSAStartup(MAKEWORD(2,2), &wd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(1555);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
SOCKET s = ::socket(AF_INET, SOCK_STREAM, 0);
::bind(s, (const sockaddr*)&addr, sizeof(addr));
::listen(s, 2);
sockaddr_in client_addr;
SOCKET ns = ::accept(s, (sockaddr*)&client_addr, NULL);
int fromlen = sizeof(sockaddr_in);
//创建WSAOVERLAPPED结构体,只作为参数输入
WSAOVERLAPPED wOverLapped;
ZeroMemory(&wOverLapped, sizeof(WSAOVERLAPPED));
//提供接收数据的buffer
WSABUF wBuffer;
char cBuffer[1024] = {0};
wBuffer.buf = cBuffer;
wBuffer.len = 1024;
DWORD dwRecv = 0; //接收的数据字节
DWORD dwFlag = 0; //设为0的参数
while(1)
{
//指定SOCKET,WSABUF,WSAOVERLAPPED和回调函数,立即反回;若IO完成,调用回调函数
::WSARecvFrom(ns, &wBuffer, 1, &dwRecv, &dwFlag, (sockaddr*)&client_addr, &fromlen, &wOverLapped, MyCompletionRoutine);
//等待回调函数结束
DWORD dwIndex = ::SleepEx(INFINITE, TRUE);
if(WAIT_IO_COMPLETION == dwIndex)
{
//返回值正常
cout<<"finished"<<endl;
}
else
{
cout<<"error"<<endl;
}
//输出收到的数据
cout<<inet_ntoa(client_addr.sin_addr)<<endl<<cBuffer<<endl;
}
::closesocket(ns);
::closesocket(s);
::WSACleanup();
}
完成端口模型(IOCP)
主线程:
1、使用CreateIoCompletionPort创建一个CompletionPort。
2、创建工作线程(线程数一般为CPU数*2+2,使用GetSystemInfo获得CPU数),每个线程传入CompletionPort参数。
3、服务器端使用accept等待连接,使用CreateIoCompletionPort将连接创建的SOCKET与CompletionPort关联。
4、在堆中创建自定义的OverLapped扩展结构体对象。
5、投递一个异步IO请求。
6、主线程重复步骤3-5。
工作线程:
1、使用GetQueuedCompletionStatus从CompletionPort队列中获得一个完成包(一个完成包表示一次IO操作的完成),若队列为空,则阻塞。
2、如果完成包表示客户端关闭连接,则服务器端关闭SOCKET,释放堆空间;否则从完成包中找到buffer地址,输出收到的数据。
3、再投递一个异步IO请求。
4、工作线程重复步骤1-3。
CreateIoCompletionPort:
在创建CompletionPort时,只需指定最后一个参数,最后一个参数定义了在一个完成端口上,同时允许执行的线程数量,若将该参数设为0,表明系统内安装了多少个处理器,便允许同时运行多少个线程,一般为CPU数*2+2。
在关联CompletionPort与SOCKET时,第一个参数指定了SOCKET,第二个参数指定CompletionPort,第三个参数可传入SOCKET,以便在GetQueuedCompletionStatus时获得该SOCKET。
主线程投递异步IO操作传入的OverLapped地址起到索引作用,工作线程中GetQueuedCompletionStatus传入OverLapped指针的地址,这样该指针就指向主线程中传入的OverLapped;所以自定义一个扩展结构体,在堆中创建对象,在主线程中传入其地址,那么在工作线程中就可以得到该对象的地址,从而访问OverLapped后面自定义的成员,例如buffer。
GetQueuedCompletionStatus:
第一个参数指定CompletionPort。
第二个参数是该次IO操作传输的字节数,0表示对方关闭连接。
第三个参数是该次IO操作对应的SOCKET,(CreateIoCompletionPort关联时第三个参数传入的SOCKET)。
第四个参数是投递异步IO操作时传入的OVERLAPPED,在这里用它来获得堆中创建的扩展结构体对象的地址。
第五个参数设置超时的时间,INFINITE表示无限等待。
//OverLapped结构体扩展
struct OVERLAPPEDPLUS
{
OVERLAPPED ol;
char buf[1024]; //在SOCKET上收数据的buffer
DWORD recvd; //收到的字节数
};
//线程函数声明
DWORD CALLBACK ServerWorkerThread(void* CompletionPortID) ;
void Connect()
{
//面向连接的Socket
WSADATA wd;
::WSAStartup(MAKEWORD(2,2), &wd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(1555);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
SOCKET s = ::socket(AF_INET, SOCK_STREAM, 0);
::bind(s, (const sockaddr*)&addr, sizeof(addr));
::listen(s, 2);
//创建完成端口
//最后一个参数定义了在一个完成端口上,同时允许执行的线程数量
//若将该参数设为0,表明系统内安装了多少个处理器,便允许同时运行多少个线程
HANDLE CompetionPort = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
//获得CPU数,创建对应数量的线程
SYSTEM_INFO SystenInfo;
::GetSystemInfo(&SystenInfo);
for(DWORD i=0; i<SystenInfo.dwNumberOfProcessors; ++i)
{
DWORD ThreadID = 0;
//创建线程,传入参数为CompetionPort
HANDLE ThreadHandle = ::CreateThread(NULL,0,ServerWorkerThread,CompetionPort,0,&ThreadID);
::CloseHandle(ThreadHandle);
}
while(TRUE)
{
sockaddr_in client_addr;
int fromlen = sizeof(sockaddr_in);
SOCKET ns = ::accept(s, (sockaddr*)&client_addr, NULL);
cout<<"Client Connected!"<<endl;
//把链接的SOCKET与CompetionPort相关联
//第三个参数可传入SOCKET,第四个参数忽略
::CreateIoCompletionPort((HANDLE)ns,CompetionPort,(DWORD)ns,0);
//在堆中创建OverLapped扩展结构体的对象
OVERLAPPEDPLUS* olp = (OVERLAPPEDPLUS*)::GlobalAlloc(GPTR, sizeof(OVERLAPPEDPLUS));
//提供接收数据的buffer
WSABUF wBuffer;
wBuffer.buf = olp->buf;
wBuffer.len = 1024;
DWORD dwFlag = 0; //设为0的参数
//在指定SOCKET上投递一个异步的IO请求
::WSARecvFrom(ns, &wBuffer, 1, &(olp->recvd), &dwFlag, (sockaddr*)&client_addr, &fromlen, &(olp->ol), NULL);
}
::closesocket(s);
::WSACleanup();
}
//工作者线程
DWORD CALLBACK ServerWorkerThread(void* CompletionPortID)
{
HANDLE CompletionPort = (HANDLE) CompletionPortID;
DWORD BytesTransferred = 0;
SOCKET mySock;
OVERLAPPEDPLUS* polp;
while(TRUE)
{
//从指定的CP队列中取出一个完成包,一个完成包表示一次IO操作的完成;若队列为空则阻塞
//第二个参数是该次IO操作传输的字节数
//第三个参数是该次IO操作对应的SOCKET,(CreateIoCompletionPort关联时第三个参数传入的SOCKET)
//第四个参数是投递异步IO操作时传入的OVERLAPPED,在这里用它来获得堆中创建的扩展结构体对象的地址
::GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&mySock, (OVERLAPPED**)&polp, INFINITE);
if (BytesTransferred == 0)
{
//如果客户端关闭连接,则服务器端关闭SOCKET,释放申请的堆内存空间
::closesocket(mySock);
::GlobalFree(polp);
cout<<"Client Disconnected!"<<endl;
continue; //不要关闭工作者线程
}
//输出收到的数据
cout<<polp->buf<<endl;
//再投递一个异步的WSARecv请求
WSABUF wBuffer;
wBuffer.buf = polp->buf;
wBuffer.len = 1024;
DWORD dwFlag = 0;
::WSARecv(mySock, &wBuffer, 1, &(polp->recvd), &dwFlag, &(polp->ol), NULL);
}
return 0;
}
int main()
{
Connect();
return 0;
}