摘要: 9.1
意图
接受器-连接器设计模式(Acceptor-Connector)使分布式系统中的连接建立及服务初始化与一旦服务初始化后所执行的处理去耦合。这样的去耦合通过三种组件来完成:acceptor、connector和service handler(服务处理器)。连接器主动地建立到远地接受器组件的连接,并初始化服务处理器来处理在连接上交...
阅读全文
posted @
2007-02-27 21:47 walkspeed 阅读(3541) |
评论 (0) |
编辑 收藏
摘要: 接受器/连接器模式设计用于降低连接建立与连接建立后所执行的服务之间的耦合。例如,在WWW浏览器中,所执行的服务或“实际工作”是解析和显示客户浏览器接收到的HTML页面。连接建立是次要的,可能通过BSD socket或其他一些等价的IPC机制来完成。使用这些模式允许程序员专注于“实际工作”,而最少限度地去关心怎样在服务器和客户之间建立连接。而另外一方面,程序员也可以独立于他所编写的、或将要编写的服务...
阅读全文
posted @
2007-02-27 21:40 walkspeed 阅读(7221) |
评论 (1) |
编辑 收藏
第
8
章
前摄器(
Proactor
):用于为异步事件多路分离和分派处理器的对象行为模式
Irfan Pyarali Tim Harrison Douglas C. Schmidt Thomas D. Jordan
现代操作系统为开发并发应用提供了多种机制。同步多线程是一种流行的机制,用于开发同时执行多个操作的应用。但是,线程常常有很高的性能开销,并且需要对同步模式和原理有深入的了解。因此,有越来越多的操作系统支持异步机制,在减少多线程的大量开销和复杂性的同时,提供了并发的好处。
本论文中介绍的前摄器(Proactor
)模式描述怎样构造应用和系统,以有效地利用操作系统支持的异步机制。当应用调用异步操作时,OS
代表应用执行此操作。这使得应用可以让多个操作同时运行,而又不需要应用拥有相应数目的线程。因此,通过使用更少的线程和有效利用OS
对异步操作的支持,前摄器模式简化了并发编程,并改善了性能。
前摄器模式支持多个事件处理器的多路分离和分派,这些处理器由异步事件的完成来触发。通过集成完成事件(completion event)的多路分离和相应的事件处理器的分派,该模式简化了异步应用的开发。
这一部分提供使用前摄器模式的上下文和动机。
前摄器模式应该被用于应用需要并发执行操作的性能好处、又不想受到同步多线程或反应式编程的约束时。为说明这些好处,设想一个需要并发执行多个操作的网络应用。例如,一个高性能Web服务器必须并发处理发送自多个客户的HTTP请求[1, 2]。图8-1 显示了Web浏览器和Web服务器之间的典型交互。当用户指示浏览器打开一个URL时,浏览器发送一个HTTP GET请求给Web服务器。收到请求,服务器就解析并校验请求,并将指定的文件发回给浏览器。
图8-1 典型的Web服务器通信软件体系结构
开发高性能Web服务器要求消除以下压力:
-
并发性:服务器必须同时执行多个客户请求;
-
效率:服务器必须最小化响应延迟、最大化吞吐量,并避免不必要地使用CPU;
-
编程简单性:服务器的设计应该简化高效的并发策略的使用;
-
可适配性:应该使继承新的或改进的传输协议(比如HTTP 1.1[3])所带来的维护代价最小化。
Web服务器可以使用若干并发策略来实现,包括多个同步线程、反应式同步事件分派和前摄式异步事件分派。下面,我们检查传统方法的缺点,并解释前摄器模式是怎样提供一种强大的技术,为高性能并发应用而支持高效、灵活的异步事件分派策略的。
同步的多线程和反应式编程是实现并发的常用方法。这一部分描述这些编程模型的缺点。
或许最为直观的实现并发Web服务器的途径是使用同步的多线程。在此模型中,多个服务器线程同时处理来自多个客户的HTTP GET请求。每个线程同步地执行连接建立、HTTP请求读取、请求解析和文件传输操作。作为结果,每个操作都阻塞直到完成。
同步线程的主要优点是应用代码的简化。特别是,Web服务器为服务客户A的请求所执行的操作在很大程度上独立于为服务客户B的请求所需的操作。因而,很容易在分离的线程中对不同的请求进行服务,因为在线程之间共享的状态数量很少;这也最小化了对同步的需要。而且,在分离的线程中执行应用逻辑也使得开发者可以使用直观的顺序命令和阻塞操作。
图8-2 多线程Web服务器体系结构
图8-2显示使用同步线程来设计的Web服务器怎样并发地处理多个客户请求。该图显示的Sync Acceptor对象封装服务器端用于同步接受网络连接的机制。使用“Thread Per Connection”并发模型,各个线程为服务HTTP GET请求所执行的一系列步骤可被总结如下:
- 每个线程同步地阻塞在accept socket调用中,等待客户连接请求;
- 客户连接到服务器,连接被接受;
- 新客户的HTTP请求被同步地从网络连接中读取;
- 请求被解析;
- 所请求的文件被同步地读取;
- 文件被同步地发送给客户。
附录A.1中有一个将同步线程模型应用于Web服务器的C++代码例子。
如上所述,每个并发地连接的客户由一个专用的服务器线程服务。在继续为其他HTTP请求服务之前,该线程同步地完成一个被请求的操作。因此,要在服务多个客户时执行同步I/O,Web服务器必须派生多个线程。尽管这种同步线程模式是直观的,且能够相对高效地映射到多CPU平台上,它还是有以下缺点:
线程策略与并发策略被紧耦合:这种体系结构要求每个相连客户都有一个专用的线程。通过针对可用资源(比如使用线程池来对应CPU的数目)、而不是正被并发服务的客户的数目来调整其线程策略,可能会更好地优化一个并发应用;
更大的同步复杂性:线程可能会增加序列化对服务器的共享资源(比如缓存文件和Web页面点击日志)的访问所必需的同步机制的复杂性;
更多的性能开销:由于上下文切换、同步和CPU间的数据移动[4],线程的执行可能很低效;
不可移植性:线程有可能在有些平台上不可用。而且,根据对占先式和非占先式线程的支持,OS平台之间的差异非常大。因而,很难构建能够跨平台统一运作的多线程服务器。
作为这些缺点的结果,多线程常常不是开发并发Web服务器的最为高效的、也不是最不复杂的解决方案。
另一种实现同步Web服务器的常用方法是使用反应式事件分派模型。反应堆(Reactor)模式描述应用怎样将Event Handler登记到Initiation Dispatcher。Initiation Dispatcher通知Event Handler何时能发起一项操作而不阻塞。
单线程并发Web服务器可以使用反应式事件分派模型,它在一个事件循环中等待Reactor通知它发起适当的操作。Web服务器中反应式操作的一个例子是Acceptor(接受器)[6]到Initiation Dispatcher的登记。当数据在网络连接上到达时,分派器回调Acceptor,后者接受网络连接,并创建HTTP Handler。于是这个HTTP Handler就登记到Reactor,以在Web服务器的单线程控制中处理在那个连接上到来的URL请求。
图8-3和图8-4显示使用反应式事件分派设计的Web服务器怎样处理多个客户。图8-3显示当客户连接到Web服务器时所采取的步骤。图8-4显示Web服务器怎样处理客户请求。图8-3的一系列步骤可被总结如下:
图8-3 客户连接到反应式Web服务器
图8-4 客户发送HTTP请求到反应式Web服务器
- Web服务器将Acceptor登记到Initiation Dispatcher,以接受新连接;
- Web服务器调用Initiation Dispatcher的事件循环;
- 客户连接到Web服务器;
- Initiation Dispatcher将新连接请求通知Acceptor,后者接受新连接;
- Acceptor创建HTTP Handler,以服务新客户;
- HTTP Handler将连接登记到Initiation Dispatcher,以读取客户请求数据(就是说,在连接变得“读就绪”时);
- HTTP Handler服务来自新客户的请求。
图8-4显示反应式Web服务器为服务HTTP GET请求所采取的一系列步骤。该过程描述如下:
- 客户发送HTTP GET请求;
- 当客户请求数据到达服务器时,Initiation Dispatcher通知HTTP Handler;
- 请求以非阻塞方式被读取,于是如果操作会导致调用线程阻塞,读操作就返回EWOULDBLOCK(步骤2和3将重复直到请求被完全读取);
- HTTP Handler解析HTTP请求;
- 所请求的文件从文件系统中被同步读取;
- 为发送文件数据(就是说,当连接变得“写就绪”时),HTTP Handler将连接登记到Initiation Dispatcher;
- 当TCP连接变得写就绪时,Initiation Dispatcher通知HTTP Handler;
- HTTP Handler以非阻塞方式将所请求文件发送给客户,于是如果操作会导致调用线程阻塞,写操作就返回EWOULDBLOCK(步骤7和8将重复直到数据被完全递送)。
附录A.2中有一个将反应式事件分派模型应用于Web服务器的C++代码例子。
因为Initiation Dispatcher运行在单线程中,网络I/O操作以非阻塞方式运行在Reactor的控制之下。如果当前操作的进度停止了,操作就被转手给Initiation Dispatcher,由它监控系统操作的状态。当操作可以再度前进时,适当的Event Handler会被通知。
反应式模式的主要优点是可移植性,粗粒度并发控制带来的低开销(就是说,单线程不需要同步或上下文切换),以及通过使应用逻辑与分派机制去耦合所获得的模块性。但是,该方法有以下缺点:
复杂的编程:如从前面的列表所看到的,程序员必须编写复杂的逻辑,以保证服务器不会在服务一个特定客户时阻塞。
缺乏多线程的OS
支持:大多数操作系统通过select系统调用[7]来实现反应式分派模型。但是,select不允许多于一个的线程在同一个描述符集上等待。这使得反应式模型不适用于高性能应用,因为它没有有效地利用硬件的并行性。
可运行任务的调度:在支持占先式线程的同步多线程体系结构中,将可运行线程调度并时分(time-slice)到可用CPU上是操作系统的责任。这样的调度支持在反应式体系结构中不可用,因为在应用中只有一个线程。因此,系统的开发者必须小心地在所有连接到Web服务器的客户之间将线程分时。这只能通过执行短持续时间、非阻塞的操作来完成。
作为这些缺点的结果,当硬件并行可用时,反应式事件分派不是最为高效的模型。由于需要避免使用阻塞I/O,该模式还有着相对较高的编程复杂度。
当OS平台支持异步操作时,一种高效而方便的实现高性能Web服务器的方法是使用前摄式事件分派。使用前摄式事件分派模型设计的Web服务器通过一或多个线程控制来处理异步操作的完成。这样,通过集成完成事件多路分离(completion event demultiplexing)和事件处理器分派,前摄器模式简化了异步的Web服务器。
异步的Web服务器将这样来利用前摄器模式:首先让Web服务器向OS发出异步操作,并将回调方法登记到Completion Dispatcher(完成分派器),后者将在操作完成时通知Web服务器。于是OS代表Web服务器执行操作,并随即在一个周知的地方将结果排队。Completion Dispatcher负责使完成通知出队,并执行适当的、含有应用特有的Web服务器代码的回调。
图8-5 客户连接到基于前摄器的Web服务器
图8-6 客户发送请求给基于前摄器的Web服务器
图8-5和图8-6显示使用前摄式事件分派设计的Web服务器怎样在一或多个线程中并发地处理多个客户。图8-5显示当客户连接到Web服务器时所采取的一系列步骤。
- Web服务器指示Acceptor发起异步接受;
- 接受器通过OS发起异步接受,将其自身作为Completion Handler和Completion Dispatcher的引用传递;并将用于在异步接受完成时通知Acceptor;
- Web服务器调用Completion Dispatcher的事件循环;
- 客户连接到Web服务器;
- 当异步接受操作完成时,操作系统通知Completion Dispatcher;
- Completion Dispatcher通知接受器;
- Acceptor创建HTTP Handler;
- HTTP Handler发起异步操作,以读取来自客户的请求数据,并将其自身作为Completion Handler和Completion Dispatcher的引用传递;并将用于在异步读取完成时通知HTTP Handler。
图8-6 显示前摄式Web服务器为服务HTTP GET请求所采取的步骤。这些步骤解释如下:
- 客户发送HTTP GET请求;
- 读取操作完成,操作系统通知Completion Dispatcher;
- Completion Dispatcher通知HTTP Handler(步骤2和3将重复直到整个请求被接收);
- HTTP Handler解析请求;
- HTTP Handler同步地读取所请求的文件;
- HTTP Handler发起异步操作,以把文件数据写到客户连接,并将其自身作为Completion Handler和Completion Dispatcher的引用传递;并将用于在异步写入完成时通知HTTP Handler。
- 当写操作完成时,操作系统通知Completion Dispatcher;
- 随后Completion Dispatcher通知Completion Handler(步骤6-8将重复直到文件被完全递送)。
8.8中有一个将前摄式事件分派模型应用于Web服务器的C++代码例子。
使用前摄器模式的主要优点是可以启动多个并发操作,并可并行运行,而不要求应用必须拥有多个线程。操作被应用异步地启动,它们在OS的I/O子系统中运行直到完成。发起操作的线程现在可以服务另外的请求了。
例如,在上面的例子中,Completion Dispatcher可以是单线程的。当HTTP请求到达时,单个Completion Dispatcher线程解析请求,读取文件,并发送响应给客户。因为响应是被异步发送的,多个响应就有可能同时被发送。而且,同步的文件读取可以被异步的文件读取取代,以进一步增加并发的潜力。如果文件读取是被异步完成的,HTTP Handler所执行的唯一的同步操作就只剩下了HTTP协议请求解析。
前摄式模型的主要缺点是编程逻辑至少和反应式模型一样复杂。而且,前摄器模式可能会难以调试,因为异步操作常常有着不可预测和不可重复的执行序列,这就使分析和调试复杂化了。8.7描述怎样应用其他模式(比如异步完成令牌[8])来简化异步应用编程模型。
当具有以下一项或多项条件时使用前摄器模式:
- 应用需要执行一个或多个不阻塞调用线程的异步操作;
- 当异步操作完成时应用必须被通知;
- 应用需要独立于它的I/O模型改变它的并发策略;
- 通过使依赖于应用的逻辑与应用无关的底层构造去耦合,应用将从中获益;
- 当使用多线程方法或反应式分派方法时,应用的执行将很低效,或是不能满足性能需求。
在图8-7中使用OMT表示法演示了前摄器模式的结构。
前摄器模式中的关键参与者包括:
前摄发起器(Proactive Initiator。Web服务器应用的主线程):
- Proactive Initiator是应用中任何发起Asynchronous Operation(异步操作)的实体。它将Completion Handler和Completion Dispatcher登记到Asynchronous Operation Processor(异步操作处理器),此处理器在操作完成时通知前摄发起器。
完成处理器(Completion Handler。Acceptor和HTTP Handler):
- 前摄器模式将应用所实现的Completion Handler接口用于Asynchronous Operation完成通知。
异步操作(Asynchronous Operation。Async_Read、Async_Write和Async_Accept方法):
- Asynchronous Operation被用于代表应用执行请求(比如I/O和定时器操作)。当应用调用Asynchronous Operation时,操作的执行没有借用应用的线程控制。因此,从应用的角度来看,操作是被异步地执行的。当Asynchronous Operation完成时,Asynchronous Operation Processor将应用通知委托给Completion Dispatcher。
异步操作处理器(Asynchronous Operation Processor。操作系统):
- Asynchronous Operation是由Asynchronous Operation Processor来运行直至完成的。该组件通常由OS实现。
完成分派器(Completion Dispatcher。Notification Queue):
- Completion Dispatcher负责在Asynchronous Operation完成时回调应用的Completion Handler。当Asynchronous Operation Processor完成异步发起的操作时,Completion Dispatcher代表应用执行应用回调。
图8-7 前摄器模式中的参与者
有若干良好定义的步骤被用于所有Asynchronous Operation。在高水平的抽象上,应用异步地发起操作,并在操作完成时被通知。图8-8显示在模式参与者之间必定发生的下列交互:
-
前摄发起器发起操作:为执行异步操作,应用在Asynchronous Operation Processor上发起操作。例如,Web服务器可能要求OS在网络上使用特定的socket连接传输文件。要请求这样的操作,Web服务器必须指定要使用哪一个文件和网络连接。而且,Web服务器必须指定(1)当操作完成时通知哪一个Completion Handler,以及(2)一旦文件被传输,哪一个Completion Dispatcher应该执行回调。
-
异步操作处理器执行操作:当应用在Asynchronous Operation Processor上调用操作时,它相对于其他应用操作异步地运行这些操作。现代操作系统(比如Solaris和Windows NT)在内核中提供异步的I/O子系统。
-
异步操作处理器通知完成分派器:当操作完成时,Asynchronous Operation Processor取得在操作被发起时指定的Completion Handler和Completion Dispatcher。随后Asynchronous Operation Processor将Asynchronous Operation的结果和Completion Handler传递给Completion Dispatcher,以用于回调。例如,如果文件已被异步传输,Asynchronous Operation Processor可以报告完成状态(比如成功或失败),以及写入网络连接的字节数。
-
完成分派器通知应用:Completion Dispatcher在Completion Handler上调用完成挂钩,将由应用指定的任何完成数据传递给它。例如,如果异步读取完成,通常一个指向新到达数据的指针将会被传递给Completion Handler。
图8-8 前摄器模式的交互图
这一部分详述使用前摄器模式的效果。
前摄器模式提供以下好处:
增强事务分离:前摄器模式使应用无关的异步机制与应用特有的功能去耦合。应用无关的机制成为可复用组件,知道怎样多路分离与Asynchronous Operation相关联的完成事件,并分派适当的由Completion Handler定义的回调方法。同样地,应用特有的功能知道怎样执行特定类型的服务(比如HTTP处理)。
改善应用逻辑可移植性:通过允许接口独立于执行事件多路分离的底层OS调用而复用,它改善了应用的可移植性。这些系统调用检测并报告可能同时发生在多个事件源之上的事件。事件源可以是I/O端口、定时器、同步对象、信号,等等。在实时POSIX平台上,异步I/O函数由aio API族[9]提供。在Windows NT中,I/O完成端口和重叠式(overlapped)I/O被用于实现异步I/O[10]。
完成分派器封装了并发机制:使Completion Dispatcher与Asynchronous Operation Processor去耦合的一个好处是应用可以通过多种并发策略来配置Completion Dispatcher,而不会影响其他参与者。如8.7所讨论的,Completion Dispatcher可被配置使用包括单线程和线程池方案在内的若干并发策略。
线程策略被与并发策略去耦合:因为Asynchronous Operation Processor代表Proactive Initiator完成可能长时间运行的操作,应用不会被迫派生线程来增加并发。这使得应用可以独立于它的线程策略改变它的并发策略。例如,Web服务器可能只想每个CPU有一个线程,但又想同时服务更多数目的客户。
提高性能:多线程操作系统执行上下文切换,以在多个线程控制中轮换。虽然执行一次上下文切换的时间保持相当的恒定,如果OS上下文要切换到空闲线程的话,在大量线程间轮换的总时间可以显著地降低应用性能。例如,线程可以轮询OS以查看完成状态,而这是低效率的。通过只激活那些有事件要处理的合理的线程控制,前摄器模式能够避免上下文切换的代价。例如,如果没有待处理的GET请求,Web服务器不需要启用HTTP Handler。
应用同步的简化:只要Completion Handler不派生另外的线程控制,可以不考虑、或只考虑少许同步问题而编写应用逻辑。Completion Handler可被编写为就好像它们存在于一个传统的单线程环境中一样。例如,Web服务器的HTTP GET处理器可以通过Async Read操作(比如Windows NT TransmitFile函数[1])来访问磁盘。
前摄器模式有以下缺点:
难以调试:以前摄器模式编写的应用可能难以调试,因为反向的控制流在构架基础结构和应用特有的处理器上的回调方法之间来回振荡。这增加了在调试器中对构架的运行时行为的“单步跟踪”的困难度,因为应用开发者可能不了解或不能获得构架的代码。这与试图调试使用LEX和YACC编写的编译器的词法分析器和解析器时所遇到的问题是类似的。在这些应用中,当线程控制是在用户定义的动作例程中时,调试是相当直接的。但是一旦线程控制返回到所生成的有限确定自动机(Deterministic Finite Automate,DFA)骨架时,就很难跟住程序逻辑了。
调度和控制未完成操作:Proactive Initiator可能没有对Asynchronous Operation的执行顺序的控制。因此,Asynchronous Operation Processor必须被小心设计,以支持Asynchronous Operation的优先级和取消处理。
前摄器模式可以通过许多方式实现。这一部分讨论实现前摄器模式所涉及的步骤。
实现前摄器模式的第一步是构建Asynchronous Operation Processor。该组件负责代表应用异步地执行操作。因此,它的两项主要责任是输出Asynchronous Operation API和实现Asynchronous Operation Engine以完成工作。
Asynchronous Operation Processor必须提供API、允许应用请求Asynchronous Operation。在设计这些API时有若干压力需要考虑:
可移植性:此API不应约束应用或它的Proactive Initiator使用特定的平台。
灵活性:常常,异步API可以为许多类型的操作共享。例如,异步I/O操作常常被用于在多种介质(比如网络和文件)上执行I/O。设计支持这样的复用的API可能是有益的。
回调:当操作被调用时,Proactive Initiator必须登记回调。实现回调的一种常用方法是让调用对象(客户)输出接口、让调用者知道(服务器)。因此,Proactive Initiator必须通知Asynchronous Operation Processor,当操作完成时,哪一个Completion Handler应被回调。
完成分派器:因为应用可以使用多个Completion Dispatcher,Proactive Initiator还必须指示由哪一个Completion Dispatcher来执行回调。
给定所有这些问题,考虑下面的用于异步读写的API。Asynch_Stream类是用于发起异步读写的工厂。一旦构造,可以使用此类来启动多个异步读写。当异步读取完成时,Asynch_Stream::Read_Result将通过Completion_Handler上的handler_read回调方法被回传给handler。类似地,当异步写入完成时,Asynch_Stream::Write_Result将通过Completion_Handler上的handler_write回调方法被回传给handler。
class Asynch_Stream
// = TITLE
// A Factory for initiating reads
// and writes asynchronously.
{
// Initializes the factory with information
// which will be used with each asynchronous
// call. <handler> is notified when the
// operation completes. The asynchronous
// operations are performed on the <handle>
// and the results of the operations are
// sent to the <Completion_Dispatcher>.
Asynch_Stream (Completion_Handler &handler,
HANDLE handle,
Completion_Dispatcher *);
// This starts off an asynchronous read.
// Upto <bytes_to_read> will be read and
// stored in the <message_block>.
int read (Message_Block &message_block,
u_long bytes_to_read,
const void *act = 0);
// This starts off an asynchronous write.
// Upto <bytes_to_write> will be written
// from the <message_block>.
int write (Message_Block &message_block,
u_long bytes_to_write,
const void *act = 0);
...
};
Asynchronous Operation Processor必须含有异步执行操作的机制。换句话说,当应用线程调用Asynchronous Operation时,必须不借用应用的线程控制而执行此操作。幸好,现代操作系统提供了用于Asynchronous Operation的机制(例如,POSIX 异步I/O和WinNT重叠式I/O)。在这样的情况下,实现模式的这一部分只需要简单地将平台API映射到上面描述的Asynchronous Operation API。
如果OS平台不提供对Asynchronous Operation的支持,有若干实现技术可用于构建Asynchronous Operation Engine。或许最为直观的解决方案是使用专用线程来为应用执行Asynchronous Operation。要实现线程化的Asynchronous Operation Engine,有三个主要步骤:
-
操作调用:因为操作将在与进行调用的应用线程不同的线程控制中执行,必定会发生某种类型的线程同步。一种方法是为每个操作派生一个线程。更为常用的方法是为Asynchronous Operation Processor而管理一个专用线程池。该方法可能需要应用线程在继续进行其他应用计算之前将操作请求排队。
-
操作执行:既然操作将在专用线程中执行,所以它可以执行“阻塞”操作,而不会直接阻碍应用的进展。例如,在提供异步I/O读取机制时,专用线程可以在从socket或文件句柄中读时阻塞。
-
操作完成:当操作完成时,应用必须被通知到。特别是,专用线程必须将应用特有的通知委托给Completion Dispatcher。这要求在线程间进行另外的同步。
当Completion Dispatcher从Asynchronous Operation Processor接收到操作完成通知时,它会回调与应用对象相关联的Completion Handler。实现Completion Dispatcher涉及两个问题:(1)实现回调以及(2)定义用于执行回调的并发策略。
Completion Dispatcher必须实现一种机制,Completion Handler通过它被调用。这要求Proactive Initiator在发起操作时指定一个回调。下面是常用的回调可选方案:
回调类:Completion Handler输出接口、让Completion Dispatcher知道。当操作完成时,Completion Dispatcher回调此接口中的方法,并将已完成操作的有关信息传递给它(比如从网络连接中读取的字节数)。
函数指针:Completion Dispatcher通过回调函数指针来调用Completion Handler。该方法有效地打破了Completion Dispatcher和Completion Handler之间的知识依赖。这有两个好处:
- Completion Handler不会被迫输出特定的接口;以及
- 在Completion Dispatcher和Completion Handler之间不需要有编译时依赖。
会合点:Proactive Initiator可以设立事件对象或条件变量,用作Completion Dispatcher和Completion Handler之间的会合点。这在Completion Handler是Proactive Initiator时最为常见。在Asynchronous Operation运行至完成的同时,Completion Handler处理其他的活动。Completion Handler将在会合点周期性地检查完成状态。
当操作完成时,Asynchronous Operation Processor将会通知Completion Dispatcher。在这时,Completion Dispatcher可以利用下面的并发策略中的一种来执行应用回调:
动态线程分派:Completion Dispatcher可为每个Completion Handler动态分配一个线程。动态线程分派可通过大多数多线程操作系统来实现。在有些平台上,由于创建和销毁线程资源的开销,这可能是所列出的Completion Dispatcher实现技术中最为低效的一种,
后反应式分派(Post-reactive dispatching
):Completion Dispatcher可以发信号给Proactive Initiation所设立的事件对象或条件变量。尽管轮询和派生阻塞在事件对象上的子线程都是可选的方案,最为高效的后反应式分派方法是将事件登记到Reactor。后反应式分派可以通过POSIX实时环境中的aio_suspend和Win32环境中的WaitForMultipleObjects来实现。
Call-through
分派:来自Asynchronous Operation Processor的线程控制可被Completion Dispatcher借用,以执行Completion Handler。这种“周期偷取”策略可以通过减少空闲线程的影响范围来提高性能。在一些老操作系统会将上下文切换到空闲线程、又只是从它们切换出去的情况下,这种方法有着收回“失去的”时间的巨大潜力。
Call-through分派在Windows NT中可以使用ReadFileEx和WriteFileEx Win32函数来实现。例如,线程控制可以使用这些调用来等待信号量被置位。当它等待时,线程通知OS它进入了一种称为“可报警等待状态”(alterable wait state)的特殊状态。在这时,OS可以占有对等待中的线程控制的栈和相关资源的控制,以执行Completion Handler。
线程池分派:由Completion Dispatcher拥有的线程池可被用于Completion Handler的执行。在池中的每个线程控制已被动态地分配到可用的CPU。线程池分派可通过Windows NT的I/O完成端口来实现。
在考虑上面描述的Completion Dispatcher技术的适用性时,考虑表8-1中所示的OS环境和物理硬件的可能组合 :
线程模型 | 系统类型 |
单处理器 | 多处理器 |
单线程 | A | B |
多线程 | C | D |
表8-1 Completion Dispatcher并发策略
如果你的OS只支持同步I/O,那就参见反应堆模式[5]。但是,大多数现代操作系统都支持某种类型的异步I/O。
在表8-1的A和B组合中,假定你不等待任何信号量或互斥体,后反应方式的异步I/O很可能是最好的。否则,Call-through实现或许更能回应你的问题。在C组合中,使用Call-through方法。在D组合中,使用线程池方法。在实践中,系统化的经验测量对于选择最为合适的可选方案来说是必需的。
Completion Handler的实现带来以下考虑。
Completion Handler可能需要维护关于特定请求的状态信息。例如,OS可以通知Web服务器,只有一部分文件已被写到网络通信端口。作为结果,Completion Handler可能需要重新发出请求,直到文件被完全写出,或连接变得无效。因此,它必须知道原先指定的文件,还剩多少字节要写,以及在前一个请求开始时文件指针的位置。
没有隐含的限制来阻止Proactive Initiator将多个Asynchronous Operation请求分配给单个Completion Handler。因此,Completion Handler必须在完成通知链中一一“系上”请求特有的状态信息。为完成此工作,Completion Handler可以利用异步完成令牌(Asynchronous Completion Token)模式[8]。
与在任何多线程环境中一样,使用前摄器模式的Completion Handler还是要由它自己来确保对共享资源的访问是线程安全的。但是,Completion Handler不能跨越多个完成通知持有共享资源。否则,就有发生“用餐哲学家问题”的危险[11]。
该问题在于一个合理的线程控制永久等待一个信号量被置位时所产生的死锁。通过设想一个由一群哲学家出席的宴会可以演示这一问题。用餐者围绕一个圆桌就座,在每个哲学家之间只有一支筷子。当哲学家觉得饥饿时,他必须获取在他左边和在他右边的筷子才能用餐。一旦哲学家获得一支筷子,不到吃饱他们就不会放下它。如果所有哲学家都拿起在他们右边的筷子,就会发生死锁,因为他们将永远也不可能拿到左边的筷子。
8.7.3.3 占先式策略(Preemptive Policy)
Completion Dispatcher类型决定在执行时一个Completion Handler是否可占先。当与动态线程和线程池分派器相连时,Completion Handler自然可占先。但是,当与后反应式Completion Dispatcher相连时,Completion Handler并没有对其他Completion Handler的占先权。当由Call-through分派器驱动时,Completion Handler相对于在可报警等待状态的线程控制也没有占先权。
一般而言,处理器不应该执行持续时间长的同步操作,除非使用了多个完成线程,因为应用的总体响应性将会被显著地降低。这样的危险可以通过增强的编程训练来降低。例如,所有Completion Handler被要求用作Proactive Initiator,而不是去执行同步操作。
这一部分显示怎样使用前摄器模式来开发Web服务器。该例子基于ACE构架[4]中的前摄器实现。
当客户连接到Web服务器时,HTTP_Handler的open方法被调用。于是服务器就通过在Asynchronous Operation完成时回调的对象(在此例中是this指针)、用于传输数据的网络连接,以及一旦操作完成时使用的Completion Dispatcher(proactor_)来初始化异步I/O对象。随后读操作异步地启动,而服务器返回事件循环。
当Async read操作完成时,分派器回调HTTP_Handler::handle_read_stream。如果有足够的数据,客户请求就被解析。如果整个客户请求还未完全到达,另一个读操作就会被异步地发起。
在对GET请求的响应中,服务器对所请求文件进行内存映射,并将文件数据异步地写往客户。当写操作完成时,分派器回调HTTP_Handler::handle_write_stream,从而释放动态分配的资源。
附录中含有两个其他的代码实例,使用同步的线程模型和同步的(非阻塞)反应式模型实现Web服务器。
class HTTP_Handler
: public Proactor::Event_Handler
// = TITLE
// Implements the HTTP protocol
// (asynchronous version).
//
// = PATTERN PARTICIPANTS
// Proactive Initiator = HTTP_Handler
// Asynch Op = Network I/O
// Asynch Op Processor = OS
// Completion Dispatcher = Proactor
// Completion Handler = HTPP_Handler
{
public:
void open (Socket_Stream *client)
{
// Initialize state for request
request_.state_ = INCOMPLETE;
// Store reference to client.
client_ = client;
// Initialize asynch read stream
stream_.open (*this, client_->handle (), proactor_);
// Start read asynchronously.
stream_.read (request_.buffer (),
request_.buffer_size ());
}
// This is called by the Proactor
// when the asynch read completes
void handle_read_stream(u_long bytes_transferred)
{
if (request_.enough_data(bytes_transferred))
parse_request ();
else
// Start reading asynchronously.
stream_.read (request_.buffer (),
request_.buffer_size ());
}
void parse_request (void)
{
// Switch on the HTTP command type.
switch (request_.command ())
{
// Client is requesting a file.
case HTTP_Request::GET:
// Memory map the requested file.
file_.map (request_.filename ());
// Start writing asynchronously.
stream_.write (file_.buffer (), file_.buffer_size ());
break;
// Client is storing a file
// at the server.
case HTTP_Request::PUT:
// ...
}
}
void handle_write_stream(u_long bytes_transferred)
{
if (file_.enough_data(bytes_transferred))
// Success....
else
// Start another asynchronous write
stream_.write (file_.buffer (), file_.buffer_size ());
}
private:
// Set at initialization.
Proactor *proactor_;
// Memory-mapped file_;
Mem_Map file_;
// Socket endpoint.
Socket_Stream *client_;
// HTTP Request holder
HTTP_Request request_;
// Used for Asynch I/O
Asynch_Stream stream_;
};
下面是一些被广泛记载的前摄器的使用:
Windows NT中的I/O完成端口:Windows NT操作系统实现了前摄器模式。Windows NT支持多种Asynchronous Operation,比如接受新网络连接、读写文件和socket,以及通过网络连接传输文件。操作系统就是Asynchronous Operation Processor。操作结果在I/O完成端口(它扮演Completion Dispatcher的角色)上排队。
异步I/O操作的UNIX AIO族:在有些实时POSIX平台上,前摄器模式是由aio API族[9]来实现的。这些OS特性非常类似于上面描述的Windows NT的特性。一个区别是UNIX信号可用于实现真正异步的Completion Dispatcher(Windows NT API不是真正异步的)。
Windows NT中的异步过程调用(Asynchronous Procedure Call):有些系统(比如Windows NT)支持异步过程调用(APC)。APC是在特定线程的上下文中异步执行的函数。当APC被排队到线程时,系统发出软件中断。下一次线程被调度时,它将运行该APC。操作系统所发出的APC被称为内核模式APC。应用所发出的APC被称为用户模式APC。
图8-9演示与前摄器相关的模式。
图8-9 前摄器模式的相关模式
异步完成令牌(ACT)模式[8]通常与前摄器模式结合使用。当Asynchronous Operation完成时,应用可能需要比简单的通知更多的信息来适当地处理事件。异步完成令牌模式允许应用将状态高效地与Asynchronous Operation的完成相关联。
前摄器模式还与观察者(Observer)模式[12](在其中,当单个主题变动时,相关对象也会自动更新)有关。在前摄器模式中,当来自多个来源的事件发生时,处理器被自动地通知。一般而言,前摄器模式被用于异步地将多个输入源多路分离给与它们相关联的事件处理器,而观察者通常仅与单个事件源相关联。
前摄器模式可被认为是同步反应堆模式[5]的一种异步的变体。反应堆模式负责多个事件处理器的多路分离和分派;它们在可以同步地发起操作而不会阻塞时被触发。相反,前摄器模式也支持多个事件处理器的多路分离和分派,但它们是被异步事件的完成触发的。
主动对象(Active Object)模式[13]使方法执行与方法调用去耦合。前摄器模式也是类似的,因为Asynchronous Operation Processor代表应用的Proactive Initiator来执行操作。就是说,两种模式都可用于实现Asynchronous Operation。前摄器模式常常用于替代主动对象模式,以使系统并发策略与线程模型去耦合。
前摄器可被实现为单体(Singleton)[12]。这对于在异步应用中,将事件多路分离和完成分派集中到单一的地方来说是有用的。
责任链(Chain of Responsibility,COR)模式[12]使事件处理器与事件源去耦合。在Proactive Initiator与Completion Handler的隔离上,前摄器模式也是类似的。但是,在COR中,事件源预先不知道哪一个处理器将被执行(如果有的话)。在前摄器中,Proactive Initiator完全知道目标处理器。但是,通过建立一个Completion Handler(它是由外部工厂动态配置的责任链的入口),这两种模式可被结合在一起:。
前摄器模式包含了一种强大的设计范式,支持高性能并发应用的高效而灵活的事件分派策略。前摄器模式提供并发执行操作的性能助益,而又不强迫开发者使用同步多线程或反应式编程。
[1] J. Hu, I. Pyarali, and D. C. Schmidt, “Measuring the Impact of Event Dispatching and Concurrency Models on Web Server Performance Over High-speed Networks,” in Proceedings of the 2nd Global Internet Conference, IEEE, November 1997.
[2] J. Hu, I. Pyarali, and D. C. Schmidt, “Applying the Proactor Pattern to High-Performance Web Servers,” in Proceedings of the 10th International Conference on Parallel and Distributed Computing and Systems, IASTED, Oct. 1998.
[3] J. C. Mogul, “The Case for Persistent-connection HTTP,” in Proceedings of ACMSIGCOMM ’95 Conference in Computer Communication Review, (Boston, MA, USA), pp. 299–314, ACM Press, August 1995.
[4] D. C. Schmidt, “ACE: an Object-Oriented Framework for Developing Distributed Applications,” in Proceedings of the 6th USENIX C++ Technical Conference, (Cambridge, Massachusetts), USENIX Association, April 1994.
[5] D. C. Schmidt, “Reactor: An Object Behavioral Pattern for Concurrent Event Demultiplexing and Event Handler Dispatching,” in Pattern Languages of Program Design (J. O. Coplien and D. C. Schmidt, eds.), pp. 529–545, Reading, MA: Addison-Wesley, 1995.
[6] D. C. Schmidt, “Acceptor and Connector: Design Patterns for Initializing Communication Services,” in Pattern Languages of Program Design (R. Martin, F. Buschmann, and D. Riehle, eds.), Reading, MA: Addison-Wesley, 1997.
[7] M. K. McKusick, K. Bostic, M. J. Karels, and J. S. Quarterman, The Design and Implementation of the 4.4BSD Operating System. Addison Wesley, 1996.
[8] I. Pyarali, T. H. Harrison, and D. C. Schmidt, “Asynchronous Completion Token: an Object Behavioral Pattern for Efficient Asynchronous Event Handling,” in Pattern Languages of Program Design (R. Martin, F. Buschmann, and D. Riehle, eds.), Reading, MA: Addison-Wesley, 1997.
[9] “Information Technology – Portable Operating System Interface (POSIX) – Part 1: System Application: Program Interface (API) [C Language],” 1995.
[10] Microsoft Developers Studio, Version 4.2 - Software Development Kit, 1996.
[11] E. W. Dijkstra, “Hierarchical Ordering of Sequential Processes,” Acta Informatica, vol. 1, no. 2, pp. 115–138, 1971.
[12] E. Gamma, R. Helm, R. Johnson, and J. Vlissides, Design Patterns: Elements of Reusable Object-Oriented Software. Reading, MA: Addison-Wesley, 1995.
[13] R. G. Lavender and D. C. Schmidt, “Active Object: an Object Behavioral Pattern for Concurrent Programming,” in Proceedings of the 2nd Annual Conference on the Pattern Languages of Programs, (Monticello, Illinois), pp. 1–7, September 1995.
本附录概述用于开发前摄器模式的可选实现的代码。下面,我们检查使用多线程的同步I/O和使用单线程的反应式I/O。
下面的代码显示怎样使用线程池同步I/O来开发Web服务器。当客户连接到服务器时,池中的一个线程接受连接,并调用HTTP_Handler中的open方法。随后服务器同步地从网络连接读取请求。当读操作完成时,客户请求随之被解析。在对GET请求的响应中,服务器对所请求文件进行内存映射,并将文件数据同步地写往客户。注意阻塞I/O是怎样使Web服务器能够遵循2.2.1中所概述的步骤的。
class HTTP_Handler
// = TITLE
// Implements the HTTP protocol
// (synchronous threaded version).
//
// = DESCRIPTION
// This class is called by a
// thread in the Thread Pool.
{
public:
void open (Socket_Stream *client)
{
HTTP_Request request;
// Store reference to client.
client_ = client;
// Synchronously read the HTTP request
// from the network connection and
// parse it.
client_->recv (request);
parse_request (request);
}
void parse_request (HTTP_Request &request)
{
// Switch on the HTTP command type.
switch (request.command ())
{
// Client is requesting a file.
case HTTP_Request::GET:
// Memory map the requested file.
Mem_Map input_file;
input_file.map (request.filename());
// Synchronously send the file
// to the client. Block until the
// file is transferred.
client_->send (input_file.data (),
input_file.size ());
break;
// Client is storing a file at
// the server.
case HTTP_Request::PUT:
// ...
}
}
private:
// Socket endpoint.
Socket_Stream *client_;
// ...
};
下面的代码显示怎样将反应堆模式用于开发Web服务器。当客户连接到服务器时,HTTP_Handler::open方法被调用。服务器登记I/O句柄和在网络句柄“读就绪“时回调的对象(在此例中是this指针)。然后服务器返回事件循环。
当请求数据到达服务器时,reactor_回调HTTP_Handler::handle_input方法。客户数据以非阻塞方式被读取。如果有足够的数据,客户请求就被解析。如果整个客户请求还没有到达,应用就返回反应堆事件循环。
在对GET请求的响应中,服务器对所请求的文件进行内存映射;并在反应堆上登记,以在网络连接变为“写就绪”时被通知。当向连接写入数据不会阻塞调用线程时,reactor_就回调HTTP_Handler::handler_output方法。当所有数据都已发送给客户时,网络连接被关闭。
class HTTP_Handler :
public Reactor::Event_Handler
// = TITLE
// Implements the HTTP protocol
// (synchronous reactive version).
//
// = DESCRIPTION
// The Event_Handler base class
// defines the hooks for
// handle_input()/handle_output().
//
// = PATTERN PARTICIPANTS
// Reactor = Reactor
// Event Handler = HTTP_Handler
{
public:
void open (Socket_Stream *client)
{
// Initialize state for request
request_.state_ = INCOMPLETE;
// Store reference to client.
client_ = client;
// Register with the reactor for reading.
reactor_->register_handler
(client_->handle (),
this,
Reactor::READ_MASK);
}
// This is called by the Reactor when
// we can read from the client handle.
void handle_input (void)
{
int result = 0;
// Non-blocking read from the network
// connection.
do
result = request_.recv (client_->handle ());
while (result != SOCKET_ERROR && request_.state_ == INCOMPLETE);
// No more progress possible,
// blocking will occur
if (request_.state_ == INCOMPLETE && errno == EWOULDBLOCK)
reactor_->register_handler
(client_->handle (),
this,
Reactor::READ_MASK);
else
// We now have the entire request
parse_request ();
}
void parse_request (void)
{
// Switch on the HTTP command type.
switch (request_.command ())
{
// Client is requesting a file.
case HTTP_Request::GET:
// Memory map the requested file.
file_.map (request_.filename ());
// Transfer the file using Reactive I/O.
handle_output ();
break;
// Client is storing a file at
// the server.
case HTTP_Request::PUT:
// ...
}
}
void handle_output (void)
{
// Asynchronously send the file
// to the client.
if (client_->send (file_.data (),
file_.size ())
== SOCKET_ERROR
&& errno == EWOULDBLOCK)
// Register with reactor...
else
// Close down and releas
handle_close ();
}
private:
// Set at initialization.
Reactor *reactor_;
// Memory-mapped file_;
Mem_Map file_;
// Socket endpoint.
Socket_Stream *client_;
// HTTP Request holder.
HTTP_Request request_;
};
本文转载至ACE开发者网站 作者:Irfan Pyarali Tim Harrison
posted @
2007-02-27 21:17 walkspeed 阅读(3261) |
评论 (0) |
编辑 收藏
ACE_INET_Addr类,包装了网络地址
ACE_SOCK_Connector类,扮演主动连接角色,发起通讯连接。连接到远端的服务。
ACE_SOCK_Acceptor类,扮演被动连接角色,等待连接。等待远端客户的请求。
ACE_SOCK_Stream类,扮演数据通讯角色,发送和接收数据。完成客户与服务之间的通讯。
利用ACE库来开发网络通讯程序是很简单的,一个基本程序只用到以上提到的几个类,就
可以完成一个基于客户端、服务器端模型的网络应用的开发。开发者无需了解Socket在不同平
台上的实现,记忆众多并相互关联的Socket APIs。
一下以一个Hello World程序为演示。
客户端程序。发送一个Hello World到远端的服务器,并接收服务器返回的信息,将信息
打印在屏幕上。
#include <iostream>
#include <string>
#include <ace/ACE.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Stream.h>
int main( int argc, char* argv[] )
{
ACE::init();//初始化ACE库,在windows下一定要
std::string str = "hello world";
//设置服务器地址
//第一个参数是端口,第二个是ip地址,也可以是域名。
//可以先定义一个地址对象,再用ACE_INET_Addr的set函数来设定。
//地址的配置很多,具体的参照文档
ACE_INET_Addr peer_addr( 5050, "127.0.0.1" );
ACE_SOCK_Stream peer_stream;//定义一个通讯队形
ACE_SOCK_Connector peer_connector;//定义一个主动连接对象
peer_connector.connect( peer_stream, peer_addr );//发起一个连接
peer_stream.send( str.c_str(), str.length() );//发送数据到服务器
str.erase();
str.resize( sizeof( "hello world" ) );
peer_stream.recv( (void*)str.c_str(), str.length() );//接收来自服务器的信息
std::cout << "from server message : " << str << std::endl;
ACE::fini();
return 0;
}
服务器端代码。接收一个远端的连接,将接收到的信息打印在屏幕上,并将接收到的信
息返回给客户端。
#include <iostream>
#include <string>
#include <ace/ACE.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
int main( int argc, char* argv[] )
{
ACE::init();
std::string str;
str.resize( sizeof( "hello world" ) );
//设置服务器地址
ACE_INET_Addr peer_addr( 5050, "127.0.0.1" );
ACE_SOCK_Stream peer_stream;
//创建被动连接角色对象
ACE_SOCK_Acceptor peer_acceptor;
//开启被动连接对象,将对象绑定到一个地址上
peer_acceptor.open( peer_addr );
//等待连接
peer_acceptor.accept( peer_stream );
//数据通讯
peer_stream.recv( (void*)str.c_str(), str.length() );
std::cout << "from client message : " << str << std::endl;
peer_stream.send( str.c_str(), str.length() );
ACE::fini();
return 0;
}
××××以上代码需要ACE库才能运转××××
利用ACE编程的基本框架。
客户端
1 创建地址对象。(ACE_INET_Addr)
2 创建主动连接对象。(ACE_SOCK_Connector)
3 创建数据通讯对象。(ACE_SOCK_Stream)
4 设置服务器地址。(ACE_INET_Addr::set)
5 将数据通讯对象和地址作为参数传给主动连接对象,发起主动连接(ACE_SOCK_Connector::connect)
6 利用通讯对象接收和发送数据。(ACE_SOCK_Stream::recv和ACE_SOCK_Stream::send)
服务器端
1 创建地址对象。(ACE_INET_Addr)
2 创建被动连接对象。(ACE_SOCK_Connector)
3 创建数据通讯对象。(ACE_SOCK_Stream)
4 设置服务器地址。(ACE_INET_Addr::set)
5 将地址作为参数传给被动连接对象,启动接收(ACE_SOCK_Acceptor::open)
6 将数据通讯对象传给被动连接对象,启动接收,接受连接(ACE_SOCK_Connector::accept)
7 利用通讯对象接收和发送数据。(ACE_SOCK_Stream::recv和ACE_SOCK_Stream::send)
posted @
2007-02-25 19:30 walkspeed 阅读(12162) |
评论 (10) |
编辑 收藏
ACE_IPC_SAP类是IPC类族的基类,封装了句柄,提供了访问句柄的基本接口,基本结构
如下
class ACE_IPC_SAP
{
public:
int enable (int value) const;
int disable (int value) const;
ACE_HANDLE get_handle (void) const;
void set_handle (ACE_HANDLE handle);
protected:
ACE_IPC_SAP (void);
~ACE_IPC_SAP (void);
private:
ACE_HANDLE handle_;
};
ACE_SOCK类是使用Socket的基类,所有使用Socket通讯的类都从这个类派生。本类的功
能包括
1 创建和销毁Socket句柄
2 获取本地和远端的网络地址
3 设置和读取Socket选项。
基本结构如下
class ACE_SOCK : public ACE_IPC_SAP
{
public:
//设置Socket的属性,包装了setsockopt系统函数
int set_option (int level,
int option,
void *optval,
int optlen) const;
//获取Socket的属性,包装了getsockopt系统函数
int get_option (int level,
int option,
void *optval,
int *optlen) const;
//获得本地地址
int get_local_addr (ACE_Addr &) const;
//获取远端地址
int get_remote_addr (ACE_Addr &) const;
//关闭Socket
int close (void);
//打开一个Socket,没有Qos
int open (int type,
int protocol_family,
int protocol,
int reuse_addr);
//打开一个Socket,有Qos
int open (int type,
int protocol_family,
int protocol,
ACE_Protocol_Info *protocolinfo,
ACE_SOCK_GROUP g,
u_long flags,
int reuse_addr);
protected:
ACE_SOCK (int type,
int protocol_family,
int protocol = 0,
int reuse_addr = 0);
ACE_SOCK (int type,
int protocol_family,
int protocol,
ACE_Protocol_Info *protocolinfo,
ACE_SOCK_GROUP g,
u_long flags,
int reuse_addr);
ACE_SOCK (void);
~ACE_SOCK (void);
};
ACE_SOCK_IO类,包装了Socket数据通讯的基本方法。本类提供的功能
1 支持数据的发送和接收
2 支持分散读操作
3 支持集中写操作
4 支持阻塞,非阻塞,定时 I/O操作
基本结构如下
class ACE_SOCK_IO : public ACE_SOCK
{
public:
ACE_SOCK_IO (void);
~ACE_SOCK_IO (void);
//接收数据
ssize_t recv (void *buf,
size_t n,
const ACE_Time_Value *timeout = 0) const;
//分散读操作
ssize_t recv (iovec iov[],
size_t n,
const ACE_Time_Value *timeout = 0) const;
//发送数据
ssize_t send (const void *buf,
size_t n,
const ACE_Time_Value *timeout = 0) const;
//集中写操作
ssize_t send (const iovec iov[],
size_t n,
const ACE_Time_Value *timeout = 0) const;
};
ACE_SOCK_Stream类,继承ACE_SOCK_IO类。在ACE_SOCK_IO类提供的功能上,添加了发送
和接收刚好n个字节的能力。基本结构如下
class ACE_Export ACE_SOCK_Stream : public ACE_SOCK_IO
{
public:
ACE_SOCK_Stream (void);
ACE_SOCK_Stream (ACE_HANDLE h);
~ACE_SOCK_Stream (void);
//刚好读取n个字节的数据
ssize_t recv_n (void *buf,
size_t len,
const ACE_Time_Value *timeout = 0,
size_t *bytes_transferred = 0) const;
//分散读刚好n个字节操作
ssize_t recvv_n (iovec iov[],
int iovcnt,
const ACE_Time_Value *timeout = 0,
size_t *bytes_transferred = 0) const;
//刚好发送n个字节的数据
ssize_t send_n (const void *buf,
size_t len,
const ACE_Time_Value *timeout = 0,
size_t *bytes_transferred = 0) const;
//集中写刚好n个字节操作
ssize_t sendv_n (const iovec iov[],
int iovcnt,
const ACE_Time_Value *timeout = 0,
size_t *bytes_transferred = 0) const;
int close_reader (void);
int close_writer (void);
int close (void);
typedef ACE_INET_Addr PEER_ADDR;
};
ACE_SOCK_Acceptor类是一个工厂类,用来被动产生一个新的通讯端点。提供如下能力
1 接收对等端的连接
2 连接可以通过阻塞、非阻塞或定时方式接受。
基本结构如下
class ACE_Export ACE_SOCK_Acceptor : public ACE_SOCK
{
public:
ACE_SOCK_Acceptor (void);
ACE_SOCK_Acceptor (const ACE_Addr &local_sap,
int reuse_addr = 0,
int protocol_family = PF_UNSPEC,
int backlog = ACE_DEFAULT_BACKLOG,
int protocol = 0);
~ACE_SOCK_Acceptor (void);
//打开一个监听
int open (const ACE_Addr &local_sap,
int reuse_addr = 0,
int protocol_family = PF_UNSPEC,
int backlog = ACE_DEFAULT_BACKLOG,
int protocol = 0);
int close (void);
//接受一个对等端的连接,产生一个通讯
int accept (ACE_SOCK_Stream &new_stream,
ACE_Addr *remote_addr = 0,
ACE_Time_Value *timeout = 0,
int restart = 1,
int reset_new_handle = 0) const;
};
ACE_SOCK_Connector类是一个工厂类,用来主动建立一个新的通讯端。提供的功能如下
1 发起一个到对等接受者的连接,并在连接后产生一个通讯对象
2 连接可以通过阻塞、非阻塞或定时方式发起
基本结构如下
class ACE_SOCK_Connector
{
public:
ACE_SOCK_Connector (void);
ACE_SOCK_Connector (ACE_SOCK_Stream &new_stream,
const ACE_Addr &remote_sap,
const ACE_Time_Value *timeout = 0,
const ACE_Addr &local_sap = ACE_Addr::sap_any,
int reuse_addr = 0,
int flags = 0,
int perms = 0,
int protocol = 0);
//发起一个连接
int connect (ACE_SOCK_Stream &new_stream,
const ACE_Addr &remote_sap,
const ACE_Time_Value *timeout = 0,
const ACE_Addr &local_sap = ACE_Addr::sap_any,
int reuse_addr = 0,
int flags = 0,
int perms = 0,
int protocol = 0);
~ACE_SOCK_Connector (void);
int complete (ACE_SOCK_Stream &new_stream,
ACE_Addr *remote_sap = 0,
const ACE_Time_Value *timeout = 0);
};
以上的类结构是简化的,以突出重点功能。要完全了解每个类,看源代码。
posted @
2007-02-24 14:15 walkspeed 阅读(3855) |
评论 (0) |
编辑 收藏
多路分配的入口函数handle_events,在框架的实现接口类中定义
多路分配的主体实现函数event_handling,在ACE_WFMO_Reactor中定义,伪
实现代码如下
int ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
int alertable)
{
int result = 0;
do
{
//等待在句柄集上发生的事件
//wait_for_multiple_events的具体实现是使用
//WaitForMultipleObjectsEx函数
DWORD wait_status = this->wait_for_multiple_events (timeout,
alertable);
//分发事件
result = this->safe_dispatch (wait_status);
}while (result == 0);
return result;
}
分发的主体函数是dispatch_handles,在ACE_WFMO_Reactor中定义,伪实现
代码如下
int ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
{
DWORD dispatch_slot = 0;
//活动的句柄总数
DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
//查找要分发的句柄的索引
for (int number_of_handlers_dispatched = 1;;++number_of_handlers_dispatched)
{
//计算有事件发生,要分发的句柄索引
dispatch_slot += wait_status - WAIT_OBJECT_0;
//分发给相应的事件处理对象
if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
return -1;
++dispatch_slot;
if (dispatch_slot >= max_handlep1)
return number_of_handlers_dispatched;//分发了几个事件
//检查剩下的句柄中有没有有事件发生的
wait_status = this->poll_remaining_handles (dispatch_slot);
switch (wait_status)
{
case WAIT_FAILED: // Failure.
ACE_OS::set_errno_to_last_error ();
/* FALLTHRU */
case WAIT_TIMEOUT:
// There are no more handles ready, we can return.
return number_of_handlers_dispatched;//分发了几个事件
}
}
}
找到具体事件处理对象主体函数complex_dispatch_hander,在ACE_WFMO_Reactor
中定义,为代码如下
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
ACE_HANDLE event_handle)
{
//找到当前的分发的信息
ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
this->handler_rep_.current_info ()[slot];
WSANETWORKEVENTS events;
ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
event_handle,
&events) == SOCKET_ERROR)
problems = ACE_Event_Handler::ALL_EVENTS_MASK;
else
{
//发生的事件于要检测的事件是否相同,相同就分发
events.lNetworkEvents &= current_info.network_events_;
while (events.lNetworkEvents != 0)
{
ACE_Event_Handler *event_handler = current_info.event_handler_;
//调用事件处理对象,进行事件处理
problems |= this->upcall (current_info.event_handler_,
current_info.io_handle_,
events);
if (this->handler_rep_.scheduled_for_deletion (slot))
break;
}
}
return 0;
}
posted @
2007-02-22 11:46 walkspeed 阅读(1374) |
评论 (0) |
编辑 收藏
1
int ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long)
{
return this->event_handling (&how_long, FALSE);
}
2
// Waits for and dispatches all events. Returns -1 on error, 0 if
// max_wait_time expired, or the number of events that were dispatched.
int ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
int alertable)
{
ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
// Make sure we are not closed
if (!this->open_for_business_ || this->deactivated_)
return -1;
// Stash the current time -- the destructor of this object will
// automatically compute how much time elapsed since this method was
// called.
ACE_Countdown_Time countdown (max_wait_time);
int result;
do
{
// Check to see if it is ok to enter ::WaitForMultipleObjects
// This will acquire <this->lock_> on success On failure, the
// lock will not be acquired
result = this->ok_to_wait (max_wait_time, alertable);
if (result != 1)
return result;
// Increment the number of active threads
++this->active_threads_;
// Release the <lock_>
this->lock_.release ();
// Update the countdown to reflect time waiting to play with the
// mut and event.
countdown.update ();
// Calculate timeout
int timeout = this->calculate_timeout (max_wait_time);
// Wait for event to happen
DWORD wait_status = this->wait_for_multiple_events (timeout,
alertable);
// Upcall
result = this->safe_dispatch (wait_status);
if (0 == result)
{
// wait_for_multiple_events timed out without dispatching
// anything. Because of rounding and conversion errors and
// such, it could be that the wait loop timed out, but
// the timer queue said it wasn't quite ready to expire a
// timer. In this case, max_wait_time won't have quite been
// reduced to 0, and we need to go around again. If max_wait_time
// is all the way to 0, just return, as the entire time the
// caller wanted to wait has been used up.
countdown.update (); // Reflect time waiting for events
if (0 == max_wait_time || max_wait_time->usec () == 0)
break;
}
}while (result == 0);
return result;
}
3
int ACE_WFMO_Reactor::safe_dispatch (DWORD wait_status)
{
int result = -1;
ACE_SEH_TRY
{
result = this->dispatch (wait_status);
}
ACE_SEH_FINALLY
{
this->update_state ();
}
return result;
}
4
int ACE_WFMO_Reactor::dispatch (DWORD wait_status)
{
int handlers_dispatched = 0;
// Expire timers
handlers_dispatched += this->expire_timers ();
switch (wait_status)
{
case WAIT_FAILED: // Failure.
ACE_OS::set_errno_to_last_error ();
return -1;
case WAIT_TIMEOUT: // Timeout.
errno = ETIME;
return handlers_dispatched;
default: // Dispatch.
// We'll let dispatch worry about abandoned mutes.
handlers_dispatched += this->dispatch_handles (wait_status);
return handlers_dispatched;
}
}
5
// Dispatches any active handles from <handles_[slot]> to
// <handles_[max_handlep1_]>, polling through our handle set looking
// for active handles.
int ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
{
// dispatch_slot is the absolute slot. Only += is used to
// increment it.
DWORD dispatch_slot = 0;
// Cache this value, this is the absolute value.
DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
// nCount starts off at <max_handlep1>, this is a transient count of
// handles last waited on.
DWORD nCount = max_handlep1;
for (int number_of_handlers_dispatched = 1;;++number_of_handlers_dispatched)
{
const bool ok = (wait_status >= WAIT_OBJECT_0 && wait_status <= (WAIT_OBJECT_0 + nCount));
if (ok)
dispatch_slot += wait_status - WAIT_OBJECT_0;
else
// Otherwise, a handle was abandoned.
dispatch_slot += wait_status - WAIT_ABANDONED_0;
// Dispatch handler
if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
return -1;
// Increment slot
++dispatch_slot;
// We're done.
if (dispatch_slot >= max_handlep1)
return number_of_handlers_dispatched;
// Readjust nCount
nCount = max_handlep1 - dispatch_slot;
// Check the remaining handles
wait_status = this->poll_remaining_handles (dispatch_slot);
switch (wait_status)
{
case WAIT_FAILED: // Failure.
ACE_OS::set_errno_to_last_error ();
/* FALLTHRU */
case WAIT_TIMEOUT:
// There are no more handles ready, we can return.
return number_of_handlers_dispatched;
}
}
}
6
int ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
DWORD max_handlep1)
{
// Check if there are window messages that need to be dispatched
if (slot == max_handlep1)
return this->dispatch_window_messages ();
// Dispatch the handler if it has not been scheduled for deletion.
// Note that this is a very week test if there are multiple threads
// dispatching this slot as no locks are held here. Generally, you
// do not want to do something like deleting the this pointer in
// handle_close() if you have registered multiple times and there is
// more than one thread in WFMO_Reactor->handle_events().
else if (!this->handler_rep_.scheduled_for_deletion (slot))
{
ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
if (this->handler_rep_.current_info ()[slot].io_entry_)
return this->complex_dispatch_handler (slot,
event_handle);
else
return this->simple_dispatch_handler (slot,
event_handle);
}
else
// The handle was scheduled for deletion, so we will skip it.
return 0;
}
7
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
ACE_HANDLE event_handle)
{
// This dispatch is used for I/O entires.
ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
this->handler_rep_.current_info ()[slot];
WSANETWORKEVENTS events;
ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
event_handle,
&events) == SOCKET_ERROR)
problems = ACE_Event_Handler::ALL_EVENTS_MASK;
else
{
// Prepare for upcalls. Clear the bits from <events> representing
// events the handler is not interested in. If there are any left,
// do the upcall(s). upcall will replace events.lNetworkEvents
// with bits representing any functions that requested a repeat
// callback before checking handles again. In this case, continue
// to call back unless the handler is unregistered as a result of
// one of the upcalls. The way this is written, the upcalls will
// keep being done even if one or more upcalls reported problems.
// In practice this may turn out not so good, but let's see. If any
// problems, please notify Steve Huston <shuston@riverace.com>
// before or after you change this code.
events.lNetworkEvents &= current_info.network_events_;
while (events.lNetworkEvents != 0)
{
ACE_Event_Handler *event_handler = current_info.event_handler_;
int reference_counting_required =
event_handler->reference_counting_policy ().value () ==
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
// Call add_reference() if needed.
if (reference_counting_required)
{
event_handler->add_reference ();
}
// Upcall
problems |= this->upcall (current_info.event_handler_,
current_info.io_handle_,
events);
// Call remove_reference() if needed.
if (reference_counting_required)
{
event_handler->remove_reference ();
}
if (this->handler_rep_.scheduled_for_deletion (slot))
break;
}
}
if (problems != ACE_Event_Handler::NULL_MASK
&& !this->handler_rep_.scheduled_for_deletion (slot) )
this->handler_rep_.unbind (event_handle, problems);
return 0;
}
8
ACE_Reactor_Mask ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
ACE_HANDLE io_handle,
WSANETWORKEVENTS &events)
{
// This method figures out what exactly has happened to the socket
// and then calls appropriate methods.
ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
// Go through the events and do the indicated upcalls. If the handler
// doesn't want to be called back, clear the bit for that event.
// At the end, set the bits back to <events> to request a repeat call.
long actual_events = events.lNetworkEvents;
int action;
if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
{
action = event_handler->handle_output (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_WRITE);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
{
if (events.iErrorCode[FD_CONNECT_BIT] == 0)
{
// Successful connect
action = event_handler->handle_output (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_CONNECT);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
}
}
// Unsuccessful connect
else
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_CONNECT);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
}
}
}
if (ACE_BIT_ENABLED (actual_events, FD_OOB))
{
action = event_handler->handle_exception (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_OOB);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_READ))
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_READ);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
&& ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_CLOSE);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_ACCEPT);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_QOS))
{
action = event_handler->handle_qos (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_QOS);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
{
action = event_handler->handle_group_qos (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
}
}
events.lNetworkEvents = actual_events;
return problems;
}
posted @
2007-02-22 10:54 walkspeed 阅读(1574) |
评论 (0) |
编辑 收藏
ACE这个开源项目的代码的量是很大的。对于系统编程、网络编程是非常的有用,但是对其介绍的书和资料是非常的少。要想了解和很好的使用,就必须要自己去看源代码了。(好在还有三本书,库中的事例还是比较的多)
ACE库中的代码大体可以分为三大部分
1 OS Adaptation(操作系统适配)部分。这部分主要是屏蔽各操作系统的API的不同,将系统调用接口统一到C++函数的接口,以实现平台的可移植。
2 C++ Wrapper Facade(C++外包)部分。这部分主要是将相互关联的操作和数据结构封装到C++类中,提供统一的接口。提供强类型的检测,降低不必要的认为错误。利用C++多态、继承等能力,形成一个架构,使本地的、远端的操作统一在一个相同接口和使用策略下。
3 Framework(框架)部分。为一组相关的应用提供可复用的架构。开发者在确定了应用后,选择可用的架构开发应用程序,不用考虑平台和底层机制。快速的开发。
第一部分面对大量的底层的细节,而且目标是平台的移植,如果对平台的移植感兴趣的兄弟姐妹可以看看。一般情况下没什么看的必要。
第二部分和第三部分对我们的应用和学习如何利用C++来设计程序非常的有价值,要多看。细心揣摩,特别是配上设计的问题(宏观的和微观的)。
posted @
2007-02-20 12:28 walkspeed 阅读(6954) |
评论 (3) |
编辑 收藏
chown 设置文件或目录的所有权用户。
chown 用户名.组名 文件或目录名。
例 1 chown user.group directory 2 chown user.group file
chmod 改变文件或目录的用户权限。
一个文件或目录的用户权限由三个部分组成 所有者权限 所有者所在的组的权限 其他用户的权限。每个权限又由三种权力组成,读权力 写权力 执行权力
例 1 chmod 711 file 所有者有读、写、执行权力
所有者所在组用户有执行权力
其他用户有执行权力
posted @
2007-02-04 23:35 walkspeed 阅读(573) |
评论 (0) |
编辑 收藏
想学的时候就开始学习。听过设计模式,心动了,那就开始学吧。是的,开始时不能明白那里面写些什么,不要紧,先把它们背下来。里面的实例代码用手写一边,不要只是看一边。就看那本叫设计模式的书,这不是说那些以它为根本的注解的书不好,只是个人认为注解带有注解人的理解和思考,看了反而添加了更多的不理解的概念。将写过的代码拿出来,用模式的眼光去重新审视,有了想法就去编码实现,并不但的改进。
posted @
2007-01-30 21:28 walkspeed 阅读(557) |
评论 (1) |
编辑 收藏