1
// Runs the event loop with a caller-supplied time limit.  The address
// of <how_long> is forwarded to event_handling() with the alertable
// flag set to FALSE, and event_handling()'s result is returned as is.
int ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long)
{
  ACE_Time_Value *const max_wait_time = &how_long;
  return this->event_handling (max_wait_time, FALSE);
}
2
// Waits for and dispatches all events.
//
// <max_wait_time> is the time budget for the call; a null pointer is
// accepted.  The value it points to is kept up to date by an
// ACE_Countdown_Time while this method runs.  <alertable> is passed
// through to ok_to_wait() and wait_for_multiple_events().
//
// Returns -1 if the reactor is not open for business or has been
// deactivated, whatever ok_to_wait() returned if that was not 1,
// and otherwise the result of safe_dispatch(): 0 if nothing was
// dispatched before the time ran out, or the number of events that
// were dispatched.
int ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
                                      int alertable)
{
  ACE_TRACE ("ACE_WFMO_Reactor::event_handling");

  // Make sure we are not closed.
  if (!this->open_for_business_ || this->deactivated_)
    return -1;

  // Stash the current time -- the destructor of this object will
  // automatically compute how much time elapsed since this method was
  // called.
  ACE_Countdown_Time countdown (max_wait_time);

  int result;
  do
    {
      // Check to see if it is ok to enter ::WaitForMultipleObjects.
      // This will acquire <this->lock_> on success.  On failure, the
      // lock will not be acquired.
      result = this->ok_to_wait (max_wait_time, alertable);
      if (result != 1)
        return result;

      // Increment the number of active threads while the lock is held.
      ++this->active_threads_;

      // Release the <lock_>.
      this->lock_.release ();

      // Update the countdown to reflect the time spent waiting to get
      // past ok_to_wait().
      countdown.update ();

      // Calculate timeout.
      int timeout = this->calculate_timeout (max_wait_time);

      // Wait for an event to happen.
      DWORD wait_status = this->wait_for_multiple_events (timeout,
                                                          alertable);

      // Upcall.
      result = this->safe_dispatch (wait_status);
      if (0 == result)
        {
          // wait_for_multiple_events timed out without dispatching
          // anything.  Because of rounding and conversion errors and
          // such, it could be that the wait loop timed out, but
          // the timer queue said it wasn't quite ready to expire a
          // timer.  In this case, max_wait_time won't have quite been
          // reduced to 0, and we need to go around again.  If
          // max_wait_time is all the way to 0, just return, as the
          // entire time the caller wanted to wait has been used up.
          countdown.update ();     // Reflect time waiting for events.

          // Compare the whole value with zero.  Looking at usec ()
          // alone would also match a remaining time of exactly N whole
          // seconds and end the wait early.
          if (0 == max_wait_time
              || *max_wait_time == ACE_Time_Value::zero)
            break;
        }
    }
  while (result == 0);

  return result;
}
3
// Runs dispatch() for <wait_status> inside an ACE_SEH_TRY block and
// calls update_state() from the matching ACE_SEH_FINALLY block.
// Returns what dispatch() returned; the value stays at -1 if
// dispatch() never got to assign it.
int ACE_WFMO_Reactor::safe_dispatch (DWORD wait_status)
{
  int dispatch_status = -1;

  ACE_SEH_TRY
    {
      dispatch_status = this->dispatch (wait_status);
    }
  ACE_SEH_FINALLY
    {
      this->update_state ();
    }

  return dispatch_status;
}
4
// Expires timers and then acts on <wait_status>.
//
// Timers are expired first, whatever <wait_status> is.  Returns -1 on
// WAIT_FAILED (errno is set from the last error), the number of timer
// handlers dispatched on WAIT_TIMEOUT (errno is set to ETIME), and
// otherwise that number plus the result of dispatch_handles().
int ACE_WFMO_Reactor::dispatch (DWORD wait_status)
{
  // Expire timers.
  const int timers_dispatched = this->expire_timers ();

  // The wait itself failed.
  if (wait_status == WAIT_FAILED)
    {
      ACE_OS::set_errno_to_last_error ();
      return -1;
    }

  // The wait timed out; only timers (if any) were dispatched.
  if (wait_status == WAIT_TIMEOUT)
    {
      errno = ETIME;
      return timers_dispatched;
    }

  // A handle is ready.  Abandoned mutexes are left for
  // dispatch_handles() to sort out.
  return timers_dispatched + this->dispatch_handles (wait_status);
}
5
// Dispatches the handle identified by <wait_status>, then keeps
// polling the handles after it and dispatching each one found ready.
//
// Returns -1 as soon as dispatch_handler() reports -1.  Otherwise
// returns the number of dispatch_handler() calls made, once the last
// slot has been passed or poll_remaining_handles() reports
// WAIT_TIMEOUT or WAIT_FAILED (errno is set from the last error in
// the WAIT_FAILED case).
int ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
{
  // Absolute upper bound on slots, cached once.
  const DWORD handle_limit = this->handler_rep_.max_handlep1 ();

  // Absolute slot being dispatched; it only ever moves forward.
  DWORD current_slot = 0;

  // Number of handles covered by the most recent wait or poll.
  DWORD handles_waited_on = handle_limit;

  int dispatched = 0;
  for (;;)
    {
      ++dispatched;

      // <wait_status> is relative to the first handle of the last
      // wait, so convert it to an absolute slot.  Anything outside
      // the WAIT_OBJECT_0 range is treated as an abandoned handle.
      const bool signaled =
        wait_status >= WAIT_OBJECT_0
        && wait_status <= (WAIT_OBJECT_0 + handles_waited_on);
      if (signaled)
        current_slot += wait_status - WAIT_OBJECT_0;
      else
        current_slot += wait_status - WAIT_ABANDONED_0;

      // Dispatch the handler for that slot.
      if (this->dispatch_handler (current_slot, handle_limit) == -1)
        return -1;

      // Move past the slot just handled; stop if none are left.
      ++current_slot;
      if (current_slot >= handle_limit)
        return dispatched;

      // Only the handles from <current_slot> onward remain.
      handles_waited_on = handle_limit - current_slot;

      // Check the remaining handles.
      wait_status = this->poll_remaining_handles (current_slot);
      if (wait_status == WAIT_FAILED)
        {
          ACE_OS::set_errno_to_last_error ();
          return dispatched;
        }
      if (wait_status == WAIT_TIMEOUT)
        // No more handles are ready.
        return dispatched;
    }
}
6
// Dispatches whatever is registered at <slot>.
//
// When <slot> equals <max_handlep1> the call is routed to
// dispatch_window_messages().  A slot that is scheduled for deletion
// is skipped and 0 is returned.  Otherwise the slot's event handle is
// passed to complex_dispatch_handler() for I/O entries or to
// simple_dispatch_handler() for everything else, and that result is
// returned.
int ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
                                        DWORD max_handlep1)
{
  // Window messages are reported one past the last handle.
  if (slot == max_handlep1)
    return this->dispatch_window_messages ();

  // Skip handlers that are scheduled for deletion.  This is a very
  // weak test when several threads dispatch the same slot, because no
  // locks are held here.  Generally, you do not want to do something
  // like deleting the this pointer in handle_close() if you have
  // registered multiple times and more than one thread is inside
  // WFMO_Reactor->handle_events().
  if (this->handler_rep_.scheduled_for_deletion (slot))
    return 0;

  ACE_HANDLE event_handle = this->handler_rep_.handles ()[slot];

  return this->handler_rep_.current_info ()[slot].io_entry_
    ? this->complex_dispatch_handler (slot, event_handle)
    : this->simple_dispatch_handler (slot, event_handle);
}
7
// Dispatches an I/O entry: asks WinSock which network events fired on
// the entry's I/O handle and makes the matching upcalls.
//
// <slot> indexes the handler repository's current-info array and
// <event_handle> is the event object associated with that slot.  If
// any upcall reports a problem, or the network events cannot be
// enumerated, the handler is unbound for the affected masks unless it
// is already scheduled for deletion.  Always returns 0.
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
                                                ACE_HANDLE event_handle)
{
  // This dispatch is used for I/O entries.
  ACE_WFMO_Reactor_Handler_Repository::Current_Info &current_info =
    this->handler_rep_.current_info ()[slot];

  WSANETWORKEVENTS events;
  ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;

  if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
                              event_handle,
                              &events) == SOCKET_ERROR)
    // Nothing can be learned about the socket; flag every event so
    // the handler gets unbound below.
    problems = ACE_Event_Handler::ALL_EVENTS_MASK;
  else
    {
      // Prepare for upcalls. Clear the bits from <events> representing
      // events the handler is not interested in. If there are any left,
      // do the upcall(s). upcall will replace events.lNetworkEvents
      // with bits representing any functions that requested a repeat
      // callback before checking handles again. In this case, continue
      // to call back unless the handler is unregistered as a result of
      // one of the upcalls. The way this is written, the upcalls will
      // keep being done even if one or more upcalls reported problems.
      // In practice this may turn out not so good, but let's see. If any
      // problems, please notify Steve Huston <shuston@riverace.com>
      // before or after you change this code.
      events.lNetworkEvents &= current_info.network_events_;
      while (events.lNetworkEvents != 0)
        {
          ACE_Event_Handler *event_handler = current_info.event_handler_;

          int reference_counting_required =
            event_handler->reference_counting_policy ().value () ==
            ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

          // Call add_reference() if needed.
          if (reference_counting_required)
            {
              event_handler->add_reference ();
            }

          // Upcall
          problems |= this->upcall (current_info.event_handler_,
                                    current_info.io_handle_,
                                    events);

          // Call remove_reference() if needed.
          if (reference_counting_required)
            {
              event_handler->remove_reference ();
            }

          // Stop repeating once the handler has been scheduled for
          // deletion.
          if (this->handler_rep_.scheduled_for_deletion (slot))
            break;
        }
    }

  if (problems != ACE_Event_Handler::NULL_MASK
      && !this->handler_rep_.scheduled_for_deletion (slot))
    this->handler_rep_.unbind (event_handle, problems);

  return 0;
}
8
ACE_Reactor_Mask ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
ACE_HANDLE io_handle,
WSANETWORKEVENTS &events)
{
// This method figures out what exactly has happened to the socket
// and then calls appropriate methods.
ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
// Go through the events and do the indicated upcalls. If the handler
// doesn't want to be called back, clear the bit for that event.
// At the end, set the bits back to <events> to request a repeat call.
long actual_events = events.lNetworkEvents;
int action;
if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
{
action = event_handler->handle_output (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_WRITE);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
{
if (events.iErrorCode[FD_CONNECT_BIT] == 0)
{
// Successful connect
action = event_handler->handle_output (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_CONNECT);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
}
}
// Unsuccessful connect
else
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_CONNECT);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
}
}
}
if (ACE_BIT_ENABLED (actual_events, FD_OOB))
{
action = event_handler->handle_exception (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_OOB);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_READ))
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_READ);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
&& ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_CLOSE);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
{
action = event_handler->handle_input (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_ACCEPT);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_QOS))
{
action = event_handler->handle_qos (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_QOS);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
}
}
if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
{
action = event_handler->handle_group_qos (io_handle);
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
}
}
events.lNetworkEvents = actual_events;
return problems;
}
posted on 2007-02-22 10:54
walkspeed 阅读(1580)
评论(0) 编辑 收藏 引用 所属分类:
ACE Frameworks