Introduction
I have been writing server applications on Windows server platforms for quite some time. We all know that I/O completion ports are one of the nicest things available for writing server applications (or even clients, where you can apply the Worker-Boss pattern). There are tons of articles and samples showing how I/O completion ports can be used with TCP/IP. Surprisingly enough, I did not find any source code examples demonstrating the use of I/O completion ports with UDP.
To spice up things a little, I took the multicast topic to demonstrate the code. Most of the multicast code is borrowed from Mr. Bob Quinn's article.
This sample consists of two projects (Client and Server). In order to test this application as is, the routers in your network must be enabled for multicast routing. If they are not, modify the samples so that they act as a plain UDP server and client.
Since there is so much written about I/O completion port and multicast, I won't go into all the details in this article.
Server application design and implementation is an art. You can put so many bells and whistles. Typical server application should have the following:
-
Object Pooling
If any kind of object is created and destroyed often, consider making a pool of such objects. Typical examples would be your sockets, user context objects, and database connections.
-
Ease of configuration
Do not hardcode configuration information in the code. Try to use XML configuration files. Let the application read these values during startup. Typical examples would be the IP addresses of other servers, Oracle TNS names, user IDs and passwords (!!!), the number of worker threads, email addresses, etc.
Besides, there should be some mechanism to change these parameters while the server is running.
-
Monitoring and logging
Since most of these servers will be running somewhere in the data center, it is nice to have a small component sitting in your server application which will send log messages to central server from where you multicast those messages to other client applications. Make sure you have very good filtering mechanism so that you can control the number of messages being logged during the run time.
-
Error handling
Your servers have to run nonstop (four nines most of the time). That means you should have a proper error handling mechanism. Make use of SEH in a responsible manner. Besides, keep track of the number of TCP connections (using SNMP), CPU usage, and memory usage (Microsoft provides a library with which you can query all these parameters from within your program).
-
Thread Synchronization
Make sure you protect shared data against concurrent access from multiple threads.
-
Load balancing.
You cannot serve an infinite number of clients from one server box. Always keep the 'scale-out' factor in mind. Do proper capacity planning to quantify your server hardware requirements. The 80-20 rule is quite effective.
Source Code
Collapse
#include "StdAfx.h"
#include <winsock2.h>
#include <ws2tcpip.h>
#include "Stdio.h"
#include <string.h>
#define BUFSIZE 1024 //max size of incoming data buffer
// Size of the achMCAddr character array that holds the group address string.
#define MAXADDRSTR 16
// Initial value of achMCAddr, the multicast group joined in CreateNetConnections.
#define DEFAULT_GROUP_ADDRESS "239.254.1.2"
// Initial value of nPort, the UDP port bound in CreateNetConnections.
#define DEFAULT_PORT 7125
// Counter bumped with InterlockedIncrement by HandleIncomingData and printed
// with each received time stamp.
LONG nCount = 0;
// I/O completion port created in CreateNetConnections; WorkerThread waits on it.
HANDLE g_hCompletionPort;
// Forward declarations of the functions defined later in this file.
DWORD WINAPI WorkerThread( LPVOID WorkContext );
BOOL HandleIncomingData( UCHAR* pBuf);
BOOL CreateNetConnections( VOID );
BOOL CreateWorkers( UINT );
void InitWinsock2();
void UnInitWinsock2();
// Event created in main; CreateNetConnections stores it in Overlapped.hEvent.
HANDLE g_hReadEvent;
// Datagram socket created and bound in CreateNetConnections.
SOCKET g_hSocket;
// Receive buffer handed to the ReadFile call posted in CreateNetConnections.
UCHAR achInBuf [BUFSIZE];
// Multicast group address (dotted string) printed by main and joined in
// CreateNetConnections.
char achMCAddr[MAXADDRSTR] = DEFAULT_GROUP_ADDRESS;
// Local UDP port, in host byte order (converted with htons before bind).
u_short nPort = DEFAULT_PORT;
// OVERLAPPED structure handed to the ReadFile call posted in CreateNetConnections.
OVERLAPPED Overlapped;
/*
 * Initializes Winsock, requesting version 2.2.
 *
 * WSAStartup() reports failure through its return value.  WSAGetLastError()
 * must not be consulted here: Winsock is not initialised after a failed
 * startup, so the per-thread error value is meaningless.
 *
 * The function has no return value; on failure the error code is printed
 * and later socket calls made by the program will fail.
 */
void InitWinsock2()
{
    WSADATA data;
    int ret = WSAStartup(MAKEWORD(2, 2), &data);

    if (ret != 0)
    {
        printf("WSAStartup() failed, Err: %d\n", ret);
    }
}
/*
 * Ends this program's use of Winsock by calling WSACleanup().
 * The result of WSACleanup() is deliberately ignored.
 */
