一动不如一静
C++博客
首页
新随笔
联系
聚合
管理
20 Posts :: 0 Stories :: 10 Comments :: 0 Trackbacks
常用链接
我的随笔
我的评论
我参与的随笔
留言簿
(4)
给我留言
查看公开留言
查看私人留言
随笔分类
ACE(10)
C++(9)
Design pattern
life(13)
snmp++
tool
随笔档案
2008年2月 (1)
2007年5月 (1)
2007年3月 (2)
2007年2月 (3)
2006年12月 (2)
2006年10月 (2)
2006年6月 (9)
相册
2007年夜饭
书柜
学习资源
各种软件下载
搜索
最新评论
1. re: 关于ACE_TP_Reactor
评论内容较长,点击标题查看
--aa
2. re: 如何根据crash地址找到对应的行号
查看堆栈不就完了。。。
--等等
3. re: error LNK2019:无法解析的外部符号
不错,学习了
--创意家居
4. re: 如何根据crash地址找到对应的行号
你这个办法不太好用啊,比如在程序里面用了个strcpy出错了,出错的地址
最后就映射到strcpy.asm里面去了,怎么知道是在主程序的什么地方调用了
strcpy阿
--mikey
5. re: 如何根据crash地址找到对应的行号
这个还是有点专业哦,能不能来一个再详细点的哦,放首页让大家一起学习了!
--梦在天涯
阅读排行榜
1. error LNK2019:无法解析的外部符号(26286)
2. 如何用windbg分析dump文件(还原堆栈信息)(4923)
3. window上如何设置才能获取dump文件(3926)
4. 关于ACE_TP_Reactor(3087)
5. TP_Reactor 学习笔记(2248)
评论排行榜
1. error LNK2019:无法解析的外部符号(5)
2. 如何根据crash地址找到对应的行号(3)
3. 关于MPC的问题收集(1)
4. 关于ACE_TP_Reactor(1)
5. 异步I/O(0)
TP_Reactor 学习笔记
// Builds a thread-pool reactor by delegating to the ACE_Select_Reactor
// constructor.  Every caller-supplied argument is forwarded unchanged;
// the two literal zeros fill the base constructor's fifth and sixth
// positions (presumably disable_notify_pipe and notify -- confirm
// against the ACE_Select_Reactor declaration).
ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
                                int restart,
                                ACE_Sig_Handler *sh,
                                ACE_Timer_Queue *tq,
                                int mask_signals,
                                int s_queue)
  : ACE_Select_Reactor (max_number_of_handles,
                        restart,
                        sh,
                        tq,
                        0,
                        0,
                        mask_signals,
                        s_queue)
{
  ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");

  // Invoke supress_notify_renew() with 1 once the base is constructed.
  this->supress_notify_renew (1);
}
// Constructs the reactor and immediately tries to open() it.
//
// size, rs, sh, tq, disable_notify_pipe and notify are handed to open()
// as-is; mask_signals goes to the ACE_Select_Reactor_Impl base and
// s_queue to the token.  A constructor cannot return a status, so a
// failed open() is only reported through ACE_ERROR.
template <class ACE_SELECT_REACTOR_TOKEN>
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ACE_Select_Reactor_T
  (size_t size,
   int rs,
   ACE_Sig_Handler *sh,
   ACE_Timer_Queue *tq,
   int disable_notify_pipe,
   ACE_Reactor_Notify *notify,
   int mask_signals,
   int s_queue)
    : ACE_Select_Reactor_Impl (mask_signals)
    , token_ (*this, s_queue)
    , lock_adapter_ (token_)
    , deactivated_ (0)
{
  ACE_TRACE ("ACE_Select_Reactor_T::ACE_Select_Reactor_T");

  // The two adjacent literals are concatenated by the compiler; the
  // trailing space in the first one keeps "open" and "failed" apart in
  // the logged message.
  if (this->open (size,
                  rs,
                  sh,
                  tq,
                  disable_notify_pipe,
                  notify) == -1)
    ACE_ERROR ((LM_ERROR,
                ACE_LIB_TEXT ("%p\n"),
                ACE_LIB_TEXT ("ACE_Select_Reactor_T::open ")
                ACE_LIB_TEXT ("failed inside ACE_Select_Reactor_T::CTOR")));
}
// Initializes the reactor: records the owner thread and the supplied
// collaborators, allocates default ones for any that were passed as 0,
// then opens the handler repository and the notify handler.
//
// Returns 0 on success and -1 on failure (including when the reactor is
// already initialized).  When a step fails after the collaborators have
// been set up, close() is called before returning.
template <class ACE_SELECT_REACTOR_TOKEN>
int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::open
  (size_t size,
   int restart,
   ACE_Sig_Handler *sh,
   ACE_Timer_Queue *tq,
   int disable_notify_pipe,
   ACE_Reactor_Notify *notify)
{
  ACE_TRACE ("ACE_Select_Reactor_T::open");
  ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1));

  // Can't initialize ourselves more than once.
  if (this->initialized_)
    return -1;

  this->owner_ = ACE_Thread::self ();
  this->restart_ = restart;
  this->signal_handler_ = sh;
  this->timer_queue_ = tq;
  this->notify_handler_ = notify;

  int result = 0;

  // Allows the signal handler to be overridden.
  if (this->signal_handler_ == 0)
    {
      ACE_NEW_RETURN (this->signal_handler_,
                      ACE_Sig_Handler,
                      -1);

      if (this->signal_handler_ == 0)
        result = -1;
      else
        this->delete_signal_handler_ = 1;
    }

  // Allows the timer queue to be overridden.
  if (result != -1 && this->timer_queue_ == 0)
    {
      ACE_NEW_RETURN (this->timer_queue_,
                      ACE_Timer_Heap,
                      -1);

      if (this->timer_queue_ == 0)
        result = -1;
      else
        this->delete_timer_queue_ = 1;
    }

  // Allows the Notify_Handler to be overridden.
  if (result != -1 && this->notify_handler_ == 0)
    {
      ACE_NEW_RETURN (this->notify_handler_,
                      ACE_Select_Reactor_Notify,
                      -1);

      if (this->notify_handler_ == 0)
        result = -1;
      else
        this->delete_notify_handler_ = 1;
    }

  // Only touch the repository and the notify handler when every earlier
  // step succeeded; otherwise notify_handler_ may still be 0 and must
  // not be dereferenced.
  if (result != -1)
    {
      // Open the handler repository.
      if (this->handler_rep_.open (size) == -1)
        result = -1;
      // Open the notify handler.
      else if (this->notify_handler_->open (this,
                                            0,
                                            disable_notify_pipe) == -1)
        result = -1;
    }

  if (result != -1)
    // We're all set to go.
    this->initialized_ = 1;
  else
    // This will close down all the allocated resources properly.
    this->close ();

  return result;
}
// Default constructor: starts with max_notify_iterations_ set to -1.
ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify ()
  : max_notify_iterations_ (-1)
{
}
// Sets up the notification mechanism for the reactor @a r.
//
// When @a disable_notify_pipe is non-zero nothing is opened:
// select_reactor_ is cleared and 0 is returned.  Otherwise the
// notification pipe is opened, its read end is switched to non-blocking
// mode and registered with the reactor for READ_MASK events.  The
// unnamed ACE_Timer_Queue argument is not used.
//
// Returns 0 / the register_handler() result on success, -1 on failure
// (errno is set to EINVAL when @a r is not an ACE_Select_Reactor_Impl).
int
ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
                                 ACE_Timer_Queue *,
                                 int disable_notify_pipe)
{
  ACE_TRACE ("ACE_Select_Reactor_Notify::open");

  if (disable_notify_pipe != 0)
    {
      this->select_reactor_ = 0;
      return 0;
    }

  this->select_reactor_ = dynamic_cast<ACE_Select_Reactor_Impl *> (r);

  if (select_reactor_ == 0)
    {
      errno = EINVAL;
      return -1;
    }

  if (this->notification_pipe_.open () == -1)
    return -1;

#if defined (F_SETFD)
  // Set descriptor flag 1 on both pipe ends (presumably FD_CLOEXEC --
  // confirm).  The fcntl() results are deliberately not checked.
  ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
  ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
#endif /* F_SETFD */

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
  ACE_Notification_Buffer *temp;

  ACE_NEW_RETURN (temp,
                  ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
                  -1);

  // alloc_queue_ records the array itself; on failure the array is
  // released here.
  if (this->alloc_queue_.enqueue_head (temp) == -1)
    {
      delete [] temp;
      return -1;
    }

  // Every element of the array goes onto the free queue.
  for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
    if (free_queue_.enqueue_head (temp + i) == -1)
      return -1;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

  // There seems to be a Win32 bug with this...  Set this into
  // non-blocking mode.
  if (ACE::set_flags (this->notification_pipe_.read_handle (),
                      ACE_NONBLOCK) == -1)
    return -1;

  // Register this object as the handler for the pipe's read end.
  return this->select_reactor_->register_handler
    (this->notification_pipe_.read_handle (),
     this,
     ACE_Event_Handler::READ_MASK);
}
int
ACE_Pipe::open (
int
buffer_size)
{
ACE_TRACE (
"
ACE_Pipe::open
"
);
#if
defined (ACE_LACKS_SOCKETPAIR) || defined (__Lynx__)
ACE_INET_Addr my_addr;
ACE_SOCK_Acceptor acceptor;
ACE_SOCK_Connector connector;
ACE_SOCK_Stream reader;
ACE_SOCK_Stream writer;
int
result
=
0
;
#
if
defined (ACE_WIN32)
ACE_INET_Addr local_any (static_cast
<
u_short
>
(
0
), ACE_LOCALHOST);
#
else
ACE_Addr local_any
=
ACE_Addr::sap_any;
# endif
/**/
/*
ACE_WIN32
*/
//
Bind listener to any port and then find out what the port was.
if
(acceptor.open (local_any)
==
-
1
||
acceptor.get_local_addr (my_addr)
==
-
1
)
result
=
-
1
;
else
{
ACE_INET_Addr sv_addr (my_addr.get_port_number (),
ACE_LOCALHOST);
//
Establish a connection within the same process.
if
(connector.connect (writer, sv_addr)
==
-
1
)
result
=
-
1
;
else
if
(acceptor.accept (reader)
==
-
1
)
{
writer.close ();
result
=
-
1
;
}
}
//
Close down the acceptor endpoint since we don't need it anymore.
acceptor.close ();
if
(result
==
-
1
)
return
-
1
;
this
->
handles_[
0
]
=
reader.get_handle ();
this
->
handles_[
1
]
=
writer.get_handle ();
#
if
!
defined (ACE_LACKS_TCP_NODELAY)
int
one
=
1
;
//
Make sure that the TCP stack doesn't try to buffer small writes.
//
Since this communication is purely local to the host it doesn't
//
affect network performance.
if
(writer.set_option (ACE_IPPROTO_TCP,
TCP_NODELAY,
&
one,
sizeof
one)
==
-
1
)
{
this
->
close ();
return
-
1
;
}
# endif
/**/
/*
! ACE_LACKS_TCP_NODELAY
*/
#
if
defined (ACE_LACKS_SOCKET_BUFSIZ)
ACE_UNUSED_ARG (buffer_size);
#
else
/**/
/*
! ACE_LACKS_SOCKET_BUFSIZ
*/
if
(reader.set_option (SOL_SOCKET,
SO_RCVBUF,
reinterpret_cast
<
void
*>
(
&
buffer_size),
sizeof
(buffer_size))
==
-
1
&&
errno
!=
ENOTSUP)
{
this
->
close ();
return
-
1
;
}
else
if
(writer.set_option (SOL_SOCKET,
SO_SNDBUF,
reinterpret_cast
<
void
*>
(
&
buffer_size),
sizeof
(buffer_size))
==
-
1
&&
errno
!=
ENOTSUP)
{
this
->
close ();
return
-
1
;
}
# endif
/**/
/*
! ACE_LACKS_SOCKET_BUFSIZ
*/
#elif
defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
ACE_UNUSED_ARG (buffer_size);
if
(ACE_OS::pipe (
this
->
handles_)
==
-
1
)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT (
"
%p\n
"
),
ACE_LIB_TEXT (
"
pipe
"
)),
-
1
);
#if
!defined(__QNX__)
int
arg
=
RMSGN;
//
Enable "msg no discard" mode, which ensures that record
//
boundaries are maintained when messages are sent and received.
if
(ACE_OS::ioctl (
this
->
handles_[
0
],
I_SRDOPT,
(
void
*
) arg)
==
-
1
||
ACE_OS::ioctl (
this
->
handles_[
1
],
I_SRDOPT,
(
void
*
) arg)
==
-
1
)
{
this
->
close ();
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT (
"
%p\n
"
),
ACE_LIB_TEXT (
"
ioctl
"
)),
-
1
);
}
#endif
/* __QNX__ */
#else
/* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
if
(ACE_OS::socketpair (AF_UNIX,
SOCK_STREAM,
0
,
this
->
handles_)
==
-
1
)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT (
"
%p\n
"
),
ACE_LIB_TEXT (
"
socketpair
"
)),
-
1
);
#
if
defined (ACE_LACKS_SOCKET_BUFSIZ)
ACE_UNUSED_ARG (buffer_size);
#
else
/**/
/*
! ACE_LACKS_SOCKET_BUFSIZ
*/
if
(ACE_OS::setsockopt (
this
->
handles_[
0
],
SOL_SOCKET,
SO_RCVBUF,
reinterpret_cast
<
const
char
*>
(
&
buffer_size),
sizeof
(buffer_size))
==
-
1
&&
errno
!=
ENOTSUP)
{
this
->
close ();
return
-
1
;
}
if
(ACE_OS::setsockopt (
this
->
handles_[
1
],
SOL_SOCKET,
SO_SNDBUF,
reinterpret_cast
<
const
char
*>
(
&
buffer_size),
sizeof
(buffer_size))
==
-
1
&&
errno
!=
ENOTSUP)
{
this
->
close ();
return
-
1
;
}
# endif
/**/
/*
! ACE_LACKS_SOCKET_BUFSIZ
*/
#endif
/* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
//
Point both the read and write HANDLES to the appropriate socket
//
HANDLEs.
return
0
;
}
// Thin wrapper that applies a socket option to this object's handle by
// forwarding to ACE_OS::setsockopt().  Returns whatever that call
// returns.
ACE_INLINE int
ACE_SOCK::set_option (int level,
                      int option,
                      void *optval,
                      int optlen) const
{
  ACE_TRACE ("ACE_SOCK::set_option");

  // ACE_OS::setsockopt() takes the option value as a character pointer.
  char *const raw_value = static_cast<char *> (optval);
  return ACE_OS::setsockopt (this->get_handle (),
                             level,
                             option,
                             raw_value,
                             optlen);
}
// Portable wrapper around ::setsockopt().
//
// Returns the result of the underlying call (-1 on failure).  On
// failure errno may be rewritten to ENOTSUP, see the note at the end of
// the function.
ACE_INLINE int
ACE_OS::setsockopt (ACE_HANDLE handle,
                    int level,
                    int optname,
                    const char *optval,
                    int optlen)
{
  ACE_OS_TRACE ("ACE_OS::setsockopt");

#if defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0) && defined(SO_REUSEPORT)
  // To work around an inconsistency with Microsofts implementation of
  // sockets, we will check for SO_REUSEADDR, and ignore it. Winsock
  // always behaves as if SO_REUSEADDR=1. Some implementations have
  // the same behaviour as Winsock, but use a new name for
  // it. SO_REUSEPORT.  If you want the normal behaviour for
  // SO_REUSEADDR=0, then NT 4 sp4 and later supports
  // SO_EXCLUSIVEADDRUSE. This also requires using an updated Platform
  // SDK so it was decided to ignore the option for now. (Especially
  // since Windows always sets SO_REUSEADDR=1, which we can mimic by doing
  // nothing.)
  if (level == SOL_SOCKET)
    {
      if (optname == SO_REUSEADDR)
        {
          return 0; // Not supported by Winsock
        }
      if (optname == SO_REUSEPORT)
        {
          optname = SO_REUSEADDR;
        }
    }
#endif /*ACE_HAS_WINSOCK2*/

  int result;
  ACE_SOCKCALL (::setsockopt ((ACE_SOCKET) handle,
                              level,
                              optname,
                              (ACE_SOCKOPT_TYPE1) optval,
                              optlen),
                int,
                -1,
                result);

  // NOTE(review): when WSAEOPNOTSUPP is not defined, *every* failure has
  // its errno replaced by ENOTSUP, hiding the real cause.  Callers in
  // this file (ACE_Pipe::open) test "errno != ENOTSUP", so the behaviour
  // is kept as-is; confirm whether it is intended before changing it.
