前段时间用了ACE_TP_Reactor做了一个东西,但是对这块东西不是很有底,所以借着假期仔细的看了一下这一块的东西,又上网查了一下相关的资料。
在Addison-Wesley - C++NetworkProgrammingVol2的4.3 The ACE_TP_Reactor Class中有这样几句话,让我足足想了一天。
1.Multiple threads running an
ACE_TP_Reactor event loop can process
events concurrently on different handles
2.They can also dispatch timeout and I/O callback methods concurrently on the same
event handler
3.The only serialization in the
ACE_TP_Reactor occurs when I/O events
occur concurrently on the
same handle
4.After a thread obtains a set of active handles from
select(), the other
reactor threads dispatch from that handle set instead of calling
select() again
其实上面的3句话主要表达这样的几层含义
1.并行处理不同handle上的事件
2.并行处理同一event handler上的超时回调函数和I/O回调函数
3.多个线程串行处理同一个handle上的I/O事件。
4.虽然TP_Reactor是用leader/follow模式轮流调用select,但是如果一个select()获得了多个激活的handle,那么其他的线程会分发这些handle ,而不是去再次调用select.
(这点还没有想通,也没有看见是如何实现的?)
“多个线程串行处理同一个handle上的I/O事件” 这个是如何达到的呢?ACE源码中,当处理I/O事件的时候,会将
HANDLE挂起,使得不再对该
HANDLE做事件侦听。来达到同一个
handle上的I/O事件是被多个线程串行地处理。"并行处理同一event handler上的超时回掉函数和I/O回调函数" 这样好像就比较麻烦了。因为这就意味着TP_Reactor只保证同一个handle下不会有多线程同时调用I/O事件,但是却有可能同时调用超时回调函数和I/O回调函数。如果在这两个函数中有对数据的访问和操作,这就意味着需要有锁的引入。例外,如果在定时器处理中,超过定时的事件间隔,就会有令一个线程再次调用定时器的处理函数,一下子引入了很多同步的问题。如何解决这个问题呢?
方法一:
更改ACE的源码,象处理socket事件一样,在处理定时事件的时候,也把HANDLE挂起。来自http://cpunion.cnblogs.com/archive/2005/08/09/210941.html
int
ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
ACE_TP_Token_Guard &guard)
{
// Get the current time
ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
this->timer_queue_->timer_skew ());
// Look for a node in the timer queue whose timer <= the present
// time.
ACE_Timer_Node_Dispatch_Info info;
if (this->timer_queue_->dispatch_info (cur_time,
info))
{
// ******** fixed by lijie ***********
if (info.type_->get_handle () != ACE_INVALID_HANDLE)
{
if (this->is_suspended_i (info.type_->get_handle ()))
return 0;
this->suspend_handler (info.type_->get_handle ());
}
// ******** end **********************
const void *upcall_act = 0;
// Preinvoke.
this->timer_queue_->preinvoke (info,
cur_time,
upcall_act);
// Release the token before dispatching notifies
guard.release_token ();
// call the functor
this->timer_queue_->upcall (info,
cur_time);
// Postinvoke
this->timer_queue_->postinvoke (info,
cur_time,
upcall_act);
// We have dispatched a timer
return 1;
}
return 0;
} handle_timer处理完以后,返回以前,加上这句话
this->reactor ()->resume_handler (this->get_handle ());
当然别忘了为Handler编写get_handle()函数:
ACE_HANDLE Test_Handler::get_handle () const
{
return this->peer ().get_handle ();
}
方法二:
利用ACE_Pipe和ACE_Message_Queue把所有的事件都排队到同一个I/O HANDLE上去,再由ACE_TP_Reactor通过多个线程顺序串行地触发我们旧的event_handler来处理这些已经排好队的事件/消息。我比较赞成用这样方法。该方法来自:http://blog.csdn.net/zhdaniel/archive/2006/06/29/850888.aspx
方法三:
^_^干脆就不要对同一个event handler注册I/O事件和其他事件。