void UnInitWinsock2()
{
    (void)WSACleanup();
}
BOOL CreateNetConnections (void)
{
DWORD nbytes;
BOOL b;
BOOL fFlag = TRUE;
int nRet=0;
SOCKADDR_IN stLclAddr;
struct ip_mreq stMreq;
g_hSocket = socket(AF_INET, SOCK_DGRAM,0);
if (g_hSocket == INVALID_SOCKET)
{
printf ("socket() failed, Err: %d\n", WSAGetLastError());
return FALSE;
}
nRet = setsockopt(g_hSocket,SOL_SOCKET,
SO_REUSEADDR, (char *)&fFlag, sizeof(fFlag));
if (nRet == SOCKET_ERROR)
{
printf ("setsockopt() SO_REUSEADDR failed,
Err: %d\n",WSAGetLastError());
}
stLclAddr.sin_family = AF_INET;
stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY);
stLclAddr.sin_port = htons(nPort);
nRet = bind(g_hSocket,(struct sockaddr*) &stLclAddr,sizeof(stLclAddr));
if (nRet == SOCKET_ERROR)
{
printf ("bind() port: %d failed, Err: %d\n",
nPort,WSAGetLastError());
}
stMreq.imr_multiaddr.s_addr = inet_addr(achMCAddr);
stMreq.imr_interface.s_addr = INADDR_ANY;
nRet = setsockopt(g_hSocket,IPPROTO_IP,
IP_ADD_MEMBERSHIP,(char *)&stMreq,sizeof(stMreq));
if (nRet == SOCKET_ERROR)
{
printf("setsockopt() IP_ADD_MEMBERSHIP address %s failed,
Err: %d\n",achMCAddr,
WSAGetLastError());
}
g_hCompletionPort = CreateIoCompletionPort (INVALID_HANDLE_VALUE,
NULL,0,3);
if (!g_hCompletionPort)
{
fprintf (stdout, "g_hCompletionPort Create Failed\n");
return FALSE;
}
CreateIoCompletionPort((HANDLE)g_hSocket,g_hCompletionPort,
(DWORD)g_hSocket,3);
Overlapped.hEvent = g_hReadEvent;
Overlapped.Internal = 0;
Overlapped.InternalHigh = 0;
Overlapped.Offset = 0;
Overlapped.OffsetHigh = 0;
b = ReadFile ((HANDLE)g_hSocket,&achInBuf,
sizeof(achInBuf),&nbytes,&Overlapped);
if (!b && GetLastError () != ERROR_IO_PENDING)
{
fprintf (stdout, "ReadFile Failed\n");
return FALSE;
}
return TRUE;
}
/*
 * Starts dwNumberOfWorkers threads that each run WorkerThread with a
 * NULL context.  Each thread handle is closed right after creation, so
 * the threads are never joined.
 *
 * Returns TRUE when every thread was created.  Returns FALSE as soon as
 * one CreateThread call fails; threads started before that keep running.
 */
BOOL CreateWorkers (UINT dwNumberOfWorkers)
{
    UINT remaining = dwNumberOfWorkers;

    while (remaining > 0)
    {
        DWORD tid;
        HANDLE hThread = CreateThread(NULL, 0, WorkerThread, NULL, 0, &tid);

        if (hThread == NULL)
        {
            fprintf (stdout, "Create Worker Thread Failed\n");
            return FALSE;
        }
        CloseHandle(hThread);
        remaining--;
    }
    return TRUE;
}
/*
 * Worker thread body.  Waits on g_hCompletionPort for the completion of
 * the overlapped read on the socket, hands the received datagram in
 * achInBuf to HandleIncomingData and then posts the next read.
 *
 * CreateNetConnections posts the first read using the global Overlapped
 * structure and achInBuf.  This function re-posts the read with the same
 * pair only after the datagram has been handled, so at most one read is
 * outstanding and only the thread that dequeued its completion touches
 * the buffer.
 *
 * WorkContext is unused.  The completion key carries the socket handle.
 *
 * Returns 1 when GetQueuedCompletionStatus fails; the thread then ends.
 */
