posts - 58,  comments - 75,  trackbacks - 0

1
// Public entry point taking the wait time by reference.  Forwards to
// event_handling() with the address of <how_long> and with the
// alertable flag set to FALSE.  The return value is whatever
// event_handling() returns.
int ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long)
{
  ACE_Time_Value *const max_wait_time = &how_long;
  return this->event_handling (max_wait_time, FALSE);
}

2
// Waits for and dispatches all events.  Returns -1 on error, 0 if
// max_wait_time expired, or the number of events that were dispatched.
int ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
                                      int alertable)
{
  ACE_TRACE ("ACE_WFMO_Reactor::event_handling");

  // Make sure we are not closed
  if (!this->open_for_business_ || this->deactivated_)
      return -1;

  // Stash the current time -- the destructor of this object will
  // automatically compute how much time elapsed since this method was
  // called.
  ACE_Countdown_Time countdown (max_wait_time);

  int result;
 
  do
  {
      // Check to see if it is ok to enter ::WaitForMultipleObjects
      // This will acquire <this->lock_> on success On failure, the
      // lock will not be acquired
     
      result = this->ok_to_wait (max_wait_time, alertable);
      if (result != 1)
        return result;

      // Increment the number of active threads
      ++this->active_threads_;

      // Release the <lock_>
      this->lock_.release ();

      // Update the countdown to reflect time waiting to play with the
      // mut and event.
     
      countdown.update ();

      // Calculate timeout
      int timeout = this->calculate_timeout (max_wait_time);

      // Wait for event to happen
      DWORD wait_status = this->wait_for_multiple_events (timeout,
                                                          alertable);

      // Upcall
      result = this->safe_dispatch (wait_status);
      if (0 == result)
      {
          // wait_for_multiple_events timed out without dispatching
          // anything.  Because of rounding and conversion errors and
          // such, it could be that the wait loop timed out, but
          // the timer queue said it wasn't quite ready to expire a
          // timer. In this case, max_wait_time won't have quite been
          // reduced to 0, and we need to go around again. If max_wait_time
          // is all the way to 0, just return, as the entire time the
          // caller wanted to wait has been used up.
          countdown.update ();     // Reflect time waiting for events
         
          if (0 == max_wait_time || max_wait_time->usec () == 0)
            break;
      }
  }while (result == 0);

  return result;
}

3
// Wraps dispatch() in an ACE_SEH_TRY / ACE_SEH_FINALLY pair so that
// update_state() is called from the "finally" section after the
// dispatch attempt.  Returns the value produced by dispatch(), or -1
// if that value was never assigned.
// NOTE(review): presumably the finally section also runs when
// dispatch() raises a structured exception -- confirm against the
// ACE_SEH_* macro definitions for the target platform.
int ACE_WFMO_Reactor::safe_dispatch (DWORD wait_status)
{
  // Remains -1 unless dispatch() completes and overwrites it.
  int dispatch_result = -1;

  ACE_SEH_TRY
    {
      dispatch_result = this->dispatch (wait_status);
    }
  ACE_SEH_FINALLY
    {
      this->update_state ();
    }

  return dispatch_result;
}

4
// Expires timers and then acts on the outcome of the wait:
//   - WAIT_FAILED:  records the last error in errno and returns -1.
//   - WAIT_TIMEOUT: sets errno to ETIME and returns the number of
//                   timers expired.
//   - otherwise:    returns the number of timers expired plus the
//                   result of dispatch_handles().
int ACE_WFMO_Reactor::dispatch (DWORD wait_status)
{
  // Timers are expired first, whatever the wait outcome was.
  const int timers_expired = this->expire_timers ();

  // Failure.
  if (wait_status == WAIT_FAILED)
    {
      ACE_OS::set_errno_to_last_error ();
      return -1;
    }

  // Timeout.
  if (wait_status == WAIT_TIMEOUT)
    {
      errno = ETIME;
      return timers_expired;
    }

  // Dispatch.  Abandoned mutexes are left for dispatch_handles() to
  // worry about.
  return timers_expired + this->dispatch_handles (wait_status);
}

5
// Dispatches any active handles from <handles_[slot]> to
// <handles_[max_handlep1_]>, polling through our handle set looking
// for active handles.

int ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
{
  // dispatch_slot is the absolute slot.  Only += is used to
  // increment it.
  DWORD dispatch_slot = 0;

  // Cache this value, this is the absolute value.
  DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();

  // nCount starts off at <max_handlep1>, this is a transient count of
  // handles last waited on.
  DWORD nCount = max_handlep1;

  for (int number_of_handlers_dispatched = 1;;++number_of_handlers_dispatched)
  {
      const bool ok = (wait_status >= WAIT_OBJECT_0 && wait_status <= (WAIT_OBJECT_0 + nCount));

      if (ok)
        dispatch_slot += wait_status - WAIT_OBJECT_0;
      else
        // Otherwise, a handle was abandoned.
        dispatch_slot += wait_status - WAIT_ABANDONED_0;

      // Dispatch handler
      if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
        return -1;

      // Increment slot
      ++dispatch_slot;

      // We're done.
      if (dispatch_slot >= max_handlep1)
        return number_of_handlers_dispatched;

      // Readjust nCount
      nCount = max_handlep1 - dispatch_slot;

      // Check the remaining handles
      wait_status = this->poll_remaining_handles (dispatch_slot);
      switch (wait_status)
      {
        case WAIT_FAILED: // Failure.
          ACE_OS::set_errno_to_last_error ();
          /* FALLTHRU */
        case WAIT_TIMEOUT:
          // There are no more handles ready, we can return.
          return number_of_handlers_dispatched;
      }
  }
}

6
int ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
                                        DWORD max_handlep1)
{
  // Check if there are window messages that need to be dispatched
  if (slot == max_handlep1)
    return this->dispatch_window_messages ();

  // Dispatch the handler if it has not been scheduled for deletion.
  // Note that this is a very week test if there are multiple threads
  // dispatching this slot as no locks are held here. Generally, you
  // do not want to do something like deleting the this pointer in
  // handle_close() if you have registered multiple times and there is
  // more than one thread in WFMO_Reactor->handle_events().
  else if (!this->handler_rep_.scheduled_for_deletion (slot))
  {
      ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);

      if (this->handler_rep_.current_info ()[slot].io_entry_)
        return this->complex_dispatch_handler (slot,
                                               event_handle);
      else
        return this->simple_dispatch_handler (slot,
                                              event_handle);
  }
  else
    // The handle was scheduled for deletion, so we will skip it.
    return 0;
}

7
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
                                                ACE_HANDLE event_handle)
{
  // This dispatch is used for I/O entires.

  ACE_WFMO_Reactor_Handler_Repository::Current_Info &current_info =
    this->handler_rep_.current_info ()[slot];

  WSANETWORKEVENTS events;
  ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
  if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
                              event_handle,
                              &events) == SOCKET_ERROR)
    problems = ACE_Event_Handler::ALL_EVENTS_MASK;
  else
  {
      // Prepare for upcalls. Clear the bits from <events> representing
      // events the handler is not interested in. If there are any left,
      // do the upcall(s). upcall will replace events.lNetworkEvents
      // with bits representing any functions that requested a repeat
      // callback before checking handles again. In this case, continue
      // to call back unless the handler is unregistered as a result of
      // one of the upcalls. The way this is written, the upcalls will
      // keep being done even if one or more upcalls reported problems.
      // In practice this may turn out not so good, but let's see. If any
      // problems, please notify Steve Huston <shuston@riverace.com>
      // before or after you change this code.
      events.lNetworkEvents &= current_info.network_events_;
      while (events.lNetworkEvents != 0)
      {
          ACE_Event_Handler *event_handler = current_info.event_handler_;

          int reference_counting_required =
            event_handler->reference_counting_policy ().value () ==
            ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

          // Call add_reference() if needed.
          if (reference_counting_required)
          {
              event_handler->add_reference ();
          }

          // Upcall
          problems |= this->upcall (current_info.event_handler_,
                                    current_info.io_handle_,
                                    events);

          // Call remove_reference() if needed.
          if (reference_counting_required)
          {
              event_handler->remove_reference ();
          }

          if (this->handler_rep_.scheduled_for_deletion (slot))
            break;
      }
  }

  if (problems != ACE_Event_Handler::NULL_MASK
      && !this->handler_rep_.scheduled_for_deletion (slot)  )
    this->handler_rep_.unbind (event_handle, problems);

  return 0;
}

8
ACE_Reactor_Mask ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
                                           ACE_HANDLE io_handle,
                                           WSANETWORKEVENTS &events)
{
  // This method figures out what exactly has happened to the socket
  // and then calls appropriate methods.
  ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;

  // Go through the events and do the indicated upcalls. If the handler
  // doesn't want to be called back, clear the bit for that event.
  // At the end, set the bits back to <events> to request a repeat call.

  long actual_events = events.lNetworkEvents;
  int action;

  if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
  {
      action = event_handler->handle_output (io_handle);
      if (action <= 0)
      {
          ACE_CLR_BITS (actual_events, FD_WRITE);
          if (action == -1)
            ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
      }
  }

  if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
  {
      if (events.iErrorCode[FD_CONNECT_BIT] == 0)
      {
          // Successful connect
          action = event_handler->handle_output (io_handle);
          if (action <= 0)
          {
              ACE_CLR_BITS (actual_events, FD_CONNECT);
              if (action == -1)
                ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
          }
      }
      // Unsuccessful connect
      else
      {
          action = event_handler->handle_input (io_handle);
          if (action <= 0)
          {
              ACE_CLR_BITS (actual_events, FD_CONNECT);
              if (action == -1)
                ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
          }
      }
  }

  if (ACE_BIT_ENABLED (actual_events, FD_OOB))
  {
      action = event_handler->handle_exception (io_handle);
      if (action <= 0)
      {
          ACE_CLR_BITS (actual_events, FD_OOB);
          if (action == -1)
            ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
      }
  }

  if (ACE_BIT_ENABLED (actual_events, FD_READ))
  {
      action = event_handler->handle_input (io_handle);
      if (action <= 0)
      {
          ACE_CLR_BITS (actual_events, FD_READ);
          if (action == -1)
            ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
      }
  }

  if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
      && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
  {
      action = event_handler->handle_input (io_handle);
      if (action <= 0)
      {
          ACE_CLR_BITS (actual_events, FD_CLOSE);
          if (action == -1)
            ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
      }
  }

  if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
  {
      action = event_handler->handle_input (io_handle);
      if (action <= 0)
      {
          ACE_CLR_BITS (actual_events, FD_ACCEPT);
          if (action == -1)
            ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
      }
  }

  if (ACE_BIT_ENABLED (actual_events, FD_QOS))
  {
      action = event_handler->handle_qos (io_handle);
      if (action <= 0)
      {
          ACE_CLR_BITS (actual_events, FD_QOS);
          if (action == -1)
            ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
      }
  }

  if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
  {
      action = event_handler->handle_group_qos (io_handle);
      if (action <= 0)
      {
          ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
          if (action == -1)
            ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
      }
  }

  events.lNetworkEvents = actual_events;
  return problems;
}

posted on 2007-02-22 10:54 walkspeed 阅读(1574) 评论(0)  编辑 收藏 引用 所属分类: ACE Frameworks

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理



<2007年5月>
29 30  1  2  3  4  5
 6  7  8  9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31  1  2
 3  4  5  6  7  8  9

常用链接

留言簿(4)

随笔分类(64)

随笔档案(58)

文章分类(3)

文章档案(3)

相册

收藏夹(9)

C++零碎

好友

搜索

  •  

积分与排名

  • 积分 - 159658
  • 排名 - 163

最新评论

阅读排行榜

评论排行榜