头文件
1#include <ace/Thread_Mutex.h>
2#include <ace/Guard_T.h>
3#include <ace/Svc_Handler.h>
4#include <ace/Condition_T.h>
5#include <ace/Atomic_Op.h>
6
7#include <queue>
8
9// ------------------------------------------------------------------------------------------------------------------------
10
11
12class Svc_Thread_Pool : public ACE_Task_Base
13{
14public:
15
16 enum
17 {
18 DEFAULT_POOL_SIZE = 8,
19 MAX_TIMEOUT = 5,
20 };
21
22
23 Svc_Thread_Pool ();
24
25 ~Svc_Thread_Pool ();
26
27 int open (int pool_size = DEFAULT_POOL_SIZE);
28
29 virtual int open (void *args)
30 {
31 return ACE_Task_Base::open(args);
32 };
33
34 int svc ();
35
36 int add_to_pool (ACE_Task_Base * tb);
37
38 bool done ()
39 {
40 return (_shutdown == 0) ? false : true;
41 };
42
43 void shutdown ();
44
45protected:
46 ACE_Thread_Mutex _workers_lock;
47 ACE_Condition<ACE_Thread_Mutex> _cond;
48 std::queue<ACE_Task_Base *> _workers_queue;
49 ACE_Atomic_Op<ACE_Thread_Mutex, int> _shutdown;
50};
51
52// ------------------------------------------------------------------------------------------------------------------------
53
实现
1
2
3// ------------------------------------------------------------------------------------------------------------------------
4
5Svc_Thread_Pool::Svc_Thread_Pool ()
6 : ACE_Task_Base(ACE_Thread_Manager::instance ()),
7 _workers_lock(), //_cond_lock(),
8 _cond (/**//*_cond_lock*/_workers_lock),
9 _shutdown (0)
10{
11}
12
13// ------------------------------------------------------------------------------------------------------------------------
14
15Svc_Thread_Pool::~Svc_Thread_Pool ()
16{
17}
18
19// ------------------------------------------------------------------------------------------------------------------------
20
21int Svc_Thread_Pool::open (int pool_size)
22{
23 return activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, pool_size);
24}
25
26// ------------------------------------------------------------------------------------------------------------------------
27
28int Svc_Thread_Pool::svc ()
29{
30 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool svc() started \n")));
31
32 while (!done ()) {
33 // 锁定队列并获取一个 ACE_Task_Base *。
34
35 ACE_Task_Base * tb = 0;
36 assert (_workers_lock.acquire () == 0);
37 if (_workers_queue.size () != 0) {
38 tb = _workers_queue.front ();
39 _workers_queue.pop ();
40 // 解锁 队列
41 assert (_workers_lock.release () == 0);
42
43 // 执行 ACE_Task_Base::svc ()
44 if (tb) {
45 // svc 中应该自己解决自己数据的同步问题。并且自己加入 Svc_Thread_Pool 中
46 // ACE_Task_Base 应该自己保存 Svc_Thread_Pool 的对象指针。
47 tb->svc ();
48 }
49 }
50
51 else {
52 // 解锁 队列
53 assert (_cond.wait () == 0);
54 assert (_workers_lock.release () == 0);
55
56
57 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool recv msg \n")));
58 }
59 }
60
61 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Svc_Thread_Pool svc() end !!\n")));
62
63 return 0;
64}
65
66// ------------------------------------------------------------------------------------------------------------------------
67
68int Svc_Thread_Pool::add_to_pool (ACE_Task_Base * tb)
69{
70 if (_shutdown != 0) {
71 return -1;
72 }
73 // 锁定队列,并加入工作者线程
74 assert (_workers_lock.acquire () == 0);
75 _workers_queue.push (tb);
76 // 通知
77 assert (_cond.signal () == 0);
78 // 解锁队列
79 assert (_workers_lock.release () == 0);
80
81 printf ("add_to_pool\n");
82 return 0;
83}
84
85// ------------------------------------------------------------------------------------------------------------------------
86
87void Svc_Thread_Pool::shutdown ()
88{
89 _shutdown = 1;
90 assert (_workers_lock.acquire () == 0); // lock queue
91 while (_workers_queue.size () != 0) {
92 _workers_queue.pop ();
93 }
94 assert (_cond.broadcast () == 0);
95 assert (_workers_lock.release () == 0); // unlock queue
96
97
98
99 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t)broadcast\n")));
100
101 //assert (_workers_lock.acquire () == 0);
102 //assert (_cond.broadcast () == 0);
103 //assert (_workers_lock.release () == 0);
104
105
106 this->wait();
107 printf ("shutdown\n");
108}
109
110// ------------------------------------------------------------------------------------------------------------------------
111
测试
1
2#include "../cmn/extend_ace/Svc_Thread_Pool.h"
3
4Svc_Thread_Pool tp;
5
6class My_Task : public ACE_Task_Base
7{
8public:
9 My_Task ()
10 {
11 j = i ++;
12 }
13
14 int svc ()
15 {
16 // 先锁定 handle_close ().禁止svc的时候被reactor handle_close (),或者禁止调用handle_close,让他在svc中自己调用。
17 // lock
18 //ACE_OS::sleep (0);
19 printf ("%d: thread: %d\n", j, ACE_Thread::self ());
20 //tp.add_to_pool (this);
21 return 0;
22 // unlock
23 }
24 static int i;
25 int j;
26};
27
28int My_Task::i = 0;
29
30int test ()
31{
32 tp.open (8);
33
34 ACE_OS::sleep (3);
35 My_Task mt1, mt2, mt3, mt4, mt5, mt6, mt7, mt8, mt9, mt10;
36
37 tp.add_to_pool (&mt1);
38 tp.add_to_pool (&mt2);
39 tp.add_to_pool (&mt3);
40 tp.add_to_pool (&mt4);
41 tp.add_to_pool (&mt5);
42 tp.add_to_pool (&mt6);
43 tp.add_to_pool (&mt7);
44 tp.add_to_pool (&mt8);
45 tp.add_to_pool (&mt9);
46 tp.add_to_pool (&mt10);
47
48
49
50 ACE_OS::sleep (6);
51 tp.shutdown ();
52 //ACE_OS::sleep (5);
53
54 //tp.wait();
55 system ("pause");
56 return 0;
57}
58
posted on 2009-06-12 23:21
风化血 阅读(2202)
评论(0) 编辑 收藏 引用