#if defined (WSAEOPNOTSUPP)
  if (result == -1 && errno == WSAEOPNOTSUPP)
#else
  if (result == -1)
#endif /* WSAEOPNOTSUPP */
    errno = ENOTSUP;

  return result;
}
注册的部分:
// Registers @a eh for the events in @a mask by delegating directly to
// ACE_Select_Reactor::register_handler(); its status is returned
// unchanged.
int
ACE_TP_Reactor::register_handler (ACE_Event_Handler *eh,
                                  ACE_Reactor_Mask mask)
{
  const int status = ACE_Select_Reactor::register_handler (eh, mask);
  return status;
}
// Entry point called by each thread of the pool.  The calling thread
// tries to obtain the reactor token; if it becomes the owner it goes on
// to dispatch_i().
//
// Returns the acquire_read_token() result when the token was not
// obtained, -1 when the reactor has been deactivated, otherwise the
// result of dispatch_i().
int
ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
{
  ACE_TRACE ("ACE_TP_Reactor::handle_events");

  // Stash the current time -- the destructor of this object will
  // automatically compute how much time elapsed since this method was
  // called.
  ACE_Countdown_Time countdown (max_wait_time);

  //
  // The order of these events is very subtle, modify with care.
  //

  // Instantiate the token guard which will try grabbing the token for
  // this thread.
  ACE_TP_Token_Guard guard (this->token_);

  // This is where the token lock is acquired.
  const int result = guard.acquire_read_token (max_wait_time);

  // If the guard is NOT the owner just return the retval
  if (!guard.is_owner ())
    return result;

  // After getting the lock just check for deactivation..
  if (this->deactivated_)
    return -1;

  // Update the countdown to reflect time waiting for the token.
  countdown.update ();

  return this->dispatch_i (max_wait_time, guard);
}
// Waits for events and dispatches at most one category of them, in the
// order: signals (when the wait reports -1), timers, notifications,
// socket I/O.
//
// @a guard is the token guard held by the caller; it is passed on to
// the individual handle_*() helpers.  Returns the result of the first
// helper that reports work (> 0), the handle_signals() /
// handle_socket_events() result where those are reached, or 0 when
// nothing was dispatched.
int
ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
                            ACE_TP_Token_Guard &guard)
{
  int event_count = this->get_event_for_dispatching (max_wait_time);

  int result = 0;

  // Note: We are passing the <event_count> around, to have record of
  // how many events still need processing. May be this could be
  // useful in future.

  // Dispatch signals
  if (event_count == -1)
    {
      // Looks like we dont do any upcalls in dispatch signals. If at
      // a later point of time, we decide to handle signals we have to
      // release the lock before we make any upcalls.. What is here
      // now is not the right thing...
      //
      // @@ We need to do better..
      return this->handle_signals (event_count, guard);
    }

  // If there are no signals and if we had received a proper
  // event_count then first look at dispatching timeouts. We need to
  // handle timers early since they may have higher latency
  // constraints than I/O handlers.  Ideally, the order of dispatching
  // should be a strategy...

  // NOTE: The event count does not have the number of timers that
  // needs dispatching. But we are still passing this along. We dont
  // need to do that. In the future we *may* have the timers also
  // returned through the <event_count>. Just passing that along for
  // that day.
  result = this->handle_timer_events (event_count, guard);

  if (result > 0)
    return result;

  // Else just go ahead fall through for further handling.

  if (event_count > 0)
    {
      // Next dispatch the notification handlers (if there are any to
      // dispatch).  These are required to handle multiple-threads
      // that are trying to update the <Reactor>.
      result = this->handle_notify_events (event_count, guard);

      if (result > 0)
        return result;

      // Else just fall through for further handling
    }

  // event_count is re-tested because the helpers above receive it by
  // reference.
  if (event_count > 0)
    {
      // Handle socket events
      return this->handle_socket_events (event_count, guard);
    }

  return 0;
}
// Prepares ready_set_ and then waits for events on it.
//
// Returns the value of wait_for_multiple_events(), which is called with
// ready_set_ as the dispatch set.
int
ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
{
  // If the reactor handler state has changed, clear any remembered
  // ready bits and re-scan from the master wait_set.
  if (this->state_changed_)
    {
      this->ready_set_.rd_mask_.reset ();
      this->ready_set_.wr_mask_.reset ();
      this->ready_set_.ex_mask_.reset ();

      this->state_changed_ = false;
    }
  else
    {
      // This is a hack... somewhere, under certain conditions (which
      // I don't understand...) the mask will have all of its bits clear,
      // yet have a size_ > 0. This is an attempt to remedy the affect,
      // without knowing why it happens.
      this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
      this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
      this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
    }

  return this->wait_for_multiple_events (this->ready_set_,
                                         max_wait_time);
}
// Determines which handles are ready for dispatching.
//
// If @a dispatch_set already has ready handles (per any_ready()), their
// count is returned immediately.  Otherwise wait_set_ is copied into
// @a dispatch_set and ACE_OS::select() is called, repeating while it
// fails with -1 and handle_error() returns a positive value.
//
// Returns the number of active handles, 0 when none became active, or
// -1 on error (in which case @a dispatch_set is cleared).
template <class ACE_SELECT_REACTOR_TOKEN>
int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events
  (ACE_Select_Reactor_Handle_Set &dispatch_set,
   ACE_Time_Value *max_wait_time)
{
  ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events");

  u_long width = 0;
  ACE_Time_Value timer_buf (0);
  ACE_Time_Value *this_timeout;

  int number_of_active_handles = this->any_ready (dispatch_set);

  // If there are any bits enabled in the <ready_set_> then we'll
  // handle those first, otherwise we'll block in <select>.
  if (number_of_active_handles == 0)
    {
      do
        {
          this_timeout =
            this->timer_queue_->calculate_timeout (max_wait_time,
                                                   &timer_buf);
          width = (u_long) this->handler_rep_.max_handlep1 ();

          // Seed the dispatch set from the master wait set
          // (wait_set_ ----> dispatch_set).
          dispatch_set.rd_mask_ = this->wait_set_.rd_mask_;
          dispatch_set.wr_mask_ = this->wait_set_.wr_mask_;
          dispatch_set.ex_mask_ = this->wait_set_.ex_mask_;

          number_of_active_handles = ACE_OS::select (int (width),
                                                     dispatch_set.rd_mask_,
                                                     dispatch_set.wr_mask_,
                                                     dispatch_set.ex_mask_,
                                                     this_timeout);
        }
      while (number_of_active_handles == -1 && this->handle_error () > 0);

      if (number_of_active_handles > 0)
        {
#if !defined (ACE_WIN32)
          // Resynchronize the fd_sets so their "max" is set properly.
          dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ());
          dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ());
          dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ());
