I/O完成端口背后的理论是同时运行的线程数必须有个上界;也就是,500个并发的客户端请求不必要500个线程存在。那么,合适的并发线程数是多少呢?你会意识到,如果一个机器有两个CPU,那么在此基础上多余两个以上的线程实在是没有意义。因为,当有超过CPU数量的线程数时,系统不得不耗费时间来进行线程之间的切换,这会浪费宝贵的CPU时钟周期。
为每个客户端创建一个线程还有一个不足,就是创建一个线程虽然比创建一个进程廉价,但它远远不是免费的。如果服务器应用程序在启动时创建一个线程池,并且池子中的线程留驻在程序的整个生命期内,那么服务器的性能会得到提高。I/O完成端口在设计上就是使用了线程池。
I/O完成端口可能是最复杂的核心对象。要创建一个完成端口你可以使用CreateIoCompletionPort函数。
HANDLE CreateIoCompletionPort(
HANDLE hfile,
HANDLE hExistingCompPort,
ULONG_PTR CompKey,
DWORD dwNumberOfConcurrentThreads);
或者封装:
HANDLE CreateNewCompletionPort(DWORD dwNumberOfConcurrentThreads) {
return(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
dwNumberOfConcurrentThreads));
}
这个函数根据传入的参数的不同,可以完成两个完全不同的功能。一是可以创建一个完成端口,另一个是可以将一个设备与完成端口关联起来。例如要产生一个完成端口,看起来可以这样调用函数:
hCompPort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE,
NULL,
0,
0);
FileHandle —— 文件或设备的句柄。在产生完成端口时,这个参数应设置为INVALID_HANDLE_VALUE,于是产生一个没有和任何句柄有关系的port。
ExistingCompletionPort —— 在产生完成端口时此参数设定为NULL,产生一个新的port。如果此参数指定为一个已存在的完成端口的句柄,那么在上一个参数FileHandle中指定的句柄就会被加到此port上,而不会产生新的port。
CompletionKey —— 用户自定义的一个值,将被交给提供服务的线程。此值和FileHandle有关联。
NumberOfConcurrentThreads —— 与此完成端口有关联的线程个数。如果设定为0则为CPU的个数。
NumberOfConcurrentThreads这个参数是告诉完成端口可同时运行的最大线程数。如果传0,则默认值为CPU的个数。在大多数情况下为了避免不必要的线程切换你可以传0。有时你也许会想要增加这个值,那就是当来自客户端的请求需要一个长时间的计算甚至发生阻塞时,但通常不鼓励增加这个值。你可以尝试往这个参数传不同的值来测试比较服务器的性能。
关联一个设备到完成端口
当你创建完成端口时,系统内核会创建5个不同的数据结构.
第一个数据结构是设备列表,指明了被关联到完成端口上的设备。将设备关联到完成端口上,你同样是调用CreateIoCompletionPort函数。看起来可以这样调用函数:
HANDLE h = CreateIoCompletionPort(hDevice, hCompPort, dwCompKey, 0);
或者封装:
BOOL AssociateDeviceWithCompletionPort(
HANDLE hCompPort, HANDLE hDevice, DWORD dwCompKey) {
HANDLE h = CreateIoCompletionPort(hDevice, hCompPort, dwCompKey, 0);
return(h == hCompPort);
}
这样就添加一个记录(entry)到一个已经存在的完成端口设备列表上。你在第一个参数上传递一个已经存在的完成端口句柄(在第一次调CreateIoCompletionPort时得到),在第二个参数上传递设备句柄(这个设备可以是file,socket,
mailslot,pipe等等),在第三个参数上传递一个完成键(由用户定义的值,操作系统不关心你传递的是什么值)。每当你关联一个设备到完成端口上时,系统就添加这些信息到完成端口设备列表上。
注意:CreateIoCompletionPort很复杂,推荐把它看成两个函数来使用。下面有个例子把创建端口和关联放在一次调用中完成,但是不推荐这样使用。下面的代码在打开一个文件并在同一时间创建一个新的完成端口,同时关联该设备与完成端口。所有这个文件I/O请求都将在完成时使用CK_FILE这个完成键,完成端口将允许两个线程来并发执行。
#define CK_FILE 1
HANDLE hfile = CreateFile(...);
HANDLE hCompPort = CreateIoCompletionPort(hfile, NULL, CK_FILE, 2);
第二个数据结构是一I/O完成队列。当针对一个设备的异步I/O请求完成时,系统就检查看这个设备在之前是否被关联到一个完成端口过,如果是,系统就将这个已经完成的I/O请求作为一条记录,添加到对应的完成端口的I/O完成队列末尾。在队列中的每条记录都指明了4个值:已传输的字节数;完成键(当设备被关联到端口上时通过第三个参数设定的);一个指针,指向I/O请求的OVERLAPPED结构;和一个错误码。
注意:对一个设备发出一个I/O请求,但并不将它记录到完成端口的完成队列上是可以的。这不是常态,但有时却派得上用场。比如,当你通过socket发送数据,但你并不在意这些数据是否被处理时。(或者有时候想用旧有的受激发的event对象机制来激发,而不用I/O完成队列)
为了这么做,请你设定一个OVERLAPPED结构,内含一个合法的手动重置(manual-reset)event对象,放在hEvent栏位。然后把该handle的最低位设置为1。虽然这听起来象是一种黑客行为,但档案中有交代。看起来象这样:
Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
Overlapped.hEvent = (HANDLE) ((DWORD_PTR) Overlapped.hEvent | 1);
ReadFile(..., &Overlapped);
同样,不要忘记在关闭event句柄之前重置低位:
CloseHandle((HANDLE) ((DWORD_PTR) Overlapped.hEvent & ~1));
Architecting Around an I/O Completion Port
当你的服务器应用程序启动时,它将通过调用像CreateNewCompletionPort这样的函数来创建I/O完成端口。然后应用程序创建一个线程池来处理client请求。现在你有个问题要问,“线程池中有多少线程?”。这是一个很难回答的问题,我将在“How Many Threads in the Pool?”这个小节中给出详细的回答。目前,单凭经验的标准是本地机器上的CPU数再乘以2。比如,一个双CPU的机器上,你就应该在线程池中创建4个线程。
池子里的所有线程都将执行相同的线程函数。通常,这个线程函数执行一些初始化工作,然后就进入一个循环,直到服务进程被指示停止时才结束掉。在循环里,线程将自己休眠,等待关联到完成端口上的设备的I/O请求完成。线程通过调用GetQueuedCompletionStatus函数进入等待。
下面是函数原型:
BOOL GetQueuedCompletionStatus(
HANDLE hCompPort,
PDWORD pdwNumBytes,
PULONG_PTR CompKey,
OVERLAPPED** ppOverlapped,
DWORD dwMilliseconds);
第一个参数hCompPort传递一个完成端口句柄,表明线程监控的是哪一个完成端口。许多服务程序都是使用的单一完成端口,所有I/O请求的完成通知都在这一个端口上。基本上,GetQueuedCompletionStatus的工作就是将调用此函数的线程休眠,直到一条记录出现在指定完成端口上的I/O完成队列中,或者直到指定的超时时间到达(超时时间在dwMilliseconds参数中指定)。
第三个数据结构就是等待中的线程队列(the waiting thread queue)。在线程池中的每一个调用了GetQueuedCompletionStatus的线程,其线程ID就被放置到等待中的线程队列里,以使I/O完成端口核心对象能知道当前有哪些线程正等待着处理已经完成了的I/O请求。当有一条记录出现在端口的I/O完成队列时,完成端口就唤醒一个在等待队列中的线程。这个线程会得到组成这条完成记录的一些信息:已传输的字节数,完成键,和一个OVERLAPPED结构的地址。这些信息通过传递给GetQueuedCompletionStatus函数的三个参数(pdwNumBytes、CompKey、ppOverlapped)返回到线程中。GetQueuedCompletionStatus函数的返回值有点复杂,下面的代码演示了如何处理不同的返回值:
DWORD dwNumBytes;
ULONG_PTR CompKey;
OVERLAPPED* pOverlapped;
// hIOCP is initialized somewhere else in the program
BOOL fOk = GetQueuedCompletionStatus(hIOCP,
&dwNumBytes, &CompKey, &pOverlapped, 1000);
DWORD dwError = GetLastError();
if (fOk) {
// Process a successfully completed I/O request
} else {
if (pOverlapped != NULL) {
// Process a failed completed I/O request
// dwError contains the reason for failure
} else {
if (dwError == WAIT_TIMEOUT) {
// Time-out while waiting for completed I/O entry
} else {
// Bad call to GetQueuedCompletionStatus
// dwError contains the reason for the bad call
}
}
}
如你所预料到的,I/O完成队列是以队列的方式移出记录的,也就是先进先出FIFO。然而,你可能没有料到的是,调用GetQueuedCompletionStatus函数进入等待的线程却是以栈的方式来唤醒的,也就是LIFO,最后调用函数进入等待的那个线程被最先唤醒。这样做的理由是为了进一步提高处理性能。例如,有四个线程在“等待线程队列”中等待着。如果这个时候只有一条“已完成的I/O”记录出现了,那么最后一个调用GetQueuedCompletionStatus的线程被唤醒来处理这条记录。当这个线程处理完这条记录时,此线程再次调用GetQueuedCompletionStatus函数进入等待线程队列。那么,此时当有另一条I/O完成记录出现时,刚才那个线程被继续唤醒来处理这条新的记录。(这样的好处就是可以避免线程间的切换)
只要I/O请求完成得够迟缓,那么单个线程就足以处理他们。系统只需要保持唤醒同一个线程,并让其他三个线程持续休眠。使用LIFO算法,线程就不必花时间将他们在内存中的资源置换到磁盘中(例如栈空间),并从CPU的cache中替换出去。
I/O完成端口如何管理线程池
现在该讨论为什么完成端口是如此有用了。首先,在你创建完成端口的时候,你通过CreateIoCompletionPort函数的最后一个参数指定可以并行运行的线程数量。我们说过,这个值通常设置为主机上的CPU数量。当已经完成的I/O记录进入队列时,完成端口就唤醒等待中的线程。然而,完成端口只能唤醒你所指定的那么多个线程。于是,在双CPU的机器中,如果有4个I/O请求完成,并有4个线程在等待,但完成端口却只能唤醒2个线程,另2个线程保持休眠。每个得到唤醒在处理已完成的I/O记录的线程,处理完成后,又重新调用GetQueuedCompletionStatus函数进入等待。因为线程的唤醒算法是后进先出,所以系统看到还有已完成的I/O记录在排队时,又唤醒刚才被唤醒过的线程。
仔细思考这个过程,你似乎会觉得好象有些问题:既然完成端口仅允许指定数量的线程被唤醒,为什么还要产生多余的线程等待在线程池中呢?比如,在上一段中所叙述的那种情况,在一个双CPU的主机上,创建一个完成端口,并告之只允许2个线程并行处理。但是创建线程池时我们创建了4个线程(按照2倍CPU的数量),看上去我们好象创建了2个多余的线程——他们永远也不会被唤醒。(因为LIFO)
但是不用担心,完成端口是非常聪明的。当完成端口唤醒一个线程时,会将此线程的ID放入第四个数据结构中——被释放的线程列表。这可以让完成端口记住哪些线程是被唤醒了的,并监控这些线程的执行。如果某个在“被释放的线程列表”中的线程因为调用了某功能而被挂起(即产生阻塞)那么完成端口会侦测到这一情况,并将此线程的ID从“被释放的线程列表”中移动到“暂停的线程列表”——即第五个数据结构中。
完成端口的目标就是要保证在“被释放的线程列表”中的记录条数保持为你在创建完成端口时所指定的并发线程数。因此,当一个被唤醒的线程因为某种原因而被挂起时,完成端口就会释放掉另一个在等待中的线程,以次来维持“被释放的线程列表”中的记录数保持为你所指定的并发线程数。如果被挂起的线程重新被唤醒,它又会从“暂停的线程列表”中移动到“被释放的线程列表”。这个时候需要注意的是加上刚才被另外唤醒的一个线程,此时在“被释放的线程列表”中的记录数会超过你所指定的并发线程数(看上去很奇怪,但却是这样)。完成端口意识到这一情况后将不会再唤醒其他线程,除非并行线程数又降到CPU数量之下。实际并行线程数将只在你指定的最大并行线程数之上停留很短的时间并且线程在循环到再次调用GetQueuedCompletionStatus时这种情况将快速的停止下来。
注意:一旦线程调用GetQueuedCompletionStatus,那么这个线程就被“分配”到那指定的完成端口上去。系统会假定那些被“分配”的线程代表完成端口在工作。你可以终止线程和完成端口间的这种“分配”关系,通过以下三种方式:
终止线程;
让线程调用GetQueuedCompletionStatus,并传递另一个完成端口的句柄,这样就终止掉当前的“分配”关系,与另一完成端口产生新的“分配”关系。
销毁完成端口。
在线程池中创建多少个线程?
现在是讨论线程池中需要多少个线程的好时机。考虑两点:首先,当服务程序启动时你应该创建一个最小的线程集,这样你就不用象每客户端每线程模型那样频繁地创建和销毁线程了。因为创建和销毁线程都需要耗费CPU时间,所以你应该尽可能的少做这些操作。其次,你应该设置线程数的最大值,因为创建太多的线程会相应耗费太多的系统资源。就算大多数资源能被交换出RAM,但你还是应该最小限度地使用系统资源,不要浪费,最好不要产生paging file space。
你也许应该测试不同的线程数。大多数服务程序(包括MS的IIS)都使用启发式(heuristic,实际就是枚举选择,通过可选择的方法发现的几种解决办法中最合适的一种,在一个程序的连续阶段被选择出来以用于该程序的下一步的一种解决问题的技巧的应用)算法来管理他们的线程池。我推荐你也使用同样的方法。例如,你可以创建如下的变量来管理线程池。
LONG g_nThreadsMin; // Minimum number of threads in pool
LONG g_nThreadsMax; // Maximum number of threads in pool
LONG g_nThreadsCrnt; // Current number of threads in pool
LONG g_nThreadsBusy; // Number of busy threads in pool
当你的程序启动时,你可以创建g_nThreadsMin这么多个线程,都执行相同的线程函数。线程函数看起来是这样:
DWORD WINAPI ThreadPoolFunc(PVOID pv) {
// Thread is entering pool
InterlockedIncrement(&g_nThreadsCrnt);
InterlockedIncrement(&g_nThreadsBusy);
for (BOOL fStayInPool = TRUE; fStayInPool;) {
// Thread stops executing and waits for something to do
InterlockedDecrement(&m_nThreadsBusy);
BOOL fOk = GetQueuedCompletionStatus(...);
DWORD dwIOError = GetLastError();
// Thread has something to do, so it's busy
int nThreadsBusy = InterlockedIncrement(&m_nThreadsBusy);
// Should we add another thread to the pool?
if (nThreadsBusy == m_nThreadsCrnt) { // All threads are busy
if (nThreadsBusy < m_nThreadsMax) { // The pool isn't full
if (GetCPUUsage() < 75) { // CPU usage is below 75%
// Add thread to pool
CloseHandle(chBEGINTHREADEX(...));
}
}
}
if (!fOk && (dwIOError == WAIT_TIMEOUT)) { // Thread timed-out
if (!ThreadHasIoPending()) {
// There isn't much for the server to do, and this thread
// can die because it has no outstanding I/O requests
fStayInPool = FALSE;
}
}
if (fOk || (po != NULL)) {
// Thread woke to process something; process it
...
if (GetCPUUsage() > 90) { // CPU usage is above 90%
if (!ThreadHasIoPending()) { // No pending I/O requests
if (g_nThreadsCrnt > g_nThreadsMin)) { // Pool above min
fStayInPool = FALSE; // Remove thread from pool
}
}
}
}
}
// Thread is leaving pool
InterlockedDecrement(&g_nThreadsBusy);
InterlockedDecrement(&g_nThreadsCurrent);
return(0);
}
注意:在这章稍早的时候(in the section “Canceling Queued Device I/O Requests”)。我说过:当线程退出的时候,系统自动取消所有已发起但处于挂起中的I/O请求。这就是为什么伪代码像ThreadHasIoPending这样的函数是必须的。如果线程还有未完成的I/O请求,就不要允许线程结束。
许多服务都提供了管理工具,以使管理员可以通过此来控制线程池的行为。例如,设置线程数的最小值和最大值,CPU使用极限,以及最大并发线程数。
Simulating Completed I/O Requests
I/O完成端口并不是只能用于device I/O,它也是用来解决线程间通讯问题的卓越的机制。
BOOL PostQueuedCompletionStatus(
HANDLE hCompPort,
DWORD dwNumBytes,
ULONG_PTR CompKey,
OVERLAPPED* pOverlapped);
这个函数将一个“已完成的I/O”通知添加到一个完成端口的队列上。第一个参数指定要针对的完成端口,剩下的三个参数,dwNumBtyes,ComKey和pOverlapped指出了线程调用 GetQueuedCompletionStatus时将从对应的返回参数中返回的值。当一个线程从“I/O完成队列”中取出一条模拟记录时,GetQueuedCompletionStatus函数将返回TRUE,表明已经成功地执行了一个I/O请求。
函数PostQueuedCompletionStatus难以置信的有用,它提供了一种方式让你可以跟线程池里的所有线程通讯。例如,当用户终止服务应用程序时,你当然希望所有的线程都能够干干净净地退出。但如果有线程等待在完成端口上,并且此时又一直没有I/O请求到达,那么这些线程就不能被唤醒。通过对线程池中的每个线程调用一次PostQueuedCompletionStatus,每个线程都能得以唤醒,你就可以设计检查从GetQueuedCompletionStatus的返回参数中得到的值(比如设定完成键),可以判断出是否是程序要终止了,由此可决定是否要做清理工作并退出。
当使用这项技术,你必须要特别小心。我的例子能工作是因为在池中的线程会死去并不会再次调用GetQueuedCompletionStatus。不管怎样,如果你想要通知其池中的每个线程一些事情,并且它们会循环到再次调用GetQueuedCompletionStatus,你将会有一个问题。因为线程是按LIFO的顺序醒来。所以你的应用程序中你将会使用一些额外的线程同步来确保每个线程获得了合适的机会来查看它的伪I/O记录。不使用这个格外的线程同步,一个线程可能会查看相同的通知好几次。
from:油库七号
posted on 2008-10-29 18:36
我风 阅读(1615)
评论(0) 编辑 收藏 引用