首先从定义一个reactor开始。
// Construct the reactor. Per the walkthrough, this construction leads to
// ACE_Select_Reactor_Handler_Repository::open(), which sizes the handler table.
// NOTE(review): presumably the first argument (g_unOneMaxHandle) is that table
// size; the meaning of the remaining arguments is not visible here -- confirm
// against the ACE_TP_Reactor constructor.
ACE_TP_Reactor select_reactor_one(g_unOneMaxHandle, 0, 0, 0, 1);
上面这句话触发了以下一些行为,主要就是给 event_handlers_ 分配内存;这里的大小就决定了你能支持多少条连接。
/**
 * Size and initialise the handler table of the repository.
 *
 * Records @a size as the table capacity, resets the "highest handle + 1"
 * marker to zero, allocates the event_handlers_ array with @a size slots and
 * clears every slot.  The capacity chosen here bounds how many handles can be
 * registered later.
 *
 * @param size  Number of slots to allocate in event_handlers_.
 * @return The result of ACE::set_handle_limit(); the allocation macro is
 *         given -1 as its failure value (presumably returned from this
 *         function when the allocation fails -- confirm against
 *         ACE_NEW_RETURN).
 */
int
ACE_Select_Reactor_Handler_Repository::open (size_t size)
{
  ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");

  this->max_size_ = size;
  this->max_handlep1_ = 0;

#if defined (ACE_WIN32)
  // Try to allocate the memory.
  ACE_NEW_RETURN (this->event_handlers_,
                  ACE_Event_Tuple[size],
                  -1);

  // Initialize every slot to { ACE_INVALID_HANDLE, 0 }.
  for (size_t h = 0; h < size; ++h)
    {
      ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE;
      // Clear the event handler stored in this event_handlers_ slot.
      ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
    }
#else
  // Try to allocate the memory.
  ACE_NEW_RETURN (this->event_handlers_,
                  ACE_Event_Handler *[size],
                  -1);

  // Initialize every ACE_Event_Handler * to NULL.
  for (size_t h = 0; h < size; ++h)
    ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
#endif /* ACE_WIN32 */

  // Try to increase the number of handles if <size> is greater than
  // the current limit (i.e. check whether the requested size exceeds
  // the maximum number of handles currently allowed).
  return ACE::set_handle_limit (static_cast<int> (size),
                                1);
}
接下来就是注册监听了。
// Open the listening endpoint on addr.
// NOTE(review): acceptor is presumably an ACE_SOCK_Acceptor (or wraps one),
// so this call ends up in ACE_SOCK_Acceptor::open() -- confirm against the
// declaration of acceptor.
acceptor.open(addr);
那么这句话又干了些什么呢?
/**
 * Create a stream socket and turn it into a listening endpoint.
 *
 * The protocol family is taken from @a local_sap unless that is the
 * wildcard address; for the wildcard with PF_UNSPEC a default family is
 * picked (IPv6 when compiled in and enabled, IPv4 otherwise).
 *
 * @param local_sap        Local address to listen on, or ACE_Addr::sap_any.
 * @param reuse_addr       Forwarded to ACE_SOCK::open().
 * @param protocol_family  Requested family; may be overridden as described.
 * @param backlog          Forwarded to shared_open().
 * @param protocol         Forwarded to ACE_SOCK::open().
 * @return -1 if the socket cannot be created, otherwise the result of
 *         shared_open().
 */
int
ACE_SOCK_Acceptor::open (const ACE_Addr &local_sap,
                         int reuse_addr,
                         int protocol_family,
                         int backlog,
                         int protocol)
{
  ACE_TRACE ("ACE_SOCK_Acceptor::open");

  if (local_sap != ACE_Addr::sap_any)
    {
      // A concrete address dictates the family.
      protocol_family = local_sap.get_type ();
    }
  else if (protocol_family == PF_UNSPEC)
    {
      // Wildcard address and no family requested: choose a default.
#if defined (ACE_HAS_IPV6)
      if (ACE::ipv6_enabled ())
        protocol_family = PF_INET6;
      else
        protocol_family = PF_INET;
#else
      protocol_family = PF_INET;
#endif /* ACE_HAS_IPV6 */
    }

  const int socket_status = ACE_SOCK::open (SOCK_STREAM,
                                            protocol_family,
                                            protocol,
                                            reuse_addr);
  if (socket_status == -1)
    return -1;

  return this->shared_open (local_sap, protocol_family, backlog);
}
/**
 * Create the underlying socket and optionally enable SO_REUSEADDR on it.
 *
 * @param type             Socket type passed to ACE_OS::socket().
 * @param protocol_family  Address family passed to ACE_OS::socket().
 * @param protocol         Protocol passed to ACE_OS::socket().
 * @param reuse_addr       Non-zero to set SO_REUSEADDR (skipped for PF_UNIX).
 * @return 0 on success; -1 if the socket could not be created or the option
 *         could not be set (in the latter case the socket is closed first).
 */
