1#pragma once
2
3#include <afxmt.h>
4#include <map>
5
6using namespace std;
7
8#define MAX_PENDING_CONNECTIONS 20
9#define MAX_WSAEVENT_NUMS 64
10
11class CEventSocket
12{
13public:
14 CEventSocket(void);
15 virtual ~CEventSocket(void);
16
17 virtual void RecvDataNotidy(UINT64 fromAddr, char *pData, int nDataSize) {}
18 virtual void CloseNotidy(UINT64 fromAddr) {}
19
20 BOOL ConnectTo(long ip, int port, int sockType = SOCK_STREAM, int protocol = IPPROTO_TCP);
21
22 BOOL ListenOn(int port, int sockType = SOCK_STREAM, int protocol = IPPROTO_TCP);
23 void Stop();
24 int SendData(UINT64 toAddr, char *pData, int nDataSize);
25
26 void ClosePeer(UINT64 peerAddr);
27protected:
28 static DWORD WINAPI WkThread(LPVOID lParam);
29
30private:
31 int m_nEventTotal;
32 HANDLE m_hWkThread;
33 HANDLE m_hStopEvent;
34 CCriticalSection cs;
35 map<UINT64, int> m_ClientList;
36 WSAEVENT m_EventArray[MAX_WSAEVENT_NUMS];
37 SOCKET m_SockArray[MAX_WSAEVENT_NUMS];
38};
实现文件
1#include "StdAfx.h"
2#include "EventSocket.h"
3
4CEventSocket::CEventSocket(void)
5{
6 WSADATA wsaData;
7 if( (WSAStartup(MAKEWORD(2,2), &wsaData)) != 0 )
8 TRACE(_T("WSAStartup Failed"));
9
10 m_hStopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
11 m_nEventTotal = 0;
12 m_hWkThread = NULL;
13}
14
15CEventSocket::~CEventSocket(void)
16{
17 Stop();
18 WSACleanup();
19 CloseHandle(m_hStopEvent);
20}
21
22BOOL CEventSocket::ConnectTo(long ip, int port, int sockType, int protocol)
23{
24 SOCKADDR_IN skAddr;
25
26 skAddr.sin_family = AF_INET;
27 skAddr.sin_addr.s_addr = ip;
28 skAddr.sin_port = htons(port);
29
30 SOCKET sk = WSASocket(AF_INET, sockType, protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
31
32 if (sk == INVALID_SOCKET)
33 {
34 TRACE(_T("WSASocket Failed"));
35 return FALSE;
36 }
37
38 //1.先将socket设为非堵塞
39 unsigned long lBlocked = 1;
40 ioctlsocket(sk,FIONBIO ,&lBlocked);
41 //2.设置连接超时
42 struct timeval Timeout = {1,0};
43 fd_set writefds;
44 FD_ZERO(&writefds);
45 FD_SET(sk,&writefds);
46
47 connect(sk,(const sockaddr*)&skAddr,sizeof(sockaddr_in));
48 select(1, NULL,&writefds, NULL, &Timeout);
49 //3.将socket设为堵塞
50 lBlocked = 0;
51 ioctlsocket(sk, FIONBIO ,&lBlocked);
52 if ( ! FD_ISSET(sk,&writefds))
53 {
54 closesocket(sk);
55 return FALSE;
56 }
57
58 if (m_nEventTotal >= MAX_WSAEVENT_NUMS)
59 {
60 TRACE(_T("Too many Events"));
61 closesocket(sk);
62 return FALSE;
63 }
64
65 WSAEVENT wsaEvent = WSACreateEvent();
66
67 if (wsaEvent == INVALID_HANDLE_VALUE)
68 {
69 closesocket(sk);
70 TRACE(_T("CreateEvent Failed"));
71 return FALSE;
72 }
73
74 if (WSAEventSelect(sk, wsaEvent, FD_READ|FD_WRITE|FD_CLOSE) != 0)
75 {
76 TRACE(_T("WSAEventSelect Failed"));
77 closesocket(sk);
78 CloseHandle(wsaEvent);
79 return FALSE;
80 }
81
82 UINT64 fromAddr = (UINT64)skAddr.sin_addr.S_un.S_addr << 32 | ntohs(skAddr.sin_port);
83
84 cs.Lock();
85 m_ClientList[fromAddr] = m_nEventTotal;
86 m_EventArray[m_nEventTotal] = wsaEvent;
87 m_SockArray[m_nEventTotal] = sk;
88 ++m_nEventTotal;
89 cs.Unlock();
90
91 if (m_hWkThread == NULL)
92 {
93 DWORD dwThreadID;
94 m_hWkThread = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)WkThread, this, 0, &dwThreadID);
95
96 if (m_hWkThread == INVALID_HANDLE_VALUE)
97 {
98 TRACE(_T("WSAEventSelect Failed"));
99 closesocket(sk);
100 CloseHandle(wsaEvent);
101 return FALSE;
102 }
103 }
104
105 return TRUE;
106}
107
108
109BOOL CEventSocket::ListenOn(int port, int sockType, int protocol)
110{
111 SOCKADDR_IN skAddr;
112
113 skAddr.sin_family = AF_INET;
114 skAddr.sin_addr.s_addr = ADDR_ANY;
115 skAddr.sin_port = htons(port);
116
117 SOCKET sk = WSASocket(AF_INET, sockType, protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
118
119 if (sk == INVALID_SOCKET)
120 {
121 TRACE(_T("WSASocket Failed"));
122 return FALSE;
123 }
124
125 if( bind(sk, (PSOCKADDR)&skAddr, sizeof(SOCKADDR_IN)) == SOCKET_ERROR )
126 {
127 TRACE(_T("Server Soket Bind Failed"));
128 closesocket(sk);
129 return FALSE;
130 }
131
132 if(listen(sk, MAX_PENDING_CONNECTIONS) == SOCKET_ERROR )
133 {
134 TRACE(_T("Server Socket Listen Failed"));
135 closesocket(sk);
136 return FALSE;
137 }
138
139 WSAEVENT wsaEvent = WSACreateEvent();
140
141 if (wsaEvent == INVALID_HANDLE_VALUE)
142 {
143 closesocket(sk);
144 TRACE(_T("CreateEvent Failed"));
145 return FALSE;
146 }
147
148 if (WSAEventSelect(sk, wsaEvent, FD_ACCEPT|FD_CLOSE) != 0)
149 {
150 TRACE(_T("WSAEventSelect Failed"));
151 closesocket(sk);
152 CloseHandle(wsaEvent);
153 return FALSE;
154 }
155
156 cs.Lock();
157 m_EventArray[m_nEventTotal] = wsaEvent;
158 m_SockArray[m_nEventTotal] = sk;
159 ++m_nEventTotal;
160 cs.Unlock();
161
162 if (m_hWkThread == NULL)
163 {
164 DWORD dwThreadID;
165 m_hWkThread = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)WkThread, this, 0, &dwThreadID);
166 if (m_hWkThread == INVALID_HANDLE_VALUE)
167 {
168 TRACE(_T("WSAEventSelect Failed"));
169 closesocket(sk);
170 CloseHandle(wsaEvent);
171 return FALSE;
172 }
173 }
174
175 return TRUE;
176}
177
178
179DWORD WINAPI CEventSocket::WkThread(LPVOID lParam)
180{
181 CEventSocket * pThis = reinterpret_cast<CEventSocket *>(lParam);
182 ASSERT(NULL != pThis);
183
184 while (WaitForSingleObject(pThis->m_hStopEvent, 0) == WSA_WAIT_TIMEOUT)
185 {
186 int nIndex = ::WSAWaitForMultipleEvents(pThis->m_nEventTotal, pThis->m_EventArray,
187 FALSE, 500, FALSE);
188
189 if (nIndex == WSA_WAIT_TIMEOUT)
190 continue;
191
192 if (nIndex != WSA_WAIT_FAILED)
193 {
194 nIndex -= WSA_WAIT_EVENT_0;
195 WSANETWORKEVENTS netEvent;
196
197 WSAEnumNetworkEvents(pThis->m_SockArray[nIndex], pThis->m_EventArray[nIndex], &netEvent);
198 WSAResetEvent(pThis->m_EventArray[nIndex]);
199
200 if (netEvent.lNetworkEvents&FD_ACCEPT && netEvent.iErrorCode[FD_ACCEPT_BIT] == 0)
201 {
202 if (pThis->m_nEventTotal >= MAX_WSAEVENT_NUMS)
203 {
204 TRACE(_T("Too many Events"));
205 continue;
206 }
207
208 WSAEVENT wsaEvent = WSACreateEvent();
209 SOCKET sk = accept(pThis->m_SockArray[nIndex], NULL, NULL);
210 WSAEventSelect(sk, wsaEvent, FD_READ|FD_WRITE|FD_CLOSE);
211
212 sockaddr_in skAddr = {0};
213 int addrLen = sizeof(sockaddr_in);
214 ::getsockname (sk,(sockaddr*)&skAddr,&addrLen);
215 UINT64 fromAddr = (UINT64) skAddr.sin_addr.S_un.S_addr << 32 | ntohs(skAddr.sin_port);
216
217 pThis->cs.Lock();
218 pThis->m_ClientList[fromAddr] = pThis->m_nEventTotal;
219 pThis->m_EventArray[pThis->m_nEventTotal] = wsaEvent;
220 pThis->m_SockArray[pThis->m_nEventTotal] = sk;
221 ++pThis->m_nEventTotal;
222 pThis->cs.Unlock();
223 }
224 else if (netEvent.lNetworkEvents&FD_READ && netEvent.iErrorCode[FD_READ_BIT] == 0)
225 {
226 char buffer[2048] = {0};
227 int nDataSize = recv(pThis->m_SockArray[nIndex], buffer, 2048, 0);
228
229 sockaddr_in skAddr = {0};
230 int addrLen = sizeof(sockaddr_in);
231
232 ::getsockname (pThis->m_SockArray[nIndex],(sockaddr*)&skAddr,&addrLen);
233 UINT64 fromAddr = (UINT64) skAddr.sin_addr.S_un.S_addr << 32 | ntohs(skAddr.sin_port);
234
235 pThis->RecvDataNotidy(fromAddr, buffer, nDataSize);
236 }
237 else if (netEvent.lNetworkEvents&FD_CLOSE && netEvent.iErrorCode[FD_CLOSE_BIT] == 0)
238 {
239 sockaddr_in skAddr = {0};
240 int addrLen = sizeof(sockaddr_in);
241
242 ::getsockname (pThis->m_SockArray[nIndex],(sockaddr*)&skAddr,&addrLen);
243 UINT64 fromAddr = (UINT64) skAddr.sin_addr.S_un.S_addr << 32 | ntohs(skAddr.sin_port);
244
245 pThis->CloseNotidy(fromAddr);
246
247 pThis->cs.Lock();
248 closesocket(pThis->m_SockArray[nIndex]);
249 WSACloseEvent(pThis->m_EventArray[nIndex]);
250 for (int i = nIndex; i < pThis->m_nEventTotal; i++)
251 {
252 pThis->m_SockArray[i] = pThis->m_SockArray[i+1];
253 pThis->m_EventArray[i] = pThis->m_EventArray[i+1];
254 }
255 pThis->m_SockArray[pThis->m_nEventTotal] = NULL;
256 pThis->m_EventArray[pThis->m_nEventTotal] = NULL;
257 -- pThis->m_nEventTotal;
258 map<UINT64, int>::iterator iterIdx = pThis->m_ClientList.find(fromAddr);
259 if (iterIdx != pThis->m_ClientList.end())
260 pThis->m_ClientList.erase(iterIdx);
261 pThis->cs.Unlock();
262
263 }
264 }
265 }
266
267 return 0;
268}
269
270void CEventSocket::Stop()
271{
272 if (m_hWkThread == NULL)
273 return;
274
275 SetEvent(m_hStopEvent);
276 ::WaitForSingleObject(m_hWkThread, INFINITE);
277 ResetEvent(m_hStopEvent);
278 CloseHandle(m_hWkThread);
279 m_hWkThread = NULL;
280
281 for (int i = 0; i < m_nEventTotal; i++)
282 {
283 closesocket(m_SockArray[i]);
284 m_SockArray[i] = NULL;
285 WSACloseEvent(m_EventArray[i]);
286 m_EventArray[i] = NULL;
287 }
288
289 m_ClientList.clear();
290}
291
292void CEventSocket::ClosePeer(UINT64 peerAddr)
293{
294
295 cs.Lock();
296 map<UINT64, int>::iterator iterIdx = m_ClientList.find(peerAddr);
297 if (iterIdx != m_ClientList.end())
298 {
299 int nIndex = iterIdx->second;
300 closesocket(m_SockArray[nIndex]);
301 WSACloseEvent(m_EventArray[nIndex]);
302 for (int i = nIndex; i < m_nEventTotal; i++)
303 {
304 m_SockArray[i] = m_SockArray[i+1];
305 m_EventArray[i] = m_EventArray[i+1];
306 }
307 m_SockArray[m_nEventTotal] = NULL;
308 m_EventArray[m_nEventTotal] = NULL;
309 m_ClientList.erase(iterIdx);
310 -- m_nEventTotal;
311 }
312 cs.Unlock();
313}
314
315
316int CEventSocket::SendData(UINT64 toAddr, char *pData, int nDataSize)
317{
318 map<UINT64, int>::iterator iterIdx = m_ClientList.find(toAddr);
319
320 if (iterIdx != m_ClientList.end() && m_SockArray[iterIdx->second] != NULL)
321 {
322 return send(m_SockArray[iterIdx->second], pData, nDataSize, 0);
323 }
324
325 return -1;
326}
posted on 2008-12-23 13:12
黑色天使 阅读(574)
评论(0) 编辑 收藏 引用 所属分类:
C\C++