一动不如一静

C++博客 首页 新随笔 联系 聚合 管理
  20 Posts :: 0 Stories :: 10 Comments :: 0 Trackbacks
ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
                                
int  restart,
                                ACE_Sig_Handler 
* sh,
                                ACE_Timer_Queue 
* tq,
                                
int  mask_signals,
                                
int  s_queue)
  : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, 
0 0 , mask_signals, s_queue)
{
  ACE_TRACE (
" ACE_TP_Reactor::ACE_TP_Reactor " );
  
this -> supress_notify_renew ( 1 );
}

template 
< class  ACE_SELECT_REACTOR_TOKEN >
ACE_Select_Reactor_T
< ACE_SELECT_REACTOR_TOKEN > ::ACE_Select_Reactor_T
  (size_t size,
   
int  rs,
   ACE_Sig_Handler 
* sh,
   ACE_Timer_Queue 
* tq,
   
int  disable_notify_pipe,
   ACE_Reactor_Notify 
* notify,
   
int  mask_signals,
   
int  s_queue)
    : ACE_Select_Reactor_Impl (mask_signals)
    , token_ (
* this , s_queue)
    , lock_adapter_ (token_)
    , deactivated_ (
0 )
{
  ACE_TRACE (
" ACE_Select_Reactor_T::ACE_Select_Reactor_T " );

  
if  ( this -> open (size,
                  rs,
                  sh,
                  tq,
                  disable_notify_pipe,
                  notify) 
==   - 1 )
    ACE_ERROR ((LM_ERROR,
                ACE_LIB_TEXT (
" %p\n " ),
                ACE_LIB_TEXT (
" ACE_Select_Reactor_T::open  " )
                ACE_LIB_TEXT (
" failed inside ACE_Select_Reactor_T::CTOR " )));
}

template 
< class  ACE_SELECT_REACTOR_TOKEN >   int
ACE_Select_Reactor_T
< ACE_SELECT_REACTOR_TOKEN > ::open
  (size_t size,
   
int  restart,
   ACE_Sig_Handler 
* sh,
   ACE_Timer_Queue 
* tq,
   
int  disable_notify_pipe,
   ACE_Reactor_Notify 
* notify)
{
  ACE_TRACE (
" ACE_Select_Reactor_T::open " );
  ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, 
this -> token_,  - 1 ));

  
//  Can't initialize ourselves more than once.
   if  ( this -> initialized_)
    
return   - 1 ;

  
this -> owner_  =  ACE_Thread::self ();
  
this -> restart_  =  restart;
  
this -> signal_handler_  =  sh;
  
this -> timer_queue_  =  tq;
  
this -> notify_handler_  =  notify;

  
int  result  =   0 ;

  
//  Allows the signal handler to be overridden.
   if  ( this -> signal_handler_  ==   0 )
    
{
      ACE_NEW_RETURN (
this -> signal_handler_,
                      ACE_Sig_Handler,
                      
- 1 );

      
if  ( this -> signal_handler_  ==   0 )
        result 
=   - 1 ;
      
else
        
this -> delete_signal_handler_  =   1 ;
    }


  
//  Allows the timer queue to be overridden.
   if  (result  !=   - 1   &&   this -> timer_queue_  ==   0 )
    
{
      ACE_NEW_RETURN (
this -> timer_queue_,
                      ACE_Timer_Heap,
                      
- 1 );

      
if  ( this -> timer_queue_  ==   0 )
        result 
=   - 1 ;
      
else
        
this -> delete_timer_queue_  =   1 ;
    }


  
//  Allows the Notify_Handler to be overridden.
   if  (result  !=   - 1   &&   this -> notify_handler_  ==   0 )
    
{
      ACE_NEW_RETURN (
this -> notify_handler_,
                      ACE_Select_Reactor_Notify,
                      
- 1 );

      
if  ( this -> notify_handler_  ==   0 )
        result 
=   - 1 ;
      
else
        
this -> delete_notify_handler_  =   1 ;
    }


  
if  (result  !=   - 1   &&   this -> handler_rep_.open (size)  ==   - 1 )       /* **********handler_rep_.open************** */
    result 
