头文件
1
#include <ace/Thread_Mutex.h>
2
#include <ace/Guard_T.h>
3
#include <ace/Svc_Handler.h>
4
#include <ace/Condition_T.h>
5
#include <ace/Atomic_Op.h>
6
7
#include <queue>
8
9
// ------------------------------------------------------------------------------------------------------------------------
10
11
12
class Svc_Thread_Pool : public ACE_Task_Base
13

{
14
public:
15
16
enum
17
{
18
DEFAULT_POOL_SIZE = 8,
19
MAX_TIMEOUT = 5,
20
};
21
22
23
Svc_Thread_Pool ();
24
25
~Svc_Thread_Pool ();
26
27
int open (int pool_size = DEFAULT_POOL_SIZE);
28
29
virtual int open (void *args)
30
{
31
return ACE_Task_Base::open(args);
32
};
33
34
int svc ();
35
36
int add_to_pool (ACE_Task_Base * tb);
37
38
bool done ()
39
{
40
return (_shutdown == 0) ? false : true;
41
};
42
43
void shutdown ();
44
45
protected:
46
ACE_Thread_Mutex _workers_lock;
47
ACE_Condition<ACE_Thread_Mutex> _cond;
48
std::queue<ACE_Task_Base *> _workers_queue;
49
ACE_Atomic_Op<ACE_Thread_Mutex, int> _shutdown;
50
};
51
52
// ------------------------------------------------------------------------------------------------------------------------
53
实现
1
2
3
// ------------------------------------------------------------------------------------------------------------------------
4
5
/// Build an idle pool: attached to the process-wide ACE_Thread_Manager,
/// with the condition variable bound to the queue mutex and the shutdown
/// flag cleared.  No threads are started here; see open().
Svc_Thread_Pool::Svc_Thread_Pool ()
  : ACE_Task_Base (ACE_Thread_Manager::instance ())
  , _workers_lock ()
  , _cond (_workers_lock)   // waits on the same mutex that guards the queue
  , _shutdown (0)
{
}
12
13
// ------------------------------------------------------------------------------------------------------------------------
14
15
/// The destructor body performs no cleanup of its own: it neither sets the
/// shutdown flag nor joins the worker threads.  Call shutdown() beforehand.
Svc_Thread_Pool::~Svc_Thread_Pool ()
{
}
18
19
// ------------------------------------------------------------------------------------------------------------------------
20
21
int Svc_Thread_Pool::open (int pool_size)
22

{
23
return activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, pool_size);
24
}
25
26
// ------------------------------------------------------------------------------------------------------------------------
27
28
int Svc_Thread_Pool::svc ()
29

{
30
ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool svc() started \n")));
31
32
while (!done ())
{
33
// 锁定队列并获取一个 ACE_Task_Base *。
34
35
ACE_Task_Base * tb = 0;
36
assert (_workers_lock.acquire () == 0);
37
if (_workers_queue.size () != 0)
{
38
tb = _workers_queue.front ();
39
_workers_queue.pop ();
40
// 解锁 队列
41
assert (_workers_lock.release () == 0);
42
43
// 执行 ACE_Task_Base::svc ()
44
if (tb)
{
45
// svc 中应该自己解决自己数据的同步问题。并且自己加入 Svc_Thread_Pool 中
46
// ACE_Task_Base 应该自己保存 Svc_Thread_Pool 的对象指针。
47
tb->svc ();
48
}
49
}
50
51
else
{
52
// 解锁 队列
53
assert (_cond.wait () == 0);
54
assert (_workers_lock.release () == 0);
55
56
57
ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool recv msg \n")));
58
}
59
}
60
61
ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool svc() end !!\n")));
62
63
return 0;
64
}
65
66
// ------------------------------------------------------------------------------------------------------------------------
67
68
/// Queue a task and wake one worker thread.
///
/// The shutdown flag is tested while the queue mutex is held, so a task
/// cannot be queued after shutdown() has drained the queue.  Locking no
/// longer lives inside assert(), which would vanish under NDEBUG.
///
/// @param tb  Task whose svc() a worker thread will call.  The pool keeps
///            only the pointer; the caller retains ownership.
/// @return 0 if the task was queued; -1 if @a tb is null, the pool is
///         shutting down, or the queue mutex could not be acquired.
int Svc_Thread_Pool::add_to_pool (ACE_Task_Base * tb)
{
  if (tb == 0)
    {
      return -1;
    }

  {
    // Queue is locked for the lifetime of this scope.
    ACE_Guard<ACE_Thread_Mutex> guard (_workers_lock);
    if (!guard.locked ())
      {
        return -1;
      }

    if (_shutdown != 0)
      {
        return -1;
      }

    _workers_queue.push (tb);

    // Notify one waiting worker.  On failure the task stays queued and is
    // picked up the next time a worker inspects the queue.
    if (_cond.signal () != 0)
      {
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("(%t) Svc_Thread_Pool add_to_pool() signal failed\n")));
      }
  }

  printf ("add_to_pool\n");
  return 0;
}
84
85
// ------------------------------------------------------------------------------------------------------------------------
86
87
void Svc_Thread_Pool::shutdown ()
88

{
89
_shutdown = 1;
90
assert (_workers_lock.acquire () == 0); // lock queue
91
while (_workers_queue.size () != 0)
{
92
_workers_queue.pop ();
93
}
94
assert (_cond.broadcast () == 0);
95
assert (_workers_lock.release () == 0); // unlock queue
96
97
98
99
ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t)broadcast\n")));
100
101
//assert (_workers_lock.acquire () == 0);
102
//assert (_cond.broadcast () == 0);
103
//assert (_workers_lock.release () == 0);
104
105
106
this->wait();
107
printf ("shutdown\n");
108
}
109
110
// ------------------------------------------------------------------------------------------------------------------------
111
测试
1
2
#include "../cmn/extend_ace/Svc_Thread_Pool.h"
3
4
// File-scope pool instance used by test() below.
Svc_Thread_Pool tp;
5
6
class My_Task : public ACE_Task_Base
7

{
8
public:
9
My_Task ()
10
{
11
j = i ++;
12
}
13
14
int svc ()
15
{
16
// 先锁定 handle_close ().禁止svc的时候被reactor handle_close (),或者禁止调用handle_close,让他在svc中自己调用。
17
// lock
18
//ACE_OS::sleep (0);
19
printf ("%d: thread: %d\n", j, ACE_Thread::self ());
20
//tp.add_to_pool (this);
21
return 0;
22
// unlock
23
}
24
static int i;
25
int j;
26
};
27
28
int My_Task::i = 0;
29
30
int test ()
31

{
32
tp.open (8);
33
34
ACE_OS::sleep (3);
35
My_Task mt1, mt2, mt3, mt4, mt5, mt6, mt7, mt8, mt9, mt10;
36
37
tp.add_to_pool (&mt1);
38
tp.add_to_pool (&mt2);
39
tp.add_to_pool (&mt3);
40
tp.add_to_pool (&mt4);
41
tp.add_to_pool (&mt5);
42
tp.add_to_pool (&mt6);
43
tp.add_to_pool (&mt7);
44
tp.add_to_pool (&mt8);
45
tp.add_to_pool (&mt9);
46
tp.add_to_pool (&mt10);
47
48
49
50
ACE_OS::sleep (6);
51
tp.shutdown ();
52
//ACE_OS::sleep (5);
53
54
//tp.wait();
55
system ("pause");
56
return 0;
57
}
58
posted on 2009-06-12 23:21
风化血 阅读(2209)
评论(0) 编辑 收藏 引用