int
ACE_SOCK::open (int type,
                int protocol_family,
                int protocol,
                int reuse_addr)
{
  ACE_TRACE ("ACE_SOCK::open");

  this->set_handle (ACE_OS::socket (protocol_family, type, protocol));
  if (this->get_handle () == ACE_INVALID_HANDLE)
    return -1;

  // Address reuse is only attempted for non-UNIX-domain sockets, and
  // only when the caller asked for it.
  const bool wants_reuse = protocol_family != PF_UNIX && reuse_addr;
  if (!wants_reuse)
    return 0;

  int enable = 1;
  if (this->set_option (SOL_SOCKET,
                        SO_REUSEADDR,
                        &enable,
                        sizeof enable) == -1)
    {
      // Do not leave a half-configured socket open.
      this->close ();
      return -1;
    }

  return 0;
}
// Wrapper around the system ::socket() call.
//
// domain, type and proto are passed to ::socket() unchanged.  The call is
// wrapped in ACE_SOCKCALL_RETURN together with the result type ACE_HANDLE and
// the value ACE_INVALID_HANDLE.
// NOTE(review): presumably the macro returns the socket cast to ACE_HANDLE on
// success and ACE_INVALID_HANDLE on failure -- confirm against the macro
// definition, which is not visible here.
ACE_INLINE ACE_HANDLE
ACE_OS::socket (int domain,
int type,
int proto)
{
ACE_OS_TRACE ("ACE_OS::socket");
ACE_SOCKCALL_RETURN (::socket (domain,
type,
proto),
ACE_HANDLE,
ACE_INVALID_HANDLE);
}
// Bind the already-created socket to local_sap and start listening on it.
//
// local_sap        Address to bind, or ACE_Addr::sap_any for a wildcard bind.
// protocol_family  Selects the IPv6, IPv4 or generic bind path below.
// backlog          Passed to ACE_OS::listen().
//
// Returns 0 on success.  If bind or listen fails the socket is closed and
// -1 is returned.
int
ACE_SOCK_Acceptor::shared_open (const ACE_Addr &local_sap,
int protocol_family,
int backlog)
{
ACE_TRACE ("ACE_SOCK_Acceptor::shared_open");
// Set to 1 as soon as any bind/listen step fails.
int error = 0;
#if defined (ACE_HAS_IPV6)
ACE_ASSERT (protocol_family == PF_INET || protocol_family == PF_INET6);
if (protocol_family == PF_INET6)
{
sockaddr_in6 local_inet6_addr;
ACE_OS::memset (reinterpret_cast<void *> (&local_inet6_addr),
0,
sizeof local_inet6_addr);
if (local_sap == ACE_Addr::sap_any)
{
// Wildcard: any IPv6 address, port 0.
local_inet6_addr.sin6_family = AF_INET6;
local_inet6_addr.sin6_port = 0;
local_inet6_addr.sin6_addr = in6addr_any;
}
else
// Use the caller-supplied address verbatim.
local_inet6_addr = *reinterpret_cast<sockaddr_in6 *> (local_sap.get_addr ());
// We probably don't need a bind_port written here.
// There are currently no supported OS's that define
// ACE_LACKS_WILDCARD_BIND.
if (ACE_OS::bind (this->get_handle (),
reinterpret_cast<sockaddr *> (&local_inet6_addr),
sizeof local_inet6_addr) == -1)
error = 1;
}
else
#endif
if (protocol_family == PF_INET)
{
sockaddr_in local_inet_addr; // IPv4 address that will be bound
ACE_OS::memset (reinterpret_cast<void *> (&local_inet_addr),
0,
sizeof local_inet_addr);
if (local_sap == ACE_Addr::sap_any)
{
local_inet_addr.sin_port = 0;
}
else
local_inet_addr = *reinterpret_cast<sockaddr_in *> (local_sap.get_addr ());
// Port 0 goes through ACE::bind_port(); an explicit port is bound directly.
if (local_inet_addr.sin_port == 0)
{
if (ACE::bind_port (this->get_handle (),
ACE_NTOHL (ACE_UINT32 (local_inet_addr.sin_addr.s_addr))) == -1)
error = 1;
}
else if (ACE_OS::bind (this->get_handle (), // explicit port: plain bind
reinterpret_cast<sockaddr *> (&local_inet_addr),
sizeof local_inet_addr) == -1)
error = 1;
}
// Any other family: bind the raw address supplied by the caller.
else if (ACE_OS::bind (this->get_handle (),
(sockaddr *) local_sap.get_addr (),
local_sap.get_size ()) == -1)
error = 1;
// listen() is only attempted when the bind step succeeded.
if (error != 0
|| ACE_OS::listen (this->get_handle (), // start listening
backlog) == -1)
{
error = 1;
this->close ();
}
return error ? -1 : 0;
} 接下来就是注册了。
// Register this object with the reactor for ACCEPT events and return the
// reactor's result to the caller.
return m_Reactor->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
那么这句话又做了什么呢?
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler
(ACE_Event_Handler *handler,
ACE_Reactor_Mask mask)
{
ACE_TRACE ("ACE_Select_Reactor_T::register_handler");
ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1));
return this->register_handler_i (handler->get_handle (), handler, mask); /**//**************所以一定要实现get_handle()方法**************/
}
/**
 * Implementation step of register_handler(): store the
 * <handle, event_handler> pair in the handler repository.
 *
 * @param handle         I/O handle to associate with the handler.
 * @param event_handler  Handler to register.
 * @param mask           Events of interest.
 * @return Whatever the repository's bind() returns.
 */
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i
  (ACE_HANDLE handle,
   ACE_Event_Handler *event_handler,
   ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i");

  const int bind_status =
    this->handler_rep_.bind (handle, event_handler, mask);
  return bind_status;
}
/**
 * Associate @a handle with @a event_handler and enable @a mask for it.
 *
 * If @a handle is ACE_INVALID_HANDLE the handle is taken from the handler's
 * get_handle().  A handle that is already registered may only be re-bound to
 * the same handler.  The mask bits are added to the reactor's suspend_set_
 * when the handle is currently suspended, otherwise to its wait_set_.
 *
 * @param handle         Handle to register, or ACE_INVALID_HANDLE.
 * @param event_handler  Handler to store; must not be null.
 * @param mask           Events of interest.
 * @return 0 on success; -1 if @a event_handler is null, the handle is
 *         invalid, the handle is already bound to a different handler, or
 *         (Win32) the table is full, in which case errno is set to ENOMEM.
 */