=   - 1 ;
  
else   if  ( this -> notify_handler_ -> open ( this ,                     /* *****notify_handler_->open**** */
                                        
0 ,
                                        disable_notify_pipe) 
==   - 1 )
    result 
=   - 1 ;

  
if  (result  !=   - 1 )
    
//  We're all set to go.
     this -> initialized_  =   1 ;
  
else
    
//  This will close down all the allocated resources properly.
     this -> close ();

  
return  result;
}

ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (
void )
  : max_notify_iterations_ (
- 1 )
{
}

int
ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl 
* r,
                                 ACE_Timer_Queue 
* ,
                                 
int  disable_notify_pipe)
{
  ACE_TRACE (
" ACE_Select_Reactor_Notify::open " );

  
if  (disable_notify_pipe  ==   0 )
    
{
      
this -> select_reactor_  =
        dynamic_cast
< ACE_Select_Reactor_Impl  *>  (r);

      
if  (select_reactor_  ==   0 )
        
{
          errno 
=  EINVAL;
          
return   - 1 ;
        }


      
if  ( this -> notification_pipe_.open ()  ==   - 1 )
        
return   - 1 ;
#if  defined (F_SETFD)
      ACE_OS::fcntl (
this -> notification_pipe_.read_handle (), F_SETFD,  1 );
      ACE_OS::fcntl (
this -> notification_pipe_.write_handle (), F_SETFD,  1 );
#endif  /* F_SETFD */

#if  defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
      ACE_Notification_Buffer 
* temp;

      ACE_NEW_RETURN (temp,
                      ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
                      
- 1 );

      
if  ( this -> alloc_queue_.enqueue_head (temp)  ==   - 1 )
        
{
          delete [] temp;
          
return   - 1 ;
        }


      
for  (size_t i  =   0 ; i  <  ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;  ++ i)
        
if  (free_queue_.enqueue_head (temp  +  i)  ==   - 1 )
          
return   - 1 ;

#endif  /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

      
//  There seems to be a Win32 bug with this  Set this into
      
//  non-blocking mode.
       if  (ACE::set_flags ( this -> notification_pipe_.read_handle (),                              /* **************设置为非阻赛模式********* */
                          ACE_NONBLOCK) 
==   - 1 )
        
return   - 1 ;
      
else
        
return   this -> select_reactor_ -> register_handler                                         /* ************注册处理器*************** */
          (
this -> notification_pipe_.read_handle (),
           
this ,
           ACE_Event_Handler::READ_MASK);
    }

  
else
    
{
      
this -> select_reactor_  =   0 ;
      
return   0 ;
    }

}

