一动不如一静

C++博客 首页 新随笔 联系 聚合 管理
  20 Posts :: 0 Stories :: 10 Comments :: 0 Trackbacks
前段时间用了ACE_TP_Reactor做了一个东西,但是对这块东西不是很有底,所以借着假期仔细的看了一下这一块的东西,又上网查了一下相关的资料。

在Addison-Wesley - C++NetworkProgrammingVol2的4.3 The ACE_TP_Reactor Class中有这样几句话,让我足足想了一天。

1.Multiple threads running an ACE_TP_Reactor event loop can process events concurrently on different handles
2.They can also dispatch timeout and I/O callback methods concurrently on the same event handler
3.The only serialization in the ACE_TP_Reactor occurs when I/O events occur concurrently on the same handle
4.After a thread obtains a set of active handles from select(), the other reactor threads dispatch from that handle set instead of calling select() again

其实上面的3句话主要表达这样的几层含义
1.并行处理不同handle上的事件
2.并行处理同一event handler上的超时回调函数和I/O回调函数
3.多个线程串行处理同一个handle上的I/O事件。
4.虽然TP_Reactor是用leader/follow模式轮流调用select,但是如果一个select()获得了多个激活的handle,那么其他的线程会分发这些handle ,而不是去再次调用select.
 (这点还没有想通,也没有看见是如何实现的?)

“多个线程串行处理同一个handle上的I/O事件”
       这个是如何达到的呢?ACE源码中,当处理I/O事件的时候,会将HANDLE挂起,使得不再对该HANDLE做事件侦听。来达到同一个handle上的I/O事件是被多个线程串行地处理。

"并行处理同一event handler上的超时回掉函数和I/O回调函数"
       这样好像就比较麻烦了。因为这就意味着TP_Reactor只保证同一个handle下不会有多线程同时调用I/O事件,但是却有可能同时调用超时回调函数和I/O回调函数。如果在这两个函数中有对数据的访问和操作,这就意味着需要有锁的引入。例外,如果在定时器处理中,超过定时的事件间隔,就会有令一个线程再次调用定时器的处理函数,一下子引入了很多同步的问题。如何解决这个问题呢?

方法一:

            更改ACE的源码,象处理socket事件一样,在处理定时事件的时候,也把HANDLE挂起。来自http://cpunion.cnblogs.com/archive/2005/08/09/210941.html

int
ACE_TP_Reactor::handle_timer_events (
int & /*event_count*/,
                                     ACE_TP_Token_Guard 
&guard)
{
  
// Get the current time
  ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
                           
this->timer_queue_->timer_skew ());

  
// Look for a node in the timer queue whose timer <= the present
  
// time.
  ACE_Timer_Node_Dispatch_Info info;

  
if (this->timer_queue_->dispatch_info (cur_time,
                                         info))
    {
        
// ******** fixed by lijie ***********
        if (info.type_->get_handle () != ACE_INVALID_HANDLE)
        {
            
if (this->is_suspended_i (info.type_->get_handle ()))
                
return 0;

            
this->suspend_handler (info.type_->get_handle ());
        }
        
// ******** end **********************

      
const void *upcall_act = 0;

      
// Preinvoke.
      this->timer_queue_->preinvoke (info,
                                     cur_time,
                                     upcall_act);

      

      
// Release the token before dispatching notifies
      guard.release_token ();

      
// call the functor
      this->timer_queue_->upcall (info,
                                  cur_time);

      
// Postinvoke
      this->timer_queue_->postinvoke (info,
                                      cur_time,
                                      upcall_act);

      
// We have dispatched a timer
      return 1;
    }

  
return 0;
}

handle_timer处理完以后,返回以前,加上这句话
this->reactor ()->resume_handler (this->get_handle ());

当然别忘了为Handler编写get_handle()函数:
ACE_HANDLE Test_Handler::get_handle () const
{
    
return this->peer ().get_handle ();
}


方法二:
             利用ACE_PipeACE_Message_Queue把所有的事件都排队到同一个I/O HANDLE上去,再由ACE_TP_Reactor通过多个线程顺序串行地触发我们旧的event_handler来处理这些已经排好队的事件/消息。我比较赞成用这样方法。该方法来自:http://blog.csdn.net/zhdaniel/archive/2006/06/29/850888.aspx
   

      


方法三:
             ^_^干脆就不要对同一个event  handler注册I/O事件和其他事件。





posted on 2006-10-07 11:29 一动不如一静 阅读(3087) 评论(1)  编辑 收藏 引用 所属分类: ACE

Feedback

# re: 关于ACE_TP_Reactor 2012-11-08 09:30 aa
ACE_TP_Reactor ,register_handler(this, READ_MASK|WRITE_MASK)
只响应handle_output,handle_input不响应,请问是什么原因?  回复  更多评论
  


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理