int
ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
                                             ACE_Event_Handler *event_handler,
                                             ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");

  // A null handler cannot supply a handle and would be dereferenced
  // below (get_handle / add_reference), so reject it up front.
  if (event_handler == 0)
    return -1;

  if (handle == ACE_INVALID_HANDLE)
    handle = event_handler->get_handle ();

  if (this->invalid_handle (handle))
    return -1;

  // Is this handle already in the Reactor?
  int existing_handle = 0;

#if defined (ACE_WIN32)

  ssize_t assigned_slot = -1;

  for (ssize_t i = 0; i < this->max_handlep1_; ++i)
    {
      // If handle is already registered.
      if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
        {
          // Cannot use a different handler for an existing handle.
          if (ACE_SELECT_REACTOR_EVENT_HANDLER (this, i) !=
              event_handler)
            return -1;

          // Remember location.
          assigned_slot = i;

          // Remember that this handle is already registered in the
          // Reactor.
          existing_handle = 1;

          // We can stop looking now.
          break;
        }
      else
        // Here's the first free slot, so let's take it.
        if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE &&
            assigned_slot == -1)
          {
            assigned_slot = i;
          }
    }

  if (assigned_slot > -1)
    // We found a spot.
    {
      ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
      ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
    }
  else if (this->max_handlep1_ < this->max_size_)
    {
      // Insert at the end of the active portion.  The very first
      // registration takes this path: max_handlep1_ is still 0, so the
      // search loop above never runs and no slot has been assigned.
      ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;
      ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler;
      // The active portion of the table grows by one slot.
      ++this->max_handlep1_;
    }
  else
    {
      // No more room at the inn!
      errno = ENOMEM;
      return -1;
    }

#else

  // Check if this handle is already registered.
  ACE_Event_Handler *current_handler =
    ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);

  if (current_handler)
    {
      // Cannot use a different handler for an existing handle.
      if (current_handler != event_handler)
        return -1;

      // Remember that this handle is already registered in the
      // Reactor.
      existing_handle = 1;
    }

  ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;

  if (this->max_handlep1_ < handle + 1)
    this->max_handlep1_ = handle + 1;