int
ACE_Pipe::open (
int  buffer_size)
{
  ACE_TRACE (
" ACE_Pipe::open " );

#if  defined (ACE_LACKS_SOCKETPAIR) || defined (__Lynx__)
  ACE_INET_Addr my_addr;
  ACE_SOCK_Acceptor acceptor;
  ACE_SOCK_Connector connector;
  ACE_SOCK_Stream reader;
  ACE_SOCK_Stream writer;
  
int  result  =   0 ;
if  defined (ACE_WIN32)
  ACE_INET_Addr local_any  (static_cast
< u_short >  ( 0 ), ACE_LOCALHOST);
else
  ACE_Addr local_any 
=  ACE_Addr::sap_any;
# endif 
/*  ACE_WIN32  */

  
//  Bind listener to any port and then find out what the port was.
   if  (acceptor.open (local_any)  ==   - 1
      
||  acceptor.get_local_addr (my_addr)  ==   - 1 )
    result 
=   - 1 ;
  
else
    
{
      ACE_INET_Addr sv_addr (my_addr.get_port_number (),
                             ACE_LOCALHOST);

      
//  Establish a connection within the same process.
       if  (connector.connect (writer, sv_addr)  ==   - 1 )
        result 
=   - 1 ;
      
else   if  (acceptor.accept (reader)  ==   - 1 )
        
{
          writer.close ();
          result 
=   - 1 ;
        }

    }


  
//  Close down the acceptor endpoint since we don't need it anymore.
  acceptor.close ();
  
if  (result  ==   - 1 )
    
return   - 1 ;

  
this -> handles_[ 0 =  reader.get_handle ();
  
this -> handles_[ 1 =  writer.get_handle ();

if   ! defined (ACE_LACKS_TCP_NODELAY)
  
int  one  =   1 ;

  
//  Make sure that the TCP stack doesn't try to buffer small writes.
  
//  Since this communication is purely local to the host it doesn't
  
//  affect network performance.

  
if  (writer.set_option (ACE_IPPROTO_TCP,
                         TCP_NODELAY,
                         
& one,
                         
sizeof  one)  ==   - 1 )
    
{
      
this -> close ();
      
return   - 1 ;
    }

# endif 
/*  ! ACE_LACKS_TCP_NODELAY  */

if  defined (ACE_LACKS_SOCKET_BUFSIZ)
    ACE_UNUSED_ARG (buffer_size);
else    /*  ! ACE_LACKS_SOCKET_BUFSIZ  */
  
if  (reader.set_option (SOL_SOCKET,
                         SO_RCVBUF,
                         reinterpret_cast 
< void   *>  ( & buffer_size),
                         
sizeof  (buffer_size))  ==   - 1
      
&&  errno  !=  ENOTSUP)
    
{
      
this -> close ();
      
return   - 1 ;
    }

  
else   if  (writer.set_option (SOL_SOCKET,
                              SO_SNDBUF,
                              reinterpret_cast 
< void   *>  ( & buffer_size),
                              
sizeof  (buffer_size))  ==   - 1
           
&&  errno  !=  ENOTSUP)
    
{
      
this -> close ();
      
return   - 1 ;
    }

# endif 
/*  ! ACE_LACKS_SOCKET_BUFSIZ  */

#elif  defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
  ACE_UNUSED_ARG (buffer_size);
  
if  (ACE_OS::pipe ( this -> handles_)  ==   - 1 )
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_LIB_TEXT (
" %p\n " ),
                       ACE_LIB_TEXT (
" pipe " )),
                      
- 1 );

#if  !defined(__QNX__)
  
int  arg  =  RMSGN;

  
//  Enable "msg no discard" mode, which ensures that record
  
//  boundaries are maintained when messages are sent and received.
   if  (ACE_OS::ioctl ( this -> handles_[ 0 ],
                     I_SRDOPT,
                     (
void   * ) arg)  ==   - 1
      
||  ACE_OS::ioctl ( this -> handles_[ 1 ],
                        I_SRDOPT,
                        (
void   * ) arg)  ==   - 1 )
    
{
      
this -> close ();
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_LIB_TEXT (
" %p\n " ),
                         ACE_LIB_TEXT (
" ioctl " )),  - 1 );
    }

#endif  /* __QNX__ */

#else   /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
  
if  (ACE_OS::socketpair (AF_UNIX,
                          SOCK_STREAM,
                          
0 ,
                          
this -> handles_)  ==   - 1 )
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_LIB_TEXT (
" %p\n " ),
                       ACE_LIB_TEXT (
" socketpair " )),
                      
- 1 );
if  defined (ACE_LACKS_SOCKET_BUFSIZ)
  ACE_UNUSED_ARG (buffer_size);
else    /*  ! ACE_LACKS_SOCKET_BUFSIZ  */
  
if  (ACE_OS::setsockopt ( this -> handles_[ 0 ],
                          SOL_SOCKET,
                          SO_RCVBUF,
                          reinterpret_cast 
< const   char   *>  ( & buffer_size),
                          
sizeof  (buffer_size))  ==   - 1
      
&&  errno  !=  ENOTSUP)
    
{
      
this -> close ();
      
return   - 1 ;
    }

  
if  (ACE_OS::setsockopt ( this -> handles_[ 1 ],
                          SOL_SOCKET,
                          SO_SNDBUF,
                          reinterpret_cast 
< const   char   *>  ( & buffer_size),
                          
sizeof  (buffer_size))  ==   - 1
      
&&  errno  !=  ENOTSUP)
    