DWORD WINAPI WorkerThread (LPVOID WorkContext)
{
    ULONG_PTR completionKey = 0;   /* must be pointer sized, not DWORD */
    LPOVERLAPPED lpo = NULL;
    DWORD nBytesRead = 0;
    DWORD dwErrCode;
    LPVOID lpMsgBuf;
    BOOL b;

    (void)WorkContext;

    for (;;)
    {
        lpo = NULL;
        b = GetQueuedCompletionStatus(g_hCompletionPort, &nBytesRead,
                                      &completionKey, &lpo, INFINITE);
        if (!b)
        {
            /* A non-NULL lpo means a dequeued I/O operation failed. */
            if (lpo)
            {
                fprintf (stdout, "WorkThread Wait Failed\n");
            }
            return 1;
        }

        /* The datagram of the completed read is in the global buffer. */
        HandleIncomingData(achInBuf);

        /* Re-arm the read; hEvent is kept as set up by CreateNetConnections. */
        Overlapped.Internal = 0;
        Overlapped.InternalHigh = 0;
        Overlapped.Offset = 0;
        Overlapped.OffsetHigh = 0;

        /*
         * Always offer the whole buffer.  A read that completes at once
         * still queues a completion packet, so the data is handled on the
         * next pass through the loop and not here.
         */
        if (!ReadFile((HANDLE)completionKey, achInBuf, sizeof(achInBuf),
                      NULL, &Overlapped))
        {
            dwErrCode = GetLastError();
            if (dwErrCode != ERROR_IO_PENDING)
            {
                printf("Something has gone wrong:Error code - %lu\n",
                       (unsigned long)dwErrCode);
                lpMsgBuf = NULL;
                if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
                                  FORMAT_MESSAGE_FROM_SYSTEM |
                                  FORMAT_MESSAGE_IGNORE_INSERTS,
                                  NULL, dwErrCode,
                                  MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                                  (LPTSTR)&lpMsgBuf, 0, NULL) != 0)
                {
                    OutputDebugString((LPCTSTR)lpMsgBuf);
                    LocalFree(lpMsgBuf);
                }
            }
        }
    }
}
/*
 * Prints the SYSTEMTIME found at the start of a received datagram,
 * prefixed by a running message number, and then clears the buffer.
 *
 * pBuf must point to a buffer of BUFSIZE bytes; the whole buffer is
 * zeroed before returning.
 *
 * Returns TRUE after printing, FALSE when pBuf is NULL.
 */
BOOL HandleIncomingData( UCHAR* pBuf)
{
    SYSTEMTIME stSysTime;
    LONG nSeq;

    if (pBuf == NULL)
    {
        return FALSE;
    }

    /* Use the value returned by the atomic increment; re-reading nCount
       could observe an update made by another thread. */
    nSeq = InterlockedIncrement(&nCount);

    /* Copy out of the byte buffer instead of casting the pointer, which
       would assume suitable alignment for SYSTEMTIME. */
    memcpy(&stSysTime, pBuf, sizeof(stSysTime));

    printf("[%ld]UTC Time %02d:%02d:%02d:%03d on %02d-%02d-%d \n", (long)nSeq,
           stSysTime.wHour, stSysTime.wMinute,
           stSysTime.wSecond, stSysTime.wMilliseconds,
           stSysTime.wMonth, stSysTime.wDay, stSysTime.wYear);

    /* Clear the buffer itself, not the pointer variable. */
    memset(pBuf, 0, BUFSIZE);
    return TRUE;
}
/*
 * Program entry point.  Prints the configured group address and port,
 * initialises Winsock, creates the events, sets up the socket and the
 * completion port, starts five worker threads and then blocks until the
 * named event "MCLIENT" is signalled.
 *
 * Returns 0 after a normal shutdown and 1 when start-up fails.
 */
int main (void)
{
    HANDLE hWait2Exit;
    int nExitCode = 1;

    printf("\n***************************************\n");
    printf("Group IP address: %s\n",achMCAddr);
    printf("Port number : %d\n",nPort);
    printf("\n***************************************\n");

    InitWinsock2();

    /* Auto-reset event used as the shutdown signal; it is reset right
       after creation so the wait below blocks until it is set. */
    hWait2Exit = CreateEvent(NULL,FALSE,TRUE,"MCLIENT");
    if (hWait2Exit == NULL)
    {
        printf("CreateEvent failed, Err: %lu\n", (unsigned long)GetLastError());
        UnInitWinsock2();
        return 1;
    }
    ResetEvent(hWait2Exit);

    /* Manual-reset event stored in the OVERLAPPED used for socket reads. */
    g_hReadEvent = CreateEvent(NULL,TRUE,TRUE,NULL);
    if (g_hReadEvent == NULL)
    {
        printf("CreateEvent failed, Err: %lu\n", (unsigned long)GetLastError());
        CloseHandle(hWait2Exit);
        UnInitWinsock2();
        return 1;
    }

    SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_TIME_CRITICAL);

    if (!CreateNetConnections ())
    {
        printf("Error condition @ CreateNetConnections , exiting\n");
    }
    else if (!CreateWorkers (5))
    {
        printf("Error condition @CreateWorkers, exiting\n");
    }
    else
    {
        WaitForSingleObject(hWait2Exit,INFINITE);
        nExitCode = 0;
    }

    /* g_hReadEvent is left open: worker threads may still refer to it
       until the process exits. */
    CloseHandle(hWait2Exit);
    UnInitWinsock2();
    return nExitCode;
}
posted on 2007-08-17 12:56
聂文龙 阅读(1071)
评论(0) 编辑 收藏 引用 所属分类:
net work