#endif /* ACE_WIN32 */

  // A suspended handle keeps its mask in suspend_set_.
  if (this->select_reactor_.is_suspended_i (handle))
    {
      this->select_reactor_.bit_ops (handle,
                                     mask,
                                     this->select_reactor_.suspend_set_,
                                     ACE_Reactor::ADD_MASK);
    }
  else
    {
      // Key step of registration: the handle's mask bits are added to
      // wait_set_.
      this->select_reactor_.bit_ops (handle,
                                     mask,
                                     this->select_reactor_.wait_set_,
                                     ACE_Reactor::ADD_MASK);

      // Note the fact that we've changed the state of the <wait_set_>,
      // which is used by the dispatching loop to determine whether it can
      // keep going or if it needs to reconsult select().
      // this->select_reactor_.state_changed_ = 1;
    }

  // If new entry, call add_reference() if needed.
  if (!existing_handle)
    event_handler->add_reference ();

  return 0;
}
/* this->select_reactor_.wait_set_ 中包含 rd_mask_、wr_mask_、ex_mask_ 三个成员,每一个的结构都如下所示(以 rd_mask_ 为例):
rd_mask_
size_=0
max_handle_=0xffffffff
mask_
fd_count=0
fd_array=0x00126da8 数组长度为1024
nbits_ 256位的固定数值的数组
*/
// Query or modify the read/write/except bits of handle inside handle_set.
//
// handle      Handle whose bits are inspected or changed.
// mask        Event mask that selects which of the three sets are touched.
// handle_set  Set to operate on (the repository's bind()/unbind() pass
//             wait_set_ or suspend_set_).
// ops         ACE_Reactor::GET_MASK, CLR_MASK, SET_MASK or ADD_MASK.
//
// Returns the mask the handle had BEFORE the operation (a combination of
// READ_MASK, WRITE_MASK and EXCEPT_MASK), or -1 when ops is not one of the
// four recognised values.
int
ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
ACE_Reactor_Mask mask,
ACE_Select_Reactor_Handle_Set &handle_set,
int ops)
{
// Operation applied to each selected set; defaults to "set", switched to
// "clear" for CLR_MASK.
ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
u_long omask = ACE_Event_Handler::NULL_MASK;
// Find the old reactor masks. This automatically does the work of
// the GET_MASK operation.
if (handle_set.rd_mask_.is_set (handle))
ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
if (handle_set.wr_mask_.is_set (handle))
ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
if (handle_set.ex_mask_.is_set (handle))
ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
switch (ops)
{
case ACE_Reactor::GET_MASK:
// The work for this operation is done in all cases at the
// beginning of the function.
break;
case ACE_Reactor::CLR_MASK:
ptmf = &ACE_Handle_Set::clr_bit;
// State was changed. we need to reflect that change in the
// dispatch_mask I assume that only ACE_Reactor::CLR_MASK should
// be treated here which means we need to clear the handle|mask
// from the current dispatch handler
this->clear_dispatch_mask (handle, mask);
/* FALLTHRU */
case ACE_Reactor::SET_MASK:
/* FALLTHRU */
case ACE_Reactor::ADD_MASK:
// The following code is rather subtle Note that if we are
// doing a ACE_Reactor::SET_MASK then if the bit is not enabled
// in the mask we need to clear the bit from the ACE_Handle_Set.
// On the other hand, if we are doing a ACE_Reactor::CLR_MASK or
// a ACE_Reactor::ADD_MASK we just carry out the operations
// specified by the mask.
// READ, ACCEPT, and CONNECT flag will place the handle in the
// read set.
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
|| ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
|| ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
{
(handle_set.rd_mask_.*ptmf) (handle);
}
else if (ops == ACE_Reactor::SET_MASK)
handle_set.rd_mask_.clr_bit (handle);
// WRITE and CONNECT flag will place the handle in the write set
if (ACE_BIT_ENABLED (mask,
ACE_Event_Handler::WRITE_MASK)
|| ACE_BIT_ENABLED (mask,
ACE_Event_Handler::CONNECT_MASK))
{
(handle_set.wr_mask_.*ptmf) (handle);
}
else if (ops == ACE_Reactor::SET_MASK)
handle_set.wr_mask_.clr_bit (handle);
// EXCEPT (and CONNECT on Win32) flag will place the handle in
// the except set.
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
#if defined (ACE_WIN32)
|| ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
#endif /* ACE_WIN32 */
)
{
(handle_set.ex_mask_.*ptmf) (handle);
}
else if (ops == ACE_Reactor::SET_MASK)
handle_set.ex_mask_.clr_bit (handle);
break;
default:
return -1;
}
// The previous mask, not the new one, is what callers get back.
return omask;
}
/**
 * Add @a handle to this handle set.
 *
 * Nothing happens when @a handle is ACE_INVALID_HANDLE or is already in the
 * set.  Otherwise the handle is added to mask_ and size_ is incremented; on
 * non-Win32 builds max_handle_ (and, with ACE_HAS_BIG_FD_SET, min_handle_)
 * is updated as well.
 *
 * @param handle  Handle to add.
 */
ACE_INLINE void
ACE_Handle_Set::set_bit (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Handle_Set::set_bit");

  // Nothing to do for an invalid handle or one that is already present.
  if (handle == ACE_INVALID_HANDLE)
    return;
  if (this->is_set (handle))
    return;

#if defined (ACE_WIN32)
  FD_SET ((SOCKET) handle, &this->mask_);
  ++this->size_;
#else /* ACE_WIN32 */
# if defined (ACE_HAS_BIG_FD_SET)
  // The mask is zeroed when the first handle goes in.
  if (this->size_ == 0)
    FD_ZERO (&this->mask_);

  if (handle < this->min_handle_)
    this->min_handle_ = handle;
# endif /* ACE_HAS_BIG_FD_SET */

  FD_SET (handle, &this->mask_);
  ++this->size_;

  // Track the largest handle in the set.
  if (handle > this->max_handle_)
    this->max_handle_ = handle;
#endif /* ACE_WIN32 */
}
然后就是分发了:
/**
 * Entry point of one event-loop iteration.
 *
 * In multi-threaded builds the reactor token is taken first, and the call is
 * rejected unless the calling thread is the reactor's owner.  A deactivated
 * reactor is rejected in every build.
 *
 * @param max_wait_time  Upper bound on how long to wait; forwarded to
 *                       handle_events_i().
 * @return -1 when the reactor is deactivated or (MT builds) the caller is
 *         not the owner thread; otherwise the result of handle_events_i().
 */
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
  (ACE_Time_Value *max_wait_time)
{
  ACE_TRACE ("ACE_Select_Reactor_T::handle_events");

#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)

  // Stash the current time -- the destructor of this object will
  // automatically compute how much time elapsed since this method was
  // called.
  ACE_Countdown_Time countdown (max_wait_time);

  ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1);

  // Only the owner thread may run the loop.
  const bool caller_is_owner =
    ACE_OS::thr_equal (ACE_Thread::self (), this->owner_) != 0;
  if (!caller_is_owner)
    return -1;

  if (this->deactivated_)
    return -1;

  // Update the countdown to reflect time waiting for the mutex.
  countdown.update ();