{
      
this -> close ();
      
return   - 1 ;
    }

# endif 
/*  ! ACE_LACKS_SOCKET_BUFSIZ  */
#endif   /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
  
//  Point both the read and write HANDLES to the appropriate socket
  
//  HANDLEs.

  
return   0 ;
}

ACE_INLINE 
int
ACE_SOCK::set_option (
int  level,
                      
int  option,
                      
void   * optval,
                      
int  optlen)  const
{
  ACE_TRACE (
" ACE_SOCK::set_option " );
  
return  ACE_OS::setsockopt ( this -> get_handle (), level,
                             option, (
char   * ) optval, optlen);
}

ACE_INLINE 
int
ACE_OS::setsockopt (ACE_HANDLE handle,
                    
int  level,
                    
int  optname,
                    
const   char   * optval,
                    
int  optlen)
{
  ACE_OS_TRACE (
" ACE_OS::setsockopt " );

  
#if  defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0) && defined(SO_REUSEPORT)
  
//  To work around an inconsistency with Microsofts implementation of
  
//  sockets, we will check for SO_REUSEADDR, and ignore it. Winsock
  
//  always behaves as if SO_REUSEADDR=1. Some implementations have
  
//  the same behaviour as Winsock, but use a new name for
  
//  it. SO_REUSEPORT.  If you want the normal behaviour for
  
//  SO_REUSEADDR=0, then NT 4 sp4 and later supports
  
//  SO_EXCLUSIVEADDRUSE. This also requires using an updated Platform
  
//  SDK so it was decided to ignore the option for now. (Especially
  
//  since Windows always sets SO_REUSEADDR=1, which we can mimic by doing
  
//  nothing.)
   if  (level  ==  SOL_SOCKET)  {
    
if  (optname  ==  SO_REUSEADDR)  {
      
return   0 //  Not supported by Winsock
    }

    
if  (optname  ==  SO_REUSEPORT)  {
      optname 
=  SO_REUSEADDR;
    }

  }

  
#endif  /*ACE_HAS_WINSOCK2*/

  
int  result;
  ACE_SOCKCALL (::setsockopt ((ACE_SOCKET) handle,
                              level,
                              optname,
                              (ACE_SOCKOPT_TYPE1) optval,
                              optlen),
                
int ,
                
- 1 ,
                result);
#if  defined (WSAEOPNOTSUPP)
  
if  (result  ==   - 1   &&  errno  ==  WSAEOPNOTSUPP)
#else  
  
if  (result  ==   - 1 )
#endif  /* WSAEOPNOTSUPP */
    errno 
=  ENOTSUP;
  
return  result;
}




注册的部分:

int
ACE_TP_Reactor::register_handler (ACE_Event_Handler 
* eh,
                                  ACE_Reactor_Mask mask)
{
  
return  ACE_Select_Reactor::register_handler (eh,
                                               mask);
}



int
ACE_TP_Reactor::handle_events (ACE_Time_Value 
* max_wait_time)
{
  ACE_TRACE (
" ACE_TP_Reactor::handle_events " );

  
//  Stash the current time -- the destructor of this object will
  
//  automatically compute how much time elapsed since this method was
  
//  called.
  ACE_Countdown_Time countdown (max_wait_time);

  
//
  
//  The order of these events is very subtle, modify with care.
  
//

  
//  Instantiate the token guard which will try grabbing the token for
  
//  this thread.
  ACE_TP_Token_Guard guard ( this -> token_);

  
int  result  =  guard.acquire_read_token (max_wait_time);                                  // 这里有获取token锁

  
//  If the guard is NOT the owner just return the retval
   if  ( ! guard.is_owner ())
    
return  result;

  
//  After getting the lock just just for deactivation..
   if  ( this -> deactivated_)
    
return   - 1 ;

  
//  Update the countdown to reflect time waiting for the token.
  countdown.update ();

  
return   this -> dispatch_i (max_wait_time,
                           guard);
}


