本文简单介绍了当前Windows支持的各种Socket I/O模型,如果你发现其中存在什么错误请务必赐教。
一:select模型
二:WSAAsyncSelect模型
三:WSAEventSelect模型
四:Overlapped I/O 事件通知模型
五:Overlapped I/O 完成例程模型
六:IOCP模型
老陈有一个在外地工作的女儿,不能经常回来,老陈和她通过信件联系。他们的信会被邮递员投递到他们的信箱里。
这和Socket模型非常类似。下面我就以老陈接收信件为例讲解Socket I/O模型~~~
一:Select模型
老陈非常想看到女儿的信。以至于他每隔10分钟就下楼检查信箱,看是否有女儿的信~~~~~
在这种情况下,“下楼检查信箱”然后回到楼上耽误了老陈太多的时间,以至于老陈无法做其他工作。
select模型和老陈的这种情况非常相似:周而复始地去检查......如果有数据......接收/发送.......
使用线程来select应该是通用的做法:
procedure TListenThread.Execute;
var
addr : TSockAddrIn;
fd_read : TFDSet;
timeout : TTimeVal;
ASock,
MainSock : TSocket;
len, i : Integer;
begin
MainSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
addr.sin_family := AF_INET;
addr.sin_port := htons(5678);
addr.sin_addr.S_addr := htonl(INADDR_ANY);
bind( MainSock, @addr, sizeof(addr) );
listen( MainSock, 5 );
while (not Terminated) do
begin
FD_ZERO( fd_read );
FD_SET( MainSock, fd_read );
timeout.tv_sec := 0;
timeout.tv_usec := 500;
if select( 0, @fd_read, nil, nil, @timeout ) > 0 then //至少有1个等待Accept的connection
begin
if FD_ISSET( MainSock, fd_read ) then
begin
for i:=0 to fd_read.fd_count-1 do //注意,fd_count <= 64,也就是说select只能同时管理最多64个连接
begin
len := sizeof(addr);
ASock := accept( MainSock, addr, len );
if ASock <> INVALID_SOCKET then
....//为ASock创建一个新的线程,在新的线程中再不停地select
end;
end;
end;
end; //while (not self.Terminated)
shutdown( MainSock, SD_BOTH );
closesocket( MainSock );
end;
select模型
select已经是老掉牙的东西了,windows下很少用了,不过既然叫“全接触”,还是写出来吧!!!
首先创建一个listen线程(thrListen)负责监听远程机器的连接请求, 和远程机器建立连接后,为此连接专门创建一个线程(thrReadWrite)进行read/write。 注意,要使用“临界区”保证线程对共享数据的安全访问。
代码很简单,不多说了~~~~~~~~~~~~~~~~~~~~~~~~
unit thrListen;
interface
uses Windows, Classes, SysUtils, Winsock2, thrReadWrite;
type YConnection = record thrRW : TRWThread; hsock : TSocket; dwIP : DWORD; dwPort : DWORD; end; PConnection = ^YConnection;
type TListenThread = class(TThread) private { Private declarations } FSock : TSocket; //主socket FList : TList; //客户连接线程列表 protected procedure Execute; override; end;
implementation
uses frmMain;
{ TListenThread }
procedure TListenThread.Execute; var addr : TSockAddrIn; fd_read : TFDSet; timeout : TTimeVal; AConnect : PConnection; len, i : Integer; begin FList:= TList.Create;
FSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
addr.sin_family := AF_INET; addr.sin_port := htons(LISTEN_PORT); addr.sin_addr.S_addr := htonl(INADDR_ANY);
bind( FSock, @addr, sizeof(SOCKADDR) ); listen( FSock, 5 );//正在等待连接的最大队列长度5
while (not Terminated) do begin FD_ZERO( fd_read ); FD_SET( FSock, fd_read );
timeout.tv_sec := 0; timeout.tv_usec := 500;
if select( 0, @fd_read, nil, nil, @timeout ) > 0 then //至少有1个等待Accept的connection begin if FD_ISSET( FSock, fd_read ) then begin for i:=0 to fd_read.fd_count-1 do //注意,fd_count <= FD_SETSIZE(64) begin New( AConnect ); len := sizeof(addr); AConnect^.hsock := accept( FSock, addr, len ); if AConnect^.hsock <> INVALID_SOCKET then begin AConnect^.dwIP := ntohl( addr.sin_addr.S_addr ); AConnect^.dwPort := ntohs( addr.sin_port ); AConnect^.thrRW := TRWThread.Create( True ); with AConnect^.thrRW do begin m_sock := AConnect^.hsock; m_ip := AConnect^.dwIP; m_port := AConnect^.dwPort; m_itemid := AConnect; FreeOnTerminate := True; Resume; end;
//修改客户连接列表 FList.Add( AConnect ); len := FList.Count; end else begin len := WSAGetLastError(); MessageBox( 0, PChar(IntToStr(len)), 'accept error', MB_ICONERROR ); Dispose( AConnect ); end; end; //for i:=0 to fd_read.fd_count-1 end; //if FD_ISSET( m_sock, fd_read ) end; //if ret > 0
end; //while (not self.Terminated)
shutdown( FSock, SD_BOTH ); closesocket( FSock );
//结束所有维护客户端连接的线程 if FList.Count > 0 then begin for i:=0 to FList.Count-1 do begin PConnection(FList.Items[i])^.thrRW.Terminate; shutdown( PConnection(FList.Items[i])^.hsock, SD_BOTH ); closesocket( PConnection(FList.Items[i])^.hsock ); Dispose(FList.Items[i]); end; end;
FList.Free; end;
end.
unit thrReadWrite;
interface
uses Windows, Classes, SysUtils, Winsock2;
const PACK_SIZE_RECEIVE = 4096;
type TRWThread = class(TThread) public m_sock : THandle; m_ip : DWORD; m_port : DWORD; m_itemid : Pointer; private FRecvBuf : Array [0..PACK_SIZE_RECEIVE-1] of Char; protected procedure Execute; override; end;
implementation
uses frmMain;
{ TRWThread }
procedure TRWThread.Execute; var sTitle : String; fd_read : TFDSet; timeout : TTimeVal; ret : Integer; begin sTitle := inet_ntoa( TInAddr(htonl(m_ip)) ); sTitle := 'IP: ' + sTitle + ' Port: ' + IntToStr(m_port) + ' Msg: ';
while (not self.Terminated) do begin FD_ZERO( fd_read ); FD_SET( m_sock, fd_read ); timeout.tv_sec := 0; timeout.tv_usec := 500;
ret := select( 0, @fd_Read, nil, nil, @timeout ); if ret = SOCKET_ERROR then begin MessageBox( 0, 'Call select() failed.', 'Error', MB_ICONERROR ); Exit; end;
if ret > 0 then begin if FD_ISSET( m_sock, fd_read ) then begin FillChar( FRecvBuf[0], PACK_SIZE_RECEIVE, 0 ); ret := recv( m_sock, FRecvBuf[0], PACK_SIZE_RECEIVE, 0 );
if (ret=0) or (ret=SOCKET_ERROR) then begin closesocket( m_sock ); Exit; end;
EnterCriticalSection( gCSListBox ); fmMain.ListBox1.Items.Add( sTitle + FRecvBuf ); LeaveCriticalSection( gCSListBox ); end; end; //if ret > 0
end; //while (not self.Terminated)
closesocket( m_sock ); end;
end. |
二:WSAAsyncSelect模型
后来,老陈使用了微软公司的新式信箱。这种信箱非常先进,一旦信箱里有新的信件,盖茨就会给老陈打电话:喂,大爷,你有新的信件了!从此,老陈再也不必频繁上下楼检查信箱了,牙也不疼了,你瞅准了,蓝天......不是,微软~~~~~~~~
微软提供的WSAAsyncSelect模型就是这个意思。
WSAAsyncSelect模型是Windows下最简单易用的一种Socket I/O模型。使用这种模型时,Windows会把网络事件以消息的形势通知应用程序。
首先定义一个消息标示常量:
const WM_SOCKET = WM_USER + 55;
再在主Form的private域添加一个处理此消息的函数声明:
private
procedure WMSocket(var Msg: TMessage); message WM_SOCKET;
然后就可以使用WSAAsyncSelect了:
var
addr : TSockAddr;
sock : TSocket;
sock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
addr.sin_family := AF_INET;
addr.sin_port := htons(5678);
addr.sin_addr.S_addr := htonl(INADDR_ANY);
bind( m_sock, @addr, sizeof(SOCKADDR) );
WSAAsyncSelect( m_sock, Handle, WM_SOCKET, FD_ACCEPT or FD_CLOSE );
listen( m_sock, 5 );
....
应用程序可以对收到WM_SOCKET消息进行分析,判断是哪一个socket产生了网络事件以及事件类型:
procedure TfmMain.WMSocket(var Msg: TMessage);
var
sock : TSocket;
addr : TSockAddrIn;
addrlen : Integer;
buf : Array [0..4095] of Char;
begin
//Msg的WParam是产生了网络事件的socket句柄,LParam则包含了事件类型
case WSAGetSelectEvent( Msg.LParam ) of
FD_ACCEPT :
begin
addrlen := sizeof(addr);
sock := accept( Msg.WParam, addr, addrlen );
if sock <> INVALID_SOCKET then
WSAAsyncSelect( sock, Handle, WM_SOCKET, FD_READ or FD_WRITE or FD_CLOSE );
end;
FD_CLOSE : closesocket( Msg.WParam );
FD_READ : recv( Msg.WParam, buf[0], 4096, 0 );
FD_WRITE : ;
end;
end;
1。WSAAsyncSelect模型
这个很简单,贴个源码了事。。。。。。。。。。。。
unit frmMain;
interface
uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, Winsock2, StdCtrls, ComCtrls;
const LISTEN_PORT = 5005; WM_SOCKET = WM_USER + 55;
type TfmMain = class(TForm) btnStart: TButton; btnStop: TButton; ListBox1: TListBox; StatusBar1: TStatusBar;
procedure FormCreate(Sender: TObject); procedure FormClose(Sender: TObject; var Action: TCloseAction); procedure btnStartClick(Sender: TObject); procedure btnStopClick(Sender: TObject);
private { Private declarations } procedure WMSocket(var Msg: TMessage); message WM_SOCKET; procedure SendBuf( hsock: TSocket ); procedure RecvBuf( hsock: TSocket ); public { Public declarations } m_sock : TSocket; //主socket m_connect_list : TList; //客户连接列表 end;
var fmMain : TfmMain;
implementation
{$R *.dfm}
procedure TfmMain.WMSocket(var Msg: TMessage); var s : TSocket; addr : TSockAddrIn; addrlen : Integer; begin case WSAGetSelectEvent( Msg.LParam ) of FD_ACCEPT : begin addrlen := sizeof(addr); s := accept( m_sock, addr, addrlen ); if s <> INVALID_SOCKET then begin WSAAsyncSelect( s, Handle, WM_SOCKET, FD_READ or FD_WRITE or FD_CLOSE ); m_connect_list.Add( Pointer(s) ); StatusBar1.Panels[0].Text := 'Connection count: ' + IntToStr(m_connect_list.Count); end; end;
FD_CLOSE : begin if m_connect_list.IndexOf( Pointer(Msg.WParam) ) > -1 then begin m_connect_list.Remove( Pointer(Msg.WParam) ); StatusBar1.Panels[0].Text := 'Connection count: ' + IntToStr(m_connect_list.Count); end; closesocket( Msg.WParam ); end;
FD_READ : RecvBuf( Msg.WParam ); FD_WRITE : SendBuf( Msg.WParam ); end; //case... end;
procedure TfmMain.SendBuf( hsock: TSocket ); begin {/* 只有在三种条件下,才会发出FD_WRITE通知: ■使用connect或WSAConnect ,一个套接字首次建立了连接。 ■使用accept或WSAAccept,套接字被接受以后。 ■若send、WSASend、sendto或WSASendTo操作失败,返回了WSAEWOULDBLOCK错误, 而且缓冲区的空间变得可用 因此,作为一个应用程序,自收到首条FD_WRITE消息开始,便应认为自己必然能在一 个套接字上发出数据,直至一个send、WSASend、sendto或WSASendTo返回套接字错误 WSAEWOULDBLOCK。经过了这样的失败以后,要再用另一条FD_WRITE通知应用程序再次 送数据。 也就是说,不要关心FD_WRITE消息,尽管send,直到出现WSAEWOULDBLOCK错误! */} end;
procedure TfmMain.RecvBuf( hsock: TSocket ); var buf : Array [0..4095] of Char; adr : TSockAddrIn; len : Integer; s : String; begin FillChar( buf[0], 4096, 0 ); recv( hsock, buf[0], 4096, 0 );
len := sizeof(adr); getpeername( hsock, adr, len ); s := inet_ntoa( adr.sin_addr ); s := 'IP: ' + s + ' Port: ' + IntToStr(ntohs(adr.sin_port)) + ' Msg: '; ListBox1.Items.Add( s + buf ); end;
procedure TfmMain.FormCreate(Sender: TObject); var wsa : TWSAData; begin if WSAStartup( $0202, wsa ) <> 0 then //WSAStartup returns zero if successful. begin MessageBox( 0, 'WSAStartup failed', 'Error', MB_ICONERROR ); btnStart.Enabled := False; btnStop.Enabled := False; end;
btnStart.Enabled := True; btnStop.Enabled := False;
m_connect_list := TList.Create; end;
procedure TfmMain.FormClose(Sender: TObject; var Action: TCloseAction); var i : Integer; begin shutdown( m_sock, SD_BOTH ); closesocket( m_sock );
//结束所有维护客户端连接的线程 if m_connect_list.Count > 0 then for i:=0 to m_connect_list.Count-1 do begin shutdown( TSocket(m_connect_list.Items[i]), SD_BOTH ); closesocket( TSocket(m_connect_list.Items[i]) ); end;
m_connect_list.Free;
WSACleanup(); end;
procedure TfmMain.btnStartClick(Sender: TObject); var addr : TSockAddr; ret : Integer; begin m_sock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if m_sock = INVALID_SOCKET then begin MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR ); Exit; end;
addr.sin_family := AF_INET; addr.sin_port := htons(LISTEN_PORT); addr.sin_addr.S_addr := htonl(INADDR_ANY);
if bind( m_sock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then begin MessageBox( 0, 'Call bind failed.', 'Error', MB_ICONERROR ); Exit; end;
ret := WSAAsyncSelect( m_sock, Handle, WM_SOCKET, FD_ACCEPT or FD_CLOSE ); if ret = SOCKET_ERROR then begin MessageBox( 0, 'Call WSAAsyncSelect failed.', 'Error', MB_ICONERROR ); Exit; end;
if listen( m_sock, 5 ) = SOCKET_ERROR then begin MessageBox( 0, 'Call listen failed.', 'Error', MB_ICONERROR ); Exit; end;
btnStart.Enabled := False; btnStop.Enabled := True; end;
procedure TfmMain.btnStopClick(Sender: TObject); var i : Integer; begin shutdown( m_sock, SD_BOTH ); closesocket( m_sock );
//结束所有维护客户端连接的线程 if m_connect_list.Count > 0 then for i:=0 to m_connect_list.Count-1 do begin shutdown( TSocket(m_connect_list.Items[i]), SD_BOTH ); closesocket( TSocket(m_connect_list.Items[i]) ); end;
m_connect_list.Clear;
btnStart.Enabled := True; btnStop.Enabled := False; end;
end.
|
三:WSAEventSelect模型
后来,微软的信箱非常畅销,购买微软信箱的人以百万计数......以至于盖茨每天24小时给客户打电话,累得腰酸背痛,喝蚁力神都不好使~~~~~~
微软改进了他们的信箱:在客户的家中添加一个附加装置,这个装置会监视客户的信箱,每当新的信件来临,此装置会发出“新信件到达”声,提醒老陈去收信。盖茨终于可以睡觉了。
同样要使用线程:
procedure TListenThread.Execute;
var
hEvent : WSAEvent;
ret : Integer;
ne : TWSANetworkEvents;
sock : TSocket;
adr : TSockAddrIn;
sMsg : String;
Index,
EventTotal : DWORD;
EventArray : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of WSAEVENT;
begin
...socket...bind...
hEvent := WSACreateEvent();
WSAEventSelect( ListenSock, hEvent, FD_ACCEPT or FD_CLOSE );
...listen...
while ( not Terminated ) do
begin
Index := WSAWaitForMultipleEvents( EventTotal, @EventArray[0], FALSE, WSA_INFINITE, FALSE );
FillChar( ne, sizeof(ne), 0 );
WSAEnumNetworkEvents( SockArray[Index-WSA_WAIT_EVENT_0], EventArray[Index-WSA_WAIT_EVENT_0], @ne );
if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then
begin
if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then
continue;
ret := sizeof(adr);
sock := accept( SockArray[Index-WSA_WAIT_EVENT_0], adr, ret );
if EventTotal > WSA_MAXIMUM_WAIT_EVENTS-1 then//这里WSA_MAXIMUM_WAIT_EVENTS同样是64
begin
closesocket( sock );
continue;
end;
hEvent := WSACreateEvent();
WSAEventSelect( sock, hEvent, FD_READ or FD_WRITE or FD_CLOSE );
SockArray[EventTotal] := sock;
EventArray[EventTotal] := hEvent;
Inc( EventTotal );
end;
if ( ne.lNetworkEvents and FD_READ ) > 0 then
begin
if ne.iErrorCode[FD_READ_BIT] <> 0 then
continue;
FillChar( RecvBuf[0], PACK_SIZE_RECEIVE, 0 );
ret := recv( SockArray[Index-WSA_WAIT_EVENT_0], RecvBuf[0], PACK_SIZE_RECEIVE, 0 );
......
end;
end;
end;
WSAEventSelect模型
看来大家不感兴趣啊呵呵没有信心了把代码贴完拉倒。。。。。。
unit frmMain;
interface
uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, Winsock2, StdCtrls, ComCtrls, thrEvent;
const LISTEN_PORT = 5005;
type TfmMain = class(TForm) btnStart: TButton; btnStop: TButton; ListBox1: TListBox; StatusBar1: TStatusBar;
procedure FormCreate(Sender: TObject); procedure FormClose(Sender: TObject; var Action: TCloseAction); procedure btnStartClick(Sender: TObject); procedure btnStopClick(Sender: TObject); private { Private declarations } public { Public declarations } EventThread : TEventThread; end;
var fmMain : TfmMain;
implementation
{$R *.dfm}
procedure TfmMain.FormCreate(Sender: TObject); var wsa : TWSAData; begin if WSAStartup( $0202, wsa ) <> 0 then //WSAStartup returns zero if successful. begin MessageBox( 0, 'WSAStartup failed', 'Error', MB_ICONERROR ); btnStart.Enabled := False; btnStop.Enabled := False; end;
btnStart.Enabled := True; btnStop.Enabled := False; end;
procedure TfmMain.FormClose(Sender: TObject; var Action: TCloseAction); begin WSACleanup(); end;
procedure TfmMain.btnStartClick(Sender: TObject); begin EventThread := TEventThread.Create( True ); EventThread.FreeOnTerminate := True; EventThread.OnTerminate := EventThread.WhileTerminate; EventThread.Resume;
btnStart.Enabled := False; btnStop.Enabled := True; end;
procedure TfmMain.btnStopClick(Sender: TObject); begin EventThread.Terminate; btnStart.Enabled := True; btnStop.Enabled := False; end;
end. //--------------------------------------------------------------------------------------
unit thrEvent;
interface
uses Windows, SysUtils, Classes, Winsock2;
const PACK_SIZE_RECEIVE = 4096;
type TEventThread = class(TThread) public procedure WhileTerminate(Sender: TObject); private ListenSock : TSocket; SockArray : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of TSocket; EventArray : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of WSAEVENT; EventTotal : DWORD; Index : DWORD; RecvBuf : Array [0..PACK_SIZE_RECEIVE-1] of Char;
procedure InitSock; procedure CompressArray(idx: DWORD); protected procedure Execute; override; end;
implementation
uses frmMain;
{ TEventThread }
procedure TEventThread.Execute; var hEvent : WSAEvent; ret : Integer; ne : TWSANetworkEvents; sock : TSocket; adr : TSockAddrIn; sMsg : String; begin InitSock(); if EventTotal = 0 then Exit;
while ( not Terminated ) do begin Index := WSAWaitForMultipleEvents( EventTotal, @EventArray[0], FALSE, WSA_INFINITE, FALSE ); if Index = WSA_WAIT_FAILED then begin MessageBox( 0,'Call WSAWaitForMultipleEvents failed.','Error',MB_ICONERROR ); Exit; end;
FillChar( ne, sizeof(ne), 0 ); WSAEnumNetworkEvents( SockArray[Index-WSA_WAIT_EVENT_0], EventArray[Index-WSA_WAIT_EVENT_0], @ne );
if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then begin if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then continue;
ret := sizeof(adr); sock := accept( SockArray[Index-WSA_WAIT_EVENT_0], adr, ret ); if EventTotal > WSA_MAXIMUM_WAIT_EVENTS-1 then begin closesocket( sock ); continue; end;
hEvent := WSACreateEvent(); WSAEventSelect( sock, hEvent, FD_READ or FD_WRITE or FD_CLOSE ); SockArray[EventTotal] := sock; EventArray[EventTotal] := hEvent; Inc( EventTotal );
fmMain.StatusBar1.Panels[0].Text := 'Connection: ' +IntToStr(EventTotal-1); end;
if ( ne.lNetworkEvents and FD_READ ) > 0 then begin if ne.iErrorCode[FD_READ_BIT] <> 0 then continue;
FillChar( RecvBuf[0], PACK_SIZE_RECEIVE, 0 ); ret := recv( SockArray[Index-WSA_WAIT_EVENT_0], RecvBuf[0], PACK_SIZE_RECEIVE, 0 ); if (ret=0) or (ret=SOCKET_ERROR) then continue;
ret := sizeof(adr); getpeername( SockArray[Index-WSA_WAIT_EVENT_0], adr, ret ); sMsg := inet_ntoa( adr.sin_addr ); sMsg := 'IP: ' +sMsg +' Port: ' +IntToStr(ntohs(adr.sin_port)) +' Msg: '; fmMain.ListBox1.Items.Add( sMsg + RecvBuf ); end; { if ( ne.lNetworkEvents and FD_WRITE ) > 0 then begin if ne.iErrorCode[FD_WRITE_BIT] <> 0 then continue;
... end; }
if ( ne.lNetworkEvents and FD_CLOSE ) > 0 then begin if ne.iErrorCode[FD_CLOSE_BIT] <> 0 then continue;
WSACloseEvent( EventArray[Index-WSA_WAIT_EVENT_0] ); closesocket( SockArray[Index-WSA_WAIT_EVENT_0] ); CompressArray( Index-WSA_WAIT_EVENT_0 );
fmMain.StatusBar1.Panels[0].Text := 'Connection: ' +IntToStr(EventTotal-1); end; end; end;
procedure TEventThread.InitSock; var addr : TSockAddr; hEvent : WSAEvent; begin EventTotal := 0; ListenSock := INVALID_SOCKET;
ListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if ListenSock = INVALID_SOCKET then begin MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR ); Exit; end;
addr.sin_family := AF_INET; addr.sin_port := htons(LISTEN_PORT); addr.sin_addr.S_addr := htonl(INADDR_ANY);
if bind( ListenSock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then begin MessageBox( 0, 'Call bind failed.', 'Error', MB_ICONERROR ); Exit; end;
hEvent := WSACreateEvent(); if hEvent = WSA_INVALID_EVENT then begin MessageBox( 0, 'Call WSACreateEvent failed.', 'Error', MB_ICONERROR ); Exit; end;
if WSAEventSelect( ListenSock,hEvent,FD_ACCEPT or FD_CLOSE )=SOCKET_ERROR then begin MessageBox( 0, 'Call WSAEventSelect failed.', 'Error', MB_ICONERROR ); Exit; end;
if listen( ListenSock, 5 ) = SOCKET_ERROR then begin MessageBox( 0, 'Call listen failed.', 'Error', MB_ICONERROR ); Exit; end;
SockArray[EventTotal] := ListenSock; EventArray[EventTotal] := hEvent; Inc( EventTotal ); end;
procedure TEventThread.CompressArray(idx: DWORD); var i : Integer; begin if idx = EventTotal-1 then begin Dec( EventTotal ); Exit; end;
for i:=idx to EventTotal-2 do begin SockArray[i] := SockArray[i+1]; EventArray[i] := EventArray[i+1]; end; Dec( EventTotal ); end;
procedure TEventThread.WhileTerminate(Sender: TObject); var i : Integer; begin if EventTotal > 0 then begin for i:=0 to EventTotal-1 do begin WSACloseEvent( EventArray[i] ); shutdown( SockArray[i], SD_BOTH ); closesocket( SockArray[i] ); end; end; end;
end.
|
四:Overlapped I/O 事件通知模型
后来,微软通过调查发现,老陈不喜欢上下楼收发信件,因为上下楼其实很浪费时间。于是微软再次改进他们的信箱。新式的信箱采用了更为先进的技术,只要用户告诉微软自己的家在几楼几号,新式信箱会把信件直接传送到用户的家中,然后告诉用户,你的信件已经放到你的家中了!老陈很高兴,因为他不必再亲自收发信件了!
Overlapped I/O 事件通知模型和WSAEventSelect模型在实现上非常相似,主要区别在“Overlapped”,Overlapped模型是让应用程序使用重叠数据结构(WSAOVERLAPPED),一次投递一个或多个Winsock I/O请求。这些提交的请求完成后,应用程序会收到通知。什么意思呢?就是说,如果你想从socket上接收数据,只需要告诉系统,由系统为你接收数据,而你需要做的只是为系统提供一个缓冲区~~~~~
Listen线程和WSAEventSelect模型一模一样,Recv/Send线程则完全不同:
procedure TOverlapThread.Execute;
var
dwTemp : DWORD;
ret : Integer;
Index : DWORD;
begin
......
while ( not Terminated ) do
begin
Index := WSAWaitForMultipleEvents( FLinks.Count, @FLinks.Events[0], FALSE, RECV_TIME_OUT, FALSE );
Dec( Index, WSA_WAIT_EVENT_0 );
if Index > WSA_MAXIMUM_WAIT_EVENTS-1 then //超时或者其他错误
continue;
WSAResetEvent( FLinks.Events[Index] );
WSAGetOverlappedResult( FLinks.Sockets[Index], FLinks.pOverlaps[Index], @dwTemp, FALSE, FLinks.pdwFlags[Index]^ );
if dwTemp = 0 then //连接已经关闭
begin
......
continue;
end else
begin
fmMain.ListBox1.Items.Add( FLinks.pBufs[Index]^.buf );
end;
//初始化缓冲区
FLinks.pdwFlags[Index]^ := 0;
FillChar( FLinks.pOverlaps[Index]^, sizeof(WSAOVERLAPPED), 0 );
FLinks.pOverlaps[Index]^.hEvent := FLinks.Events[Index];
FillChar( FLinks.pBufs[Index]^.buf^, BUFFER_SIZE, 0 );
//递一个接收数据请求
WSARecv( FLinks.Sockets[Index], FLinks.pBufs[Index], 1, FLinks.pdwRecvd[Index]^, FLinks.pdwFlags[Index]^, FLinks.pOverlaps[Index], nil );
end;
end;
Overlapped I/O 事件通知
unit thrAccept;
interface
uses Windows, SysUtils, Classes, Winsock2, thrOverlap;
type TEventThread = class(TThread) private FListenSock : TSocket; FListenEvent : WSAEVENT; FRWThread : TOverlapThread; protected procedure Execute; override; function InitSock():BOOL; procedure FreeResource; end;
var gCS1 : TRTLCriticalSection; //临界区,保证线程安全 gSockTotal : DWORD; gSockArray : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of TSocket;
implementation
uses frmMain;
{ TEventThread }
procedure TEventThread.Execute; var ret : Integer; ne : TWSANetworkEvents; sock : TSocket; adr : TSockAddrIn; begin if not InitSock() then Exit;
InitializeCriticalSection( gCS1 ); gSockTotal := 0;
FRWThread := TOverlapThread.Create( True ); FRWThread.FreeOnTerminate := True; FRWThread.Resume;
while ( not Terminated ) do begin WSAWaitForMultipleEvents( 1, @FListenEvent, FALSE, ACCEPT_TIME_OUT, FALSE ); FillChar( ne, sizeof(ne), 0 ); WSAEnumNetworkEvents( FListenSock, FListenEvent, @ne );
if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then begin if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then continue; ret := sizeof(adr); sock := accept( FListenSock, adr, ret ); if sock = INVALID_SOCKET then continue; EnterCriticalSection( gCS1 ); ret := gSockTotal; LeaveCriticalSection( gCS1 );
if ret > WSA_MAXIMUM_WAIT_EVENTS-1 then begin closesocket( sock ); continue; end;
EnterCriticalSection( gCS1 ); gSockArray[gSockTotal] := sock; Inc( gSockTotal ); ret := gSockTotal; LeaveCriticalSection( gCS1 );
fmMain.StatusBar1.Panels[0].Text := 'Connection: ' + IntToStr(ret); end;
//不关心其他事件。虽然客户端断开连接会ne.lNetworkEvents==0,但是鉴于本线程 //仅仅负责accept,所以不响应其他事件。 end;
FreeResource; end;
function TEventThread.InitSock: BOOL; var addr : TSockAddr; begin result := False;
FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); addr.sin_family := AF_INET; addr.sin_port := htons(LISTEN_PORT); addr.sin_addr.S_addr := htonl(INADDR_ANY);
bind( FListenSock, @addr, sizeof(SOCKADDR) ); FListenEvent := WSACreateEvent(); WSAEventSelect( FListenSock, FListenEvent, FD_ACCEPT ); listen( FListenSock, 5 );
result := True; end;
procedure TEventThread.FreeResource; begin FRWThread.Terminate;
DeleteCriticalSection( gCS1 );
closesocket( FListenSock ); WSACloseEvent( FListenEvent ); end;
end. --------------------------------------------------------------------------------- unit thrOverlap;
interface
uses Windows, SysUtils, Classes, Winsock2;
const BUFFER_SIZE = 4096; ACCEPT_TIME_OUT = 550; RECV_TIME_OUT = 550;
type YOverlappedSockets = record Count : DWORD; Sockets : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of TSocket; Events : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of WSAEVENT; pOverlaps : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PWSAOVERLAPPED; pBufs : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PWSABUF; pdwRecvd : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PDWORD; pdwFlags : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PDWORD; end;
type TOverlapThread = class(TThread) private FLinks : YOverlappedSockets; protected procedure Execute; override; procedure CompressArray(idx: DWORD); procedure DoNewConnection(dwCount: DWORD); procedure FreeResource; end;
implementation
uses thrAccept, frmMain;
{ TOverlapThread }
procedure TOverlapThread.Execute; var dwTemp : DWORD; ret : Integer; Index : DWORD; begin for ret:=0 to WSA_MAXIMUM_WAIT_EVENTS-1 do begin New( FLinks.pdwRecvd[ret] ); FLinks.pdwRecvd[ret]^ := 0; New( FLinks.pdwFlags[ret] ); FLinks.pdwFlags[ret]^ := 0; New( FLinks.pOverlaps[ret] ); New( FLinks.pBufs[ret] );
FLinks.pBufs[ret]^.len := BUFFER_SIZE; FLinks.pBufs[ret]^.buf := AllocMem( BUFFER_SIZE ); end;
while ( not Terminated ) do begin EnterCriticalSection( gCS1 ); dwTemp := gSockTotal; //得到连接数量 LeaveCriticalSection( gCS1 );
if dwTemp = 0 then //没有客户连接 dwTemp==FLinks.Count说明没有新的连接 continue; //dwTemp < FLinks.Count --- 没有这种可能性
if dwTemp > FLinks.Count then //Accept线程接受了新的连接 DoNewConnection( dwTemp );
Index := WSAWaitForMultipleEvents( FLinks.Count, @FLinks.Events[0], FALSE, RECV_TIME_OUT, FALSE ); Dec( Index, WSA_WAIT_EVENT_0 ); if Index > WSA_MAXIMUM_WAIT_EVENTS-1 then //超时或者其他错误 continue;
WSAResetEvent( FLinks.Events[Index] ); WSAGetOverlappedResult( FLinks.Sockets[Index], FLinks.pOverlaps[Index], @dwTemp, FALSE, FLinks.pdwFlags[Index]^ );
if dwTemp = 0 then //连接已经关闭 begin closesocket( FLinks.Sockets[Index] ); WSACloseEvent( FLinks.Events[Index] ); CompressArray( Index ); fmMain.StatusBar1.Panels[0].Text := 'Connection: '+IntToStr(FLinks.Count); continue; end else begin fmMain.ListBox1.Items.Add( FLinks.pBufs[Index]^.buf ); end;
FLinks.pdwFlags[Index]^ := 0; FillChar( FLinks.pOverlaps[Index]^, sizeof(WSAOVERLAPPED), 0 ); FLinks.pOverlaps[Index]^.hEvent := FLinks.Events[Index]; FillChar( FLinks.pBufs[Index]^.buf^, BUFFER_SIZE, 0 ); WSARecv( FLinks.Sockets[Index], FLinks.pBufs[Index], 1, FLinks.pdwRecvd[Index]^, FLinks.pdwFlags[Index]^, FLinks.pOverlaps[Index], nil ); end;
FreeResource; end;
procedure TOverlapThread.CompressArray(idx: DWORD); var i : Integer; p1,p2,p3,p4 : Pointer; begin EnterCriticalSection( gCS1 ); if idx = gSockTotal-1 then begin Dec( gSockTotal ); end else begin for i:=idx to gSockTotal-2 do gSockArray[i] := gSockArray[i+1]; Dec( gSockTotal ); end; LeaveCriticalSection( gCS1 );
if idx = FLinks.Count-1 then begin Dec( FLinks.Count ); Exit; end else begin p1 := FLinks.pOverlaps[idx]; p2 := FLinks.pBufs[idx]; p3 := FLinks.pdwRecvd[idx]; p4 := FLinks.pdwFlags[idx];
for i:=idx to FLinks.Count-2 do begin FLinks.Sockets[i] := FLinks.Sockets[i+1]; FLinks.Events[i] := FLinks.Events[i+1]; FLinks.pOverlaps[i] := FLinks.pOverlaps[i+1]; FLinks.pBufs[i] := FLinks.pBufs[i+1]; FLinks.pdwRecvd[i] := FLinks.pdwRecvd[i+1]; FLinks.pdwFlags[i] := FLinks.pdwFlags[i+1]; end;
FLinks.pOverlaps[FLinks.Count-1] := p1; FLinks.pBufs[FLinks.Count-1] := p2; FLinks.pdwRecvd[FLinks.Count-1] := p3; FLinks.pdwFlags[FLinks.Count-1] := p4; Dec( FLinks.Count ); end; end;
procedure TOverlapThread.DoNewConnection(dwCount: DWORD); var ret : Integer; begin EnterCriticalSection( gCS1 ); for ret:=dwCount-1 downto FLinks.Count do FLinks.Sockets[ret] := gSockArray[ret]; LeaveCriticalSection( gCS1 );
for ret:=dwCount-1 downto FLinks.Count do begin FLinks.Events[ret] := WSACreateEvent(); FillChar( FLinks.pOverlaps[ret]^, sizeof(WSAOVERLAPPED), 0 ); FLinks.pOverlaps[ret]^.hEvent := FLinks.Events[ret]; WSARecv( FLinks.Sockets[ret], FLinks.pBufs[ret], 1, FLinks.pdwRecvd[ret]^, FLinks.pdwFlags[ret]^, FLinks.pOverlaps[ret], nil ); end;
FLinks.Count := dwCount; end;
procedure TOverlapThread.FreeResource; var i : Integer; begin if FLinks.Count > 0 then begin for i:=0 to FLinks.Count-1 do begin closesocket( FLinks.Sockets[i] ); WSACloseEvent( FLinks.Events[i] ); end; end;
for i:=0 to WSA_MAXIMUM_WAIT_EVENTS-1 do begin FreeMem( FLinks.pBufs[i]^.buf ); Dispose( FLinks.pdwRecvd[i] ); Dispose( FLinks.pdwFlags[i] ); Dispose( FLinks.pOverlaps[i] ); Dispose( FLinks.pBufs[i] ); end; end;
end.
|
五:Overlapped I/O 完成例程模型
老陈接收到新的信件后,一般的程序是:打开信封----掏出信纸----阅读信件----回复信件......为了进一步减轻用户负担,微软又开发了一种新的技术:用户只要告诉微软对信件的操作步骤,微软信箱将按照这些步骤去处理信件,不再需要用户亲自拆信/阅读/回复了!老陈终于过上了小资生活!
Overlapped I/O 完成例程要求用户提供一个回调函数,发生新的网络事件的时候系统将执行这个函数:
procedure WorkerRoutine( const dwError, cbTransferred : DWORD; const
lpOverlapped : LPWSAOVERLAPPED; const dwFlags : DWORD ); stdcall;
然后告诉系统用WorkerRoutine函数处理接收到的数据:
WSARecv( m_socket, @FBuf, 1, dwTemp, dwFlag, @m_overlap, WorkerRoutine );
然后......没有什么然后了,系统什么都给你做了!微软真实体贴!
while ( not Terminated ) do//这就是一个Recv/Send线程要做的事情......什么都不用做啊!!!
begin
if SleepEx( RECV_TIME_OUT, True ) = WAIT_IO_COMPLETION then //
begin
;
end else
begin
continue;
end;
end;
Overlapped I/O 完成例程
据说,“重叠I / O (Overlapped I/O )模型使应用程序能达到更佳的系统性能。”,不过性能到底“更佳”了多少,没有做过测试,不清楚。。。道理网上有很多,不讲了,还是直接贴代码。。。
unit thrAccept;
interface
uses Windows, SysUtils, Classes, Winsock2, thrOverlap;
type TEventThread = class(TThread) private FListenSock : TSocket; FListenEvent : WSAEVENT; FRWThread : TOverlapThread; protected procedure Execute; override; function InitSock: BOOL; procedure FreeResource; end;
implementation
uses frmMain;
{ TEventThread }
procedure TEventThread.Execute; var ret : Integer; ne : TWSANetworkEvents; sock : TSocket; adr : TSockAddrIn; begin if not InitSock() then Exit;
FRWThread := TOverlapThread.Create( True ); FRWThread.FreeOnTerminate := True; FRWThread.Resume;
while ( not Terminated ) do begin WSAWaitForMultipleEvents( 1, @FListenEvent, FALSE, ACCEPT_TIME_OUT, FALSE );
FillChar( ne, sizeof(ne), 0 ); WSAEnumNetworkEvents( FListenSock, FListenEvent, @ne ); //此函数使FListenEvent自动成为“未传信”状态. 不再需要使用WSAResetEvent
if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then begin if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then continue;
ret := sizeof(adr); sock := accept( FListenSock, adr, ret ); if sock = INVALID_SOCKET then continue;
//fmMain.StatusBar1.Panels[0].Text := 'Connection: ' + IntToStr(gSockTotal); end;
//不关心其他事件。虽然客户端断开连接会ne.lNetworkEvents==0,但是鉴于本线程 //仅仅负责accept,所以不响应其他事件。 end;
FreeResource; end;
function TEventThread.InitSock: BOOL; var addr : TSockAddr; begin result := False;
FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); addr.sin_family := AF_INET; addr.sin_port := htons(LISTEN_PORT); addr.sin_addr.S_addr := htonl(INADDR_ANY);
bind( FListenSock, @addr, sizeof(SOCKADDR) ); FListenEvent := WSACreateEvent(); WSAEventSelect( FListenSock, FListenEvent, FD_ACCEPT ); listen( FListenSock, 5 ); result := True; end;
procedure TEventThread.FreeResource; begin closesocket( FListenSock ); WSACloseEvent( FListenEvent ); end;
end.
unit thrOverlap;
interface
uses Windows, SysUtils, Classes, Winsock2;
const BUFFER_SIZE = 4096; ACCEPT_TIME_OUT = 550; RECV_TIME_OUT = 550;
type TOverlapThread = class(TThread) private FBuf : WSABUF; public m_socket : TSocket; m_overlap : WSAOVERLAPPED; protected procedure Execute; override; end;
procedure WorkerRoutine( const dwError, cbTransferred : DWORD; const lpOverlapped : LPWSAOVERLAPPED; const dwFlags : DWORD ); stdcall;
implementation
uses frmMain;
{ TOverlapThread }
procedure TOverlapThread.Execute; var dwTemp, dwFlag : DWORD; begin FBuf.len := BUFFER_SIZE; FBuf.buf := AllocMem( BUFFER_SIZE );
dwFlag := 0; FillChar( m_overlap, sizeof(WSAOVERLAPPED), 0 ); m_overlap.hEvent := DWORD(self);{If lpCompletionRoutine is not NULL, the hEvent field is ignored and can be used by the application to pass context information to the completion routine.} WSARecv( m_socket, @FBuf, 1, dwTemp, dwFlag, @m_overlap, WorkerRoutine );
while ( not Terminated ) do begin if SleepEx( RECV_TIME_OUT, True ) = WAIT_IO_COMPLETION then // begin ; end else begin continue; end; end; end;
procedure WorkerRoutine( const dwError, cbTransferred : DWORD; const lpOverlapped : LPWSAOVERLAPPED; const dwFlags : DWORD ); var dwTemp, Flags : DWORD; begin if ( dwError <> 0 ) or ( cbTransferred = 0 ) then begin closesocket( TOverlapThread(lpOverlapped^.hEvent).m_socket ); Exit; end;
fmMain.ListBox1.Items.Add( TOverlapThread(lpOverlapped^.hEvent).FBuf.buf ); FillChar( TOverlapThread(lpOverlapped^.hEvent).FBuf.buf^, BUFFER_SIZE, 0 );
Flags := 0; FillChar( lpOverlapped^, sizeof(WSAOVERLAPPED), 0 );
if WSARecv( TOverlapThread(lpOverlapped^.hEvent).m_socket, @(TOverlapThread(lpOverlapped^.hEvent)).FBuf, 1, dwTemp, Flags, @(TOverlapThread(lpOverlapped^.hEvent)).m_overlap, WorkerRoutine ) = SOCKET_ERROR then begin ; end; end;
end. |
六:IOCP模型
微软信箱似乎很完美,老陈也很满意。但是在一些大公司情况却完全不同!这些大公司有数以万计的信箱,每秒钟都有数以百计的信件需要处理,以至于微软信箱经常因超负荷运转而崩溃!需要重新启动!微软不得不使出杀手锏......
微软给每个大公司派了一名名叫“Completion Port”的超级机器人,让这个机器人去处理那些信件!
“Windows NT小组注意到这些应用程序的性能没有预料的那么高。特别的,处理很多同时的客户请求意味着很多线程并发地运行在系统中。因为所有这些线程都是可运行的[没有被挂起和等待发生什么事],Microsoft意识到NT内核花费了太多的时间来转换运行线程的上下文[Context],线程就没有得到很多CPU时间来做它们的工作。大家可能也都感觉到并行模型的瓶颈在于它为每一个客户请求都创建了一个新线程。创建线程比起创建进程开销要小,但也远不是没有开销的。我们不妨设想一下:如果事先开好N个线程,让它们在那hold[堵塞],然后可以将所有用户的请求都投递到一个消息队列中去。然后那N个线程逐一从消息队列中去取出消息并加以处理。就可以避免针对每一个用户请求都开线程。不仅减少了线程的资源,也提高了线程的利用率。理论上很不错,你想我等泛泛之辈都能想出来的问题,Microsoft又怎会没有考虑到呢?”-----摘自nonocast的《理解I/O Completion Port》
先看一下IOCP模型的实现:
//创建一个完成端口
FCompletPort := CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0,0,0 );
//接受远程连接,并把这个连接的socket句柄绑定到刚才创建的IOCP上
AConnect := accept( FListenSock, addr, len);
CreateIoCompletionPort( AConnect, FCompletPort, nil, 0 );
//创建CPU数*2 + 2个线程
for i:=1 to si.dwNumberOfProcessors*2+2 do
begin
AThread := TRecvSendThread.Create( false );
AThread.CompletPort := FCompletPort;//告诉这个线程,你要去这个IOCP去访问数据
end;
OK,就这么简单,我们要做的就是建立一个IOCP,把远程连接的socket句柄绑定到刚才创建的IOCP上,最后创建n个线程,并告诉这n个线程到这个IOCP上去访问数据就可以了。
再看一下TRecvSendThread线程都干些什么:
procedure TRecvSendThread.Execute;
var
......
begin
while (not self.Terminated) do
begin
//查询IOCP状态(数据读写操作是否完成)
GetQueuedCompletionStatus( CompletPort, BytesTransd, CompletKey, POVERLAPPED(pPerIoDat), TIME_OUT );
if BytesTransd <> 0 then
....;//数据读写操作完成
//再投递一个读数据请求
WSARecv( CompletKey, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil );
end;
end;
读写线程只是简单地检查IOCP是否完成了我们投递的读写操作,如果完成了则再投递一个新的读写请求。
应该注意到,我们创建的所有TRecvSendThread都在访问同一个IOCP(因为我们只创建了一个IOCP),并且我们没有使用临界区!难道不会产生冲突吗?不用考虑同步问题吗?
呵呵,这正是IOCP的奥妙所在。IOCP不是一个普通的对象,不需要考虑线程安全问题。它会自动调配访问它的线程:如果某个socket上有一个线程A正在访问,那么线程B的访问请求会被分配到另外一个socket。这一切都是由系统自动调配的,我们无需过问。
完成端口
unit frmMain;
interface
uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, Winsock2, StdCtrls, thrListen;
type TfmMain = class(TForm) btnStart: TButton; ListBox1: TListBox; btnStop: TButton; procedure btnStartClick(Sender: TObject); procedure FormCreate(Sender: TObject); procedure FormClose(Sender: TObject; var Action: TCloseAction); procedure btnStopClick(Sender: TObject); private { Private declarations } FListenThread : TListenThread; public { Public declarations } end;
const LISTEN_PORT = 5005;
var fmMain: TfmMain;
implementation
{$R *.dfm}
procedure TfmMain.btnStartClick(Sender: TObject); begin FListenThread := TListenThread.Create( true ); FListenThread.FreeOnTerminate := true; FListenThread.Resume;
btnStop.Enabled := true; btnStart.Enabled := false; end;
procedure TfmMain.btnStopClick(Sender: TObject); begin FListenThread.terminate; btnStop.Enabled := false; btnStart.Enabled := true; end;
procedure TfmMain.FormCreate(Sender: TObject); var wsa : TWSAData; begin if WSAStartup( $0202, wsa ) <> 0 then //WSAStartup returns zero if successful. begin MessageBox( 0, 'WSAStartup failed', 'Error', MB_ICONERROR ); btnStart.Enabled := False; btnStop.Enabled := False; end;
btnStart.Enabled := true; btnStop.Enabled := false; end;
procedure TfmMain.FormClose(Sender: TObject; var Action: TCloseAction); begin WSACleanup(); end;
end. //---------------------------------------------------------------------
unit thrListen;
interface
uses Windows, Classes, Winsock2;
const RECV_POSTED = 0; SEND_POSTED = 1; TIME_OUT = 110; BUFFER_SIZE = 4096;
type YPER_OPERATION_DATA = record Overlap : OVERLAPPED; BufData : WSABUF; Buf : Array [0..BUFFER_SIZE-1] of Char; OprtType : Integer; end; PPER_OPERATION_DATA = ^YPER_OPERATION_DATA;
YPER_HANDLE_DATA = record Sock : TSocket; Ip : Array [0..15] of Char; Port : DWORD; OprtType : Integer; end; PPER_HANDLE_DATA = ^YPER_HANDLE_DATA;
type TListenThread = class(TThread) private { Private declarations } FCompletPort : THandle; FListenSock : TSocket; function InitSocket: BOOL; protected procedure Execute; override; end;
function WorkerThread( CompletPortID: Pointer ): DWORD; stdcall;
implementation
uses frmMain;
{ TListenThread }
procedure TListenThread.Execute; var si : SYSTEM_INFO; i : Integer; hThread : THandle; ThreadID : DWORD; AConnect : TSocket; addr : TSockAddrIn; len : Integer; BytesRecv, Flags : DWORD; pPerIoDat : PPER_OPERATION_DATA; begin FCompletPort := CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0,0,0 ); if FCompletPort = 0 then begin MessageBox( 0, 'CreateIoCompletionPort failed.', 'Error', MB_OK ); Exit; end;
GetSystemInfo( si ); for i:=0 to si.dwNumberOfProcessors-1 do begin hThread := CreateThread( nil,0,@WorkerThread,Pointer(FCompletPort),0,ThreadID ); CloseHandle( hThread ); end;
if not InitSocket() then Exit;
while (not self.Terminated) do begin len := sizeof(addr); AConnect := accept( FListenSock, addr, len); if AConnect = INVALID_SOCKET then begin sleepex( 110, false ); continue; end;
CreateIoCompletionPort( AConnect, FCompletPort, AConnect, 0 );
New( pPerIoDat );
FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 ); FillChar( pPerIoDat^.Buf[0], BUFFER_SIZE, 0 ); pPerIoDat^.BufData.len := BUFFER_SIZE; pPerIoDat^.BufData.buf := pPerIoDat^.Buf; pPerIoDat.OprtType := RECV_POSTED;
Flags := 0; WSARecv( AConnect, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ); end;
PostQueuedCompletionStatus( FCompletPort, 0,0,nil ); CloseHandle( FCompletPort ); end;
function TListenThread.InitSocket: BOOL; var addr : TSockAddr; begin result := False;
FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if FListenSock = INVALID_SOCKET then begin MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR ); Exit; end;
addr.sin_family := AF_INET; addr.sin_port := htons(LISTEN_PORT); addr.sin_addr.S_addr := htonl(INADDR_ANY);
if bind( FListenSock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then begin MessageBox( 0, 'Call bind failed.', 'Error', MB_ICONERROR ); Exit; end;
if listen( FListenSock, 5 ) = SOCKET_ERROR then begin MessageBox( 0, 'Call listen failed.', 'Error', MB_ICONERROR ); Exit; end;
result := True; end;
function WorkerThread( CompletPortID: Pointer ): DWORD; var CompletPort : THandle; CompletKey, BytesTransd, BytesSend, BytesRecv, Flags : DWORD; pPerIoDat : PPER_OPERATION_DATA; begin CompletPort := DWORD(CompletPortID);
while True do begin BytesTransd:=0;CompletKey:=0; GetQueuedCompletionStatus( CompletPort, BytesTransd, CompletKey, POVERLAPPED(pPerIoDat), 550 );
if ( BytesTransd = 0 ) and ( (pPerIoDat=nil )or(pPerIoDat^.OprtType = RECV_POSTED)or (pPerIoDat^.OprtType = SEND_POSTED) ) then begin closesocket( CompletKey ); Dispose( pPerIoDat ); continue; end;
if pPerIoDat^.OprtType = RECV_POSTED then begin fmmain.ListBox1.Items.Add( pPerIoDat^.BufData.buf ); end;
Flags := 0; FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 ); FillChar( pPerIoDat^.Buf[0], 4096, 0 ); pPerIoDat^.BufData.len := 4096; pPerIoDat^.BufData.buf := pPerIoDat^.Buf; pPerIoDat.OprtType := RECV_POSTED;
WSARecv( CompletKey, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ); end;
//closesocket( CompletKey ); //Dispose( pPerIoDat ); end;
end. ---------------------------------------------------------------------------------- 上面的完成端口的例子可能太C++了,换一个更Delphi的:
新建一个线程类TRecvSendThread ------------ thrRecvSend.pas
unit thrRecvSend;
interface
uses Windows, Classes, Winsock2;
type TRecvSendThread = class(TThread) public CompletPort : THandle; protected procedure Execute; override; end;
implementation
uses thrListen, frmMain;
{ TRecvSendThread }
procedure TRecvSendThread.Execute; var CompletKey, BytesTransd, BytesRecv, Flags : DWORD; pPerIoDat : PPER_OPERATION_DATA; begin while (not self.Terminated) do begin BytesTransd := 0; CompletKey := 0; GetQueuedCompletionStatus( CompletPort, BytesTransd, CompletKey, POVERLAPPED(pPerIoDat), TIME_OUT );
if ( BytesTransd = 0 ) and ( (pPerIoDat = nil) or (pPerIoDat^.OprtType = RECV_POSTED) or (pPerIoDat^.OprtType = SEND_POSTED) ) then begin closesocket( CompletKey ); Dispose( pPerIoDat ); continue; end;
if pPerIoDat^.OprtType = RECV_POSTED then begin fmmain.ListBox1.Items.Add( pPerIoDat^.BufData.buf ); end;
Flags := 0; FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 ); FillChar( pPerIoDat^.Buf[0], 4096, 0 ); pPerIoDat^.BufData.len := 4096; pPerIoDat^.BufData.buf := pPerIoDat^.Buf; pPerIoDat.OprtType := RECV_POSTED;
WSARecv( CompletKey, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ); end;
end;
end.
//------------------------------------------------------------- 原来的TListenThread也稍微修改一下。。。。。。。。。 unit thrListen;
interface
uses Windows, Classes, Winsock2, thrRecvSend;
const RECV_POSTED = 0; SEND_POSTED = 1; TIME_OUT = 110; BUFFER_SIZE = 4096;
type YPER_OPERATION_DATA = record Overlap : OVERLAPPED; BufData : WSABUF; Buf : Array [0..BUFFER_SIZE-1] of Char; OprtType : Integer; end; PPER_OPERATION_DATA = ^YPER_OPERATION_DATA;
YPER_HANDLE_DATA = record Sock : TSocket; Ip : Array [0..15] of Char; Port : DWORD; OprtType : Integer; end; PPER_HANDLE_DATA = ^YPER_HANDLE_DATA;
type TListenThread = class(TThread) private { Private declarations } FCompletPort : THandle; FListenSock : TSocket; function InitSocket: BOOL; protected procedure Execute; override; end;
implementation
uses frmMain;
{ TListenThread }
procedure TListenThread.Execute; var si : SYSTEM_INFO; i, len : Integer; AThread : TRecvSendThread; AConnect : TSocket; addr : TSockAddrIn; BytesRecv, Flags : DWORD; pPerIoDat : PPER_OPERATION_DATA; begin if not InitSocket() then Exit;
FCompletPort := CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0,0,0 ); if FCompletPort = 0 then begin MessageBox( 0, 'CreateIoCompletionPort failed.', 'Error', MB_OK ); Exit; end;
GetSystemInfo( si ); for i:=0 to si.dwNumberOfProcessors-1 do begin AThread := TRecvSendThread.Create( True ); AThread.CompletPort := FCompletPort; AThread.FreeOnTerminate := True; AThread.Resume; end;
while (not self.Terminated) do begin len := sizeof(addr); AConnect := accept( FListenSock, addr, len); if AConnect = INVALID_SOCKET then begin sleepex( TIME_OUT, false ); continue; end;
CreateIoCompletionPort( AConnect, FCompletPort, AConnect, 0 );
New( pPerIoDat );
FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 ); FillChar( pPerIoDat^.Buf[0], BUFFER_SIZE, 0 ); pPerIoDat^.BufData.len := BUFFER_SIZE; pPerIoDat^.BufData.buf := pPerIoDat^.Buf; pPerIoDat.OprtType := RECV_POSTED;
Flags := 0; WSARecv( AConnect, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ); end;
PostQueuedCompletionStatus( FCompletPort, 0,0,nil ); CloseHandle( FCompletPort ); end;
function TListenThread.InitSocket: BOOL; var addr : TSockAddr; begin result := False;
FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if FListenSock = INVALID_SOCKET then begin MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR ); Exit; end;
addr.sin_family := AF_INET; addr.sin_port := htons(LISTEN_PORT); addr.sin_addr.S_addr := htonl(INADDR_ANY);
if bind( FListenSock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then begin MessageBox( 0, 'Call bind failed.', 'Error', MB_ICONERROR ); Exit; end;
if listen( FListenSock, 5 ) = SOCKET_ERROR then begin MessageBox( 0, 'Call listen failed.', 'Error', MB_ICONERROR ); Exit; end;
result := True; end;
end.
呵呵,感觉明了多了~~~~~~~~~~```` |
呵呵,终于写完了,好累......以上所有的源代码可以从这里看到:
http://community.csdn.net/Expert/topic/3844/3844679.xml?temp=5.836123E-02
posted on 2007-08-17 12:10
聂文龙 阅读(1253)
评论(0) 编辑 收藏 引用 所属分类:
net work