#else

  if (this->deactivated_)
    return -1;

#endif /* ACE_MT_SAFE */

  return this->handle_events_i (max_wait_time);
}
/**
 * Wait for events and dispatch them once.
 *
 * The member dispatch_set_ is cleared, filled by
 * wait_for_multiple_events(), and then handed to dispatch().
 *
 * @param max_wait_time  Upper bound on how long to wait.
 * @return The result of dispatch(); -1 if the protected region did not run
 *         to completion.
 */
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
  (ACE_Time_Value *max_wait_time)
{
  int dispatched = -1;

  ACE_SEH_TRY
    {
      // dispatch_set_ is the working set for this iteration, so it
      // has to start out empty.
      this->dispatch_set_.rd_mask_.reset ();
      this->dispatch_set_.wr_mask_.reset ();
      this->dispatch_set_.ex_mask_.reset ();

      const int active_handles =
        this->wait_for_multiple_events (this->dispatch_set_,
                                        max_wait_time);

      dispatched = this->dispatch (active_handles,
                                   this->dispatch_set_);
    }
  ACE_SEH_EXCEPT (this->release_token ())
    {
      // As it stands now, we catch and then rethrow all Win32
      // structured exceptions so that we can make sure to release the
      // <token_> lock correctly.
    }

  return dispatched;
}
/**
 * Fill @a dispatch_set with the handles that are ready.
 *
 * Handles reported by any_ready() take precedence.  When there are none,
 * the wait_set_ masks are copied into @a dispatch_set and select() is called
 * on them, retrying for as long as select() fails and handle_error()
 * reports a positive value.
 *
 * @param dispatch_set   Receives the ready handles.
 * @param max_wait_time  Upper bound on the wait, combined with the timer
 *                       queue's next timeout.
 * @return Number of active handles; 0 on timeout; -1 on error, in which case
 *         @a dispatch_set is emptied.
 */
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events
  (ACE_Select_Reactor_Handle_Set &dispatch_set,
   ACE_Time_Value *max_wait_time)
{
  u_long width = 0;
  ACE_Time_Value timer_buf (0);
  ACE_Time_Value *this_timeout;

  int ready_count = this->any_ready (dispatch_set);

  // If there are any bits enabled in the <ready_set_> then we'll
  // handle those first, otherwise we'll block in <select>.
  if (ready_count != 0)
    return ready_count;

  for (;;)
    {
      this_timeout =
        this->timer_queue_->calculate_timeout (max_wait_time,
                                               &timer_buf);

      width = (u_long) this->handler_rep_.max_handlep1 ();

      // select() works on a fresh copy of the registered handles.
      dispatch_set.rd_mask_ = this->wait_set_.rd_mask_;
      dispatch_set.wr_mask_ = this->wait_set_.wr_mask_;
      dispatch_set.ex_mask_ = this->wait_set_.ex_mask_;

      ready_count = ACE_OS::select (int (width),
                                    dispatch_set.rd_mask_,
                                    dispatch_set.wr_mask_,
                                    dispatch_set.ex_mask_,
                                    this_timeout);

      // Retry only while select() fails and handle_error() reports a
      // positive value.
      if (ready_count != -1)
        break;
      if (this->handle_error () <= 0)
        break;
    }

  if (ready_count > 0)
    {
#if !defined (ACE_WIN32)
      // Resynchronize the fd_sets so their "max" is set properly.
      dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ());
      dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ());
      dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ());
#endif /* ACE_WIN32 */
    }
  else if (ready_count == -1)
    {
      // Normally, select() will reset the bits in dispatch_set
      // so that only those file descriptors that are ready will
      // have bits set.  However, when an error occurs, the bit
      // set remains as it was when the select call was first made.
      // Thus, we now have a dispatch_set that has every file
      // descriptor that was originally waited for, which is not
      // correct.  We must clear all the bit sets because we
      // have no idea if any of the file descriptors is ready.
      //
      // NOTE: There is no test case that reproduces this problem,
      // but please do not remove this code.
      dispatch_set.rd_mask_.reset ();
      dispatch_set.wr_mask_.reset ();
      dispatch_set.ex_mask_.reset ();
    }

  // Return the number of events to dispatch.
  return ready_count;
}
// Dispatch everything that became ready in one reactor iteration: timers
// first, then notification handlers, then I/O handlers.
//
// active_handle_count  Result of the preceding wait: number of active
//                      handles, 0, or -1 if the wait failed/was interrupted.
// dispatch_set         Handles reported ready by the wait.
//
// Returns the number of handlers dispatched (I/O + other + 1 if a signal was
// handled), or -1 when the wait returned -1 and no signal is pending.
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch
(int active_handle_count,
ACE_Select_Reactor_Handle_Set &dispatch_set)
{
ACE_TRACE ("ACE_Select_Reactor_T::dispatch");
int io_handlers_dispatched = 0;
int other_handlers_dispatched = 0;
int signal_occurred = 0;
// The following do/while loop keeps dispatching as long as there
// are still active handles. Note that the only way we should ever
// iterate more than once through this loop is if signals occur
// while we're dispatching other handlers.
do
{
// Note that we keep track of changes to our state. If any of
// the dispatch_*() methods below return -1 it means that the
// <wait_set_> state has changed as the result of an
// <ACE_Event_Handler> being dispatched. This means that we
// need to bail out and rerun the select() loop since our
// existing notion of handles in <dispatch_set> may no longer be
// correct.
//
// In the beginning, our state starts out unchanged. After
// every iteration (i.e., due to signals), our state starts out
// unchanged again.
this->state_changed_ = false;
// Perform the Template Method for dispatching all the handlers.
// First check for interrupts.
if (active_handle_count == -1)
{
// Bail out -- we got here since <select> was interrupted.
if (ACE_Sig_Handler::sig_pending () != 0)
{
ACE_Sig_Handler::sig_pending (0);
// If any HANDLES in the <ready_set_> are activated as a
// result of signals they should be dispatched since
// they may be time critical
active_handle_count = this->any_ready (dispatch_set);
// Record the fact that the Reactor has dispatched a
// handle_signal() method. We need this to return the
// appropriate count below.
signal_occurred = 1;
}
else
return -1;
}
// Handle timers early since they may have higher latency
// constraints than I/O handlers. Ideally, the order of
// dispatching should be a strategy
else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
// State has changed or timer queue has failed, exit loop.
break;
// Check to see if there are no more I/O handles left to
// dispatch AFTER we've handled the timers
else if (active_handle_count == 0)
return io_handlers_dispatched
+ other_handlers_dispatched
+ signal_occurred;
// Next dispatch the notification handlers (if there are any to
// dispatch). These are required to handle multi-threads that
// are trying to update the <Reactor>.
else if (this->dispatch_notification_handlers
(dispatch_set,
active_handle_count,
other_handlers_dispatched) == -1)
// State has changed or a serious failure has occurred, so exit
// loop.
break;
// Finally, dispatch the I/O handlers.
else if (this->dispatch_io_handlers
(dispatch_set,
active_handle_count,
io_handlers_dispatched) == -1)
// State has changed, so exit loop.
break;
// if state changed, we need to re-eval active_handle_count,
// so we will not end with an endless loop
if (this->state_changed_)
{
active_handle_count = this->dispatch_set_.rd_mask_.num_set ()
+ this->dispatch_set_.wr_mask_.num_set ()
+ this->dispatch_set_.ex_mask_.num_set ();
}
}
while (active_handle_count > 0);
return io_handlers_dispatched + other_handlers_dispatched + signal_occurred;
}
/**
 * Dispatch ready I/O handles in the order write, except, read.
 *
 * Each stage runs only if the previous one did not report -1.  Whatever the
 * outcome, @a number_of_active_handles is reduced by the number of handlers
 * dispatched so far before returning.
 *
 * @param dispatch_set                   Handles reported ready.
 * @param number_of_active_handles       In: handles still to dispatch.
 *                                       Out: reduced by the dispatched count.
 * @param number_of_handlers_dispatched  Running count, updated by each stage.
 * @return 0 if all three stages completed, -1 if any stage reported -1.
 */
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_handlers
  (ACE_Select_Reactor_Handle_Set &dispatch_set,
   int &number_of_active_handles,
   int &number_of_handlers_dispatched)
{
  ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_handlers");

  // Short-circuit evaluation keeps the original ordering and stops at
  // the first stage that reports -1.
  //
  // Output events come first to handle the obscure case of
  // piggy-backed data coming along with the final handshake message
  // of a nonblocking connection.
  const bool stage_failed =
    this->dispatch_io_set (number_of_active_handles,
                           number_of_handlers_dispatched,
                           ACE_Event_Handler::WRITE_MASK,
                           dispatch_set.wr_mask_,
                           this->ready_set_.wr_mask_,
                           &ACE_Event_Handler::handle_output) == -1
    || this->dispatch_io_set (number_of_active_handles,
                              number_of_handlers_dispatched,
                              ACE_Event_Handler::EXCEPT_MASK,
                              dispatch_set.ex_mask_,
                              this->ready_set_.ex_mask_,
                              &ACE_Event_Handler::handle_exception) == -1
    || this->dispatch_io_set (number_of_active_handles,
                              number_of_handlers_dispatched,
                              ACE_Event_Handler::READ_MASK,
                              dispatch_set.rd_mask_,
                              this->ready_set_.rd_mask_,
                              &ACE_Event_Handler::handle_input) == -1;

  number_of_active_handles -= number_of_handlers_dispatched;

  return stage_failed ? -1 : 0;
}
/**
 * Invoke @a callback on the handler of every handle in @a dispatch_mask,
 * until the set is exhausted or @a number_of_handlers_dispatched reaches
 * @a number_of_active_handles.
 *
 * @param number_of_active_handles       Upper bound on dispatched handlers.
 * @param number_of_handlers_dispatched  Running count, incremented per handle.
 * @param mask                           Event type being dispatched.
 * @param dispatch_mask                  Handles to iterate over.
 * @param ready_mask                     Passed through to notify_handle().
 * @param callback                       Handler method to invoke.
 * @return Always 0.
 */
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set
  (int number_of_active_handles,
   int &number_of_handlers_dispatched,
   int mask,
   ACE_Handle_Set &dispatch_mask,
   ACE_Handle_Set &ready_mask,
   ACE_EH_PTMF callback)
{
  ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_set");

  ACE_Handle_Set_Iterator handle_iter (dispatch_mask);

  // The iterator is advanced before the dispatch budget is checked,
  // exactly once per loop test.
  for (ACE_HANDLE handle = handle_iter ();
       handle != ACE_INVALID_HANDLE
         && number_of_handlers_dispatched < number_of_active_handles;
       handle = handle_iter ())
    {
      ++number_of_handlers_dispatched;

      ACE_Event_Handler *const target = this->handler_rep_.find (handle);
      this->notify_handle (handle,
                           mask,
                           ready_mask,
                           target,
                           callback);

      // Drop the handle from the dispatch mask so that a restarted
      // iteration does not dispatch it a second time.
      this->clear_dispatch_mask (handle, mask);

      if (this->state_changed_)
        {
          // Restart the scan over the dispatch mask.
          handle_iter.reset_state ();
          this->state_changed_ = false;
        }
    }

  return 0;
}
/**
 * Invoke one handler callback and act on its return value.
 *
 * A negative return removes the handler for @a mask; a positive return puts
 * @a handle into @a ready_mask; zero does nothing further.  When the handler
 * uses reference counting, a reference is held for the duration of the call.
 *
 * @param handle         Handle the event occurred on.
 * @param mask           Event type, used when removing the handler.
 * @param ready_mask     Set that receives the handle on a positive return.
 * @param event_handler  Handler to call; a null pointer is ignored.
 * @param ptmf           Handler method to invoke.
 */
template <class ACE_SELECT_REACTOR_TOKEN> void
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle
  (ACE_HANDLE handle,
   ACE_Reactor_Mask mask,
   ACE_Handle_Set &ready_mask,
   ACE_Event_Handler *event_handler,
   ACE_EH_PTMF ptmf)
{
  ACE_TRACE ("ACE_Select_Reactor_T::notify_handle");

  // A null handler means it has been removed in the meantime.
  if (event_handler == 0)
    return;

  const bool uses_ref_counting =
    event_handler->reference_counting_policy ().value ()
      == ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

  // Hold a reference across the upcall when the policy asks for it.
  if (uses_ref_counting)
    event_handler->add_reference ();

  const int status = (event_handler->*ptmf) (handle);

  if (status > 0)
    ready_mask.set_bit (handle);
  else if (status < 0)
    this->remove_handler_i (handle, mask);

  // Release the reference taken above.
  if (uses_ref_counting)
    event_handler->remove_reference ();
} 最后看看,当我们handle_input返回-1的时候,又干了什么呢?
/**
 * Implementation step of handler removal: unbind @a handle for @a mask from
 * the handler repository.
 *
 * @param handle  Handle to remove.
 * @param mask    Events to stop watching.
 * @return Whatever the repository's unbind() returns.
 */
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i
  (ACE_HANDLE handle,
   ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i");

  const int unbind_status = this->handler_rep_.unbind (handle, mask);
  return unbind_status;
}
// Remove the binding of <ACE_HANDLE>.
//
// Clears the mask bits for handle from both wait_set_ and suspend_set_.  If
// no bits remain in either set, the table slot is released and max_handlep1_
// is recomputed when the removed entry was the last one.  Unless mask
// contains DONT_CALL, the handler's handle_close() is invoked.
//
// handle  Handle to unbind.
// mask    Events to remove; may include ACE_Event_Handler::DONT_CALL.
//
// Returns 0 on success, -1 if no handler is registered for handle.
int
ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
ACE_Reactor_Mask mask)
{
ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
// find() reports the table slot of the handler through this variable.
size_t slot = 0;
ACE_Event_Handler *event_handler = this->find (handle, &slot);
if (event_handler == 0)
return -1;
// Clear out the <mask> bits in the Select_Reactor's wait_set.
this->select_reactor_.bit_ops (handle,
mask,
this->select_reactor_.wait_set_,
ACE_Reactor::CLR_MASK);
// And suspend_set.
this->select_reactor_.bit_ops (handle,
mask,
this->select_reactor_.suspend_set_,
ACE_Reactor::CLR_MASK);
// Note the fact that we've changed the state of the <wait_set_>,
// which is used by the dispatching loop to determine whether it can
// keep going or if it needs to reconsult select().
// this->select_reactor_.state_changed_ = 1;
// If there are no longer any outstanding events on this <handle>
// then we can totally shut down the Event_Handler.
int has_any_wait_mask =
(this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
|| this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
|| this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
int has_any_suspend_mask =
(this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
|| this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
|| this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
int complete_removal = 0;
if (!has_any_wait_mask && !has_any_suspend_mask)
{
// The handle has been completely removed.
complete_removal = 1;
ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
#if defined (ACE_WIN32)
ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
if (this->max_handlep1_ == (int) slot + 1)
{
// We've deleted the last entry (i.e., i + 1 == the current
// size of the array), so we need to figure out the last
// valid place in the array that we should consider in
// subsequent searches.
int i;
for (i = this->max_handlep1_ - 1;
i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
--i)
continue;
this->max_handlep1_ = i + 1;
}
#else
if (this->max_handlep1_ == handle + 1)
{
// We've deleted the last entry, so we need to figure out
// the last valid place in the array that is worth looking
// at.
ACE_HANDLE wait_rd_max =
this->select_reactor_.wait_set_.rd_mask_.max_set ();
ACE_HANDLE wait_wr_max =
this->select_reactor_.wait_set_.wr_mask_.max_set ();
ACE_HANDLE wait_ex_max =
this->select_reactor_.wait_set_.ex_mask_.max_set ();
ACE_HANDLE suspend_rd_max =
this->select_reactor_.suspend_set_.rd_mask_.max_set ();
ACE_HANDLE suspend_wr_max =
this->select_reactor_.suspend_set_.wr_mask_.max_set ();
ACE_HANDLE suspend_ex_max =
this->select_reactor_.suspend_set_.ex_mask_.max_set ();
// Compute the maximum of six values.
this->max_handlep1_ = wait_rd_max;
if (this->max_handlep1_ < wait_wr_max)
this->max_handlep1_ = wait_wr_max;
if (this->max_handlep1_ < wait_ex_max)
this->max_handlep1_ = wait_ex_max;
if (this->max_handlep1_ < suspend_rd_max)
this->max_handlep1_ = suspend_rd_max;
if (this->max_handlep1_ < suspend_wr_max)
this->max_handlep1_ = suspend_wr_max;
if (this->max_handlep1_ < suspend_ex_max)
this->max_handlep1_ = suspend_ex_max;
// Convert "highest handle" into "highest handle + 1".
++this->max_handlep1_;
}
#endif /* ACE_WIN32 */
}
// The policy is read before handle_close() is called.
// NOTE(review): presumably because handle_close() may invalidate state that
// the later remove_reference() decision depends on -- confirm.
int requires_reference_counting =
event_handler->reference_counting_policy ().value () ==
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
// Close down the <Event_Handler> unless we've been instructed not
// to.
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
event_handler->handle_close (handle, mask);
// Call remove_reference() if the removal is complete and reference
// counting is needed.
if (complete_removal &&
requires_reference_counting)
{
event_handler->remove_reference ();
}
return 0;
}
/**
 * Remove @a handle from the current dispatch_set_ for the event types in
 * @a mask, and flag the reactor state as changed.
 *
 * READ_MASK or ACCEPT_MASK clears the read set, WRITE_MASK the write set and
 * EXCEPT_MASK the except set.  state_changed_ is set to true in every case,
 * which makes the dispatch_io_set() loop restart its scan.
 *
 * @param handle  Handle to drop from the dispatch set.
 * @param mask    Event types to drop it for.
 */
void
ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
                                              ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");

  // The wait/suspend/ready sets are maintained by the register/remove
  // and bind/unbind paths, so the only set touched here is the
  // dispatch set.  Editing it in place lets the dispatch_io_set loop
  // carry on without rescanning the wait set and calling select()
  // again, which is *very* expensive.  That is why dispatch_set_ is a
  // member of ACE_Select_Reactor_Impl.
  //
  // Only clr_bit is needed: the aim is to prevent any further
  // dispatching to handles that have been removed.  This effectively
  // invalidates the iterator used in dispatch_io_set, which is built
  // from the original dispatch set.

  const bool drop_from_read =
    ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
    || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK);
  const bool drop_from_write =
    ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK);
  const bool drop_from_except =
    ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK);

  if (drop_from_read)
    this->dispatch_set_.rd_mask_.clr_bit (handle);

  if (drop_from_write)
    this->dispatch_set_.wr_mask_.clr_bit (handle);

  if (drop_from_except)
    this->dispatch_set_.ex_mask_.clr_bit (handle);

  // Make the dispatch_io_set iterator restart and rescan the
  // dispatch set.
  this->state_changed_ = true;
}