int
ACE_TP_Reactor::dispatch_i (ACE_Time_Value 
* max_wait_time,
                            ACE_TP_Token_Guard 
& guard)
{
  
int  event_count  =
    
this -> get_event_for_dispatching (max_wait_time);

  
int  result  =   0 ;

  
//  Note: We are passing the <event_count> around, to have record of
  
//  how many events still need processing. May be this could be
  
//  useful in future.

  
//  Dispatch signals
   if  (event_count  ==   - 1 )
    
{
      
//  Looks like we dont do any upcalls in dispatch signals. If at
      
//  a later point of time, we decide to handle signals we have to
      
//  release the lock before we make any upcalls.. What is here
      
//  now is not the right thing
      
//
      
//  @@ We need to do better..
       return   this -> handle_signals (event_count,
                                   guard);
    }


  
//  If there are no signals and if we had received a proper
  
//  event_count then first look at dispatching timeouts. We need to
  
//  handle timers early since they may have higher latency
  
//  constraints than I/O handlers.  Ideally, the order of dispatching
  
//  should be a strategy

  
//  NOTE: The event count does not have the number of timers that
  
//  needs dispatching. But we are still passing this along. We dont
  
//  need to do that. In the future we *may* have the timers also
  
//  returned through the <event_count>. Just passing that along for
  
//  that day.
  result  =   this -> handle_timer_events (event_count,
                                      guard);

  
if  (result  >   0 )
    
return  result;

  
//  Else just go ahead fall through for further handling.

  
if  (event_count  >   0 )
    
{
      
//  Next dispatch the notification handlers (if there are any to
      
//  dispatch).  These are required to handle multiple-threads
      
//  that are trying to update the <Reactor>.
      result  =   this -> handle_notify_events (event_count,
                                           guard);

      
if  (result  >   0 )
        
return  result;

      
//  Else just fall through for further handling
    }


  
if  (event_count  >   0 )
    
{
      
//  Handle socket events
       return   this -> handle_socket_events (event_count,
                                         guard);
    }


  
return   0 ;
}

int
ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value 
* max_wait_time)
{
  
//  If the reactor handler state has changed, clear any remembered
  
//  ready bits and re-scan from the master wait_set.
   if  ( this -> state_changed_)
    
{
      
this -> ready_set_.rd_mask_.reset ();
      
this -> ready_set_.wr_mask_.reset ();
      
this -> ready_set_.ex_mask_.reset ();

      
this -> state_changed_  =   false ;
    }

  
else
    
{
      
//  This is a hack somewhere, under certain conditions (which
      
//  I don't understand) the mask will have all of its bits clear,
      
//  yet have a size_ > 0. This is an attempt to remedy the affect,
      
//  without knowing why it happens.

      
this -> ready_set_.rd_mask_.sync ( this -> ready_set_.rd_mask_.max_set ());
      
this -> ready_set_.wr_mask_.sync ( this -> ready_set_.wr_mask_.max_set ());
      
this -> ready_set_.ex_mask_.sync ( this -> ready_set_.ex_mask_.max_set ());
    }


  
return   this -> wait_for_multiple_events ( this -> ready_set_,
                                         max_wait_time);
}


template 
< class  ACE_SELECT_REACTOR_TOKEN >   int
ACE_Select_Reactor_T
< ACE_SELECT_REACTOR_TOKEN > ::wait_for_multiple_events
  (ACE_Select_Reactor_Handle_Set 
& dispatch_set,
   ACE_Time_Value 
* max_wait_time)
{
  ACE_TRACE (
" ACE_Select_Reactor_T::wait_for_multiple_events " );
  u_long width 
=   0 ;
  ACE_Time_Value timer_buf (
0 );
  ACE_Time_Value 
* this_timeout;

  
int  number_of_active_handles  =   this -> any_ready (dispatch_set);

  
//  If there are any bits enabled in the <ready_set_> then we'll
  
//  handle those first, otherwise we'll block in <select>.

  
if  (number_of_active_handles  ==   0 )
    
{
      
do
        
{
          this_timeout 
=
            
this -> timer_queue_ -> calculate_timeout (max_wait_time,
                                                   
& timer_buf);
          width 
=  (u_long)  this -> handler_rep_.max_handlep1 ();

          dispatch_set.rd_mask_ 
=   this -> wait_set_.rd_mask_;                                         /* ***************wait_set_ ----> ready_set_********* */
          dispatch_set.wr_mask_ 
=   this -> wait_set_.wr_mask_;
          dispatch_set.ex_mask_ 
=   this -> wait_set_.ex_mask_;
          number_of_active_handles 
=  ACE_OS::select ( int  (width),
                                                     dispatch_set.rd_mask_,
                                                     dispatch_set.wr_mask_,
                                                     dispatch_set.ex_mask_,
                                                     this_timeout);
        }

      
while  (number_of_active_handles  ==   - 1   &&   this -> handle_error ()  >   0 );

      
if  (number_of_active_handles  >   0 )
        
{
#if  !defined (ACE_WIN32)
          
//  Resynchronize the fd_sets so their "max" is set properly.
          dispatch_set.rd_mask_.sync ( this -> handler_rep_.max_handlep1 ());
          dispatch_set.wr_mask_.sync (
this -> handler_rep_.max_handlep1 ());
          dispatch_set.ex_mask_.sync (
this -> handler_rep_.max_handlep1 ());
#endif  /* ACE_WIN32 */
        }

      
else   if  (number_of_active_handles  ==   - 1 )
        
{
          
//  Normally, select() will reset the bits in dispatch_set
          
//  so that only those filed descriptors that are ready will
          
//  have bits set.  However, when an error occurs, the bit
          
//  set remains as it was when the select call was first made.
          
//  Thus, we now have a dispatch_set that has every file
          
//  descriptor that was originally waited for, which is not
          
//  correct.  We must clear all the bit sets because we
          
//  have no idea if any of the file descriptors is ready.
          
//
          
//  NOTE: We dont have a test case to reproduce this
          
//  problem. But pleae dont ignore this and remove it off.
          dispatch_set.rd_mask_.reset ();
          dispatch_set.wr_mask_.reset ();
          dispatch_set.ex_mask_.reset ();
        }

    }


  
//  Return the number of events to dispatch.
   return  number_of_active_handles;
}

   
   
int
ACE_TP_Reactor::handle_socket_events (
int   & event_count,
                                      ACE_TP_Token_Guard 
& guard)
{

  
//  We got the lock, lets handle some I/O events.
  ACE_EH_Dispatch_Info dispatch_info;

  
this -> get_socket_event_info (dispatch_info);

  
//  If there is any event handler that is ready to be dispatched, the
  
//  dispatch information is recorded in dispatch_info.
   if  ( ! dispatch_info.dispatch ())
    
{
      
return   0 ;
    }


  
//  Suspend the handler so that other threads don't start dispatching
  
//  it.
  
//
  
//  NOTE: This check was performed in older versions of the
  
//  TP_Reactor. Looks like it is a waste..
   if  (dispatch_info.event_handler_  !=   this -> notify_handler_)
    
this -> suspend_i (dispatch_info.handle_);

  
int  resume_flag  =
    dispatch_info.event_handler_
-> resume_handler ();

  
int  reference_counting_required  =
    dispatch_info.event_handler_
-> reference_counting_policy ().value ()  ==
    ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

  
//  Call add_reference() if needed.
   if  (reference_counting_required)
    
{
      dispatch_info.event_handler_
-> add_reference ();
    }


  
//  Release the lock.  Others threads can start waiting.                                     // 这里放锁
  guard.release_token ();

  
int  result  =   0 ;

  
//  If there was an event handler ready, dispatch it.
  
//  Decrement the event left
   -- event_count;

  
//  Dispatched an event
   if  ( this -> dispatch_socket_event (dispatch_info)  ==   0 )
    
++ result;

  
//  Resume handler if required.
   if  (dispatch_info.event_handler_  !=   this -> notify_handler_  &&
      resume_flag 
==  ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
    
this -> resume_handler (dispatch_info.handle_);

  
//  Call remove_reference() if needed.
   if  (reference_counting_required)
    
{
      dispatch_info.event_handler_
-> remove_reference ();
    }


  
return  result;
}


int
ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info 
& event )
{
  
//  Nothing to dispatch yet
   event .reset ();

  
//  Check for dispatch in write, except, read. Only catch one, but if
  
//  one is caught, be sure to clear the handle from each mask in case
  
//  there is more than one mask set for it. This would cause problems
  
//  if the handler is suspended for dispatching, but its set bit in
  
//  another part of ready_set_ kept it from being dispatched.
   int  found_io  =   0 ;
  ACE_HANDLE handle;

  
//  @@todo: We can do quite a bit of code reduction here. Let me get
  
//  it to work before I do this.
   {
    ACE_Handle_Set_Iterator handle_iter (
this -> ready_set_.wr_mask_);                     /* **********这里是ready_set_********** */ 什么时间被给值的呢?

    
while  ( ! found_io  &&  (handle  =  handle_iter ())  !=  ACE_INVALID_HANDLE)
      
{
        
if  ( this -> is_suspended_i (handle))
          
continue ;

        
//  Remember this info
         event . set  (handle,
                   
this -> handler_rep_.find (handle),
                   ACE_Event_Handler::WRITE_MASK,
                   
& ACE_Event_Handler::handle_output);

        
this -> clear_handle_read_set (handle);
        found_io 
=   1 ;
      }

  }


  
if  ( ! found_io)
    
{
      ACE_Handle_Set_Iterator handle_iter (
this -> ready_set_.ex_mask_);

      
while  ( ! found_io  &&  (handle  =  handle_iter ())  !=  ACE_INVALID_HANDLE)
        
{
          
if  ( this -> is_suspended_i (handle))
            
continue ;

          
//  Remember this info
           event . set  (handle,
                     
this -> handler_rep_.find (handle),
                     ACE_Event_Handler::EXCEPT_MASK,
                     
& ACE_Event_Handler::handle_exception);

          
this -> clear_handle_read_set (handle);

          found_io 
=   1 ;
        }

    }


  
if  ( ! found_io)
    
{
      ACE_Handle_Set_Iterator handle_iter (
this -> ready_set_.rd_mask_);

      
while  ( ! found_io  &&  (handle  =  handle_iter ())  !=  ACE_INVALID_HANDLE)
        
{
          
if  ( this -> is_suspended_i (handle))
            
continue ;

          
//  Remember this info
           event . set  (handle,
                     
this -> handler_rep_.find (handle),
                     ACE_Event_Handler::READ_MASK,
                     
& ACE_Event_Handler::handle_input);

          
this -> clear_handle_read_set (handle);
          found_io 
=   1 ;
        }

    }


  
return  found_io;
}

int
ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info 
& dispatch_info)
{
  ACE_TRACE (
" ACE_TP_Reactor::dispatch_socket_event " );

  ACE_HANDLE handle 
=  dispatch_info.handle_;
  ACE_Event_Handler 
* event_handler  =  dispatch_info.event_handler_;
  ACE_Reactor_Mask mask 
=  dispatch_info.mask_;
  ACE_EH_PTMF callback 
=  dispatch_info.callback_;

  
//  Check for removed handlers.
   if  (event_handler  ==   0 )
    
return   - 1 ;

  
//  Upcall. If the handler returns positive value (requesting a
  
//  reactor callback) don't set the ready-bit because it will be
  
//  ignored if the reactor state has changed. Just call back
  
//  as many times as the handler requests it. Other threads are off
  
//  handling other things.
   int  status  =   1 ;
  
while  (status  >   0 )
    status 
=  (event_handler ->* callback) (handle);

  
//  If negative, remove from Reactor
   if  (status  <   0 )
    
{
      
int  retval  =
        
this -> remove_handler (handle, mask);

      
return  retval;
    }


  
//  assert (status >= 0);
   return   0 ;
}
posted on 2007-02-25 20:41 一动不如一静 阅读(2244) 评论(0)  编辑 收藏 引用 所属分类: ACE

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理