#endif /* ACE_WIN32 */
        }
      else if (number_of_active_handles == -1)
        {
          // Normally, select() will reset the bits in dispatch_set
          // so that only those file descriptors that are ready will
          // have bits set.  However, when an error occurs, the bit
          // set remains as it was when the select call was first made.
          // Thus, we now have a dispatch_set that has every file
          // descriptor that was originally waited for, which is not
          // correct.  We must clear all the bit sets because we
          // have no idea if any of the file descriptors is ready.
          //
          // NOTE: We dont have a test case to reproduce this
          // problem. But please dont ignore this and remove it off.
          dispatch_set.rd_mask_.reset ();
          dispatch_set.wr_mask_.reset ();
          dispatch_set.ex_mask_.reset ();
        }
    }

  // Return the number of events to dispatch.
  return number_of_active_handles;
}
int
ACE_TP_Reactor::handle_socket_events (
int
&
event_count,
ACE_TP_Token_Guard
&
guard)
{
//
We got the lock, lets handle some I/O events.
ACE_EH_Dispatch_Info dispatch_info;
this
->
get_socket_event_info (dispatch_info);
//
If there is any event handler that is ready to be dispatched, the
//
dispatch information is recorded in dispatch_info.
if
(
!
dispatch_info.dispatch ())
{
return
0
;
}
//
Suspend the handler so that other threads don't start dispatching
//
it.
//
//
NOTE: This check was performed in older versions of the
//
TP_Reactor. Looks like it is a waste..
if
(dispatch_info.event_handler_
!=
this
->
notify_handler_)
this
->
suspend_i (dispatch_info.handle_);
int
resume_flag
=
dispatch_info.event_handler_
->
resume_handler ();
int
reference_counting_required
=
dispatch_info.event_handler_
->
reference_counting_policy ().value ()
==
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
//
Call add_reference() if needed.
if
(reference_counting_required)
{
dispatch_info.event_handler_
->
add_reference ();
}
//
Release the lock. Others threads can start waiting.
//
这里放锁
guard.release_token ();
int
result
=
0
;
//
If there was an event handler ready, dispatch it.
//
Decrement the event left
--
event_count;
//
Dispatched an event
if
(
this
->
dispatch_socket_event (dispatch_info)
==
0
)
++
result;
//
Resume handler if required.
if
(dispatch_info.event_handler_
!=
this
->
notify_handler_
&&
resume_flag
==
ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
this
->
resume_handler (dispatch_info.handle_);
//
Call remove_reference() if needed.
if
(reference_counting_required)
{
dispatch_info.event_handler_
->
remove_reference ();
}
return
result;
}
// Selects a single ready, non-suspended handle from ready_set_ and
// records how to dispatch it in @a event.  The masks are searched in
// the order write, exception, read.
//
// Returns 1 when an event was recorded, 0 when nothing is ready.
int
ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
{
  // Nothing to dispatch yet
  event.reset ();

  // Check for dispatch in write, except, read. Only catch one, but if
  // one is caught, be sure to clear the handle from each mask in case
  // there is more than one mask set for it. This would cause problems
  // if the handler is suspended for dispatching, but its set bit in
  // another part of ready_set_ kept it from being dispatched.
  int found_io = 0;
  ACE_HANDLE handle;

  // @@todo: We can do quite a bit of code reduction here. Let me get
  // it to work before I do this.
  {
    // Reader's question: when does ready_set_ get its values?  It is the
    // set that get_event_for_dispatching() passes to
    // wait_for_multiple_events() as the dispatch set.
    ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);

    while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
      {
        if (this->is_suspended_i (handle))
          continue;

        // Remember this info
        event.set (handle,
                   this->handler_rep_.find (handle),
                   ACE_Event_Handler::WRITE_MASK,
                   &ACE_Event_Handler::handle_output);
        this->clear_handle_read_set (handle);
        found_io = 1;
      }
  }

  if (!found_io)
    {
      ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);

      while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
        {
          if (this->is_suspended_i (handle))
            continue;

          // Remember this info
          event.set (handle,
                     this->handler_rep_.find (handle),
                     ACE_Event_Handler::EXCEPT_MASK,
                     &ACE_Event_Handler::handle_exception);
          this->clear_handle_read_set (handle);
          found_io = 1;
        }
    }

  if (!found_io)
    {
      ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);

      while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
        {
          if (this->is_suspended_i (handle))
            continue;

          // Remember this info
          event.set (handle,
                     this->handler_rep_.find (handle),
                     ACE_Event_Handler::READ_MASK,
                     &ACE_Event_Handler::handle_input);
          this->clear_handle_read_set (handle);
          found_io = 1;
        }
    }

  return found_io;
}
// Performs the upcall described by @a dispatch_info.
//
// The callback is invoked repeatedly for as long as it returns a
// positive value.  A negative value causes the handle to be removed
// from the reactor for the recorded mask.
//
// Returns -1 when no event handler is recorded, the remove_handler()
// result when the callback asked for removal, and 0 otherwise.
int
ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
{
  ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");

  ACE_HANDLE handle = dispatch_info.handle_;
  ACE_Event_Handler *event_handler = dispatch_info.event_handler_;
  ACE_Reactor_Mask mask = dispatch_info.mask_;
  ACE_EH_PTMF callback = dispatch_info.callback_;

  // Check for removed handlers.
  if (event_handler == 0)
    return -1;

  // Upcall. If the handler returns positive value (requesting a
  // reactor callback) don't set the ready-bit because it will be
  // ignored if the reactor state has changed. Just call back
  // as many times as the handler requests it. Other threads are off
  // handling other things.
  int status = 1;
  while (status > 0)
    status = (event_handler->*callback) (handle);

  // If negative, remove from Reactor
  if (status < 0)
    {
      return this->remove_handler (handle, mask);
    }

  // assert (status >= 0);
  return 0;
}
posted on 2007-02-25 20:41
一动不如一静
阅读(2248)
评论(0)
编辑
收藏
引用
所属分类:
ACE
只有注册用户
登录
后才能发表评论。
【推荐】100%开源!大型工业跨平台软件C++源码提供,建模,组态!
相关文章:
TP_Reactor 学习笔记
select_reactor的原码学习笔记
N个Reactor是否就意味着支持N*FD_SETSIZE个连接呢?
异步I/O
关于ACE_TP_Reactor
关于MPC的问题收集
网络编程卷2 ACE_WFMO_Reactor的并发考虑
ACE的相关的几个网址
非阻塞Acceptor的动机
error LNK2019:无法解析的外部符号
网站导航:
博客园
IT新闻
BlogJava
博问
Chat2DB
管理
Powered by:
C++博客
Copyright © 一动不如一静