http://blog.csdn.net/hzyong_c/article/details/8012963 1 /**
2 * threadpool.c
3 *
4 * This file will contain your implementation of a threadpool.
5 * 此文件包含线路池的具体实现
6 */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <pthread.h>
12 #include <string.h>
13
14 #include "threadpool.h"
15
16 typedef struct _thread_st {
17 pthread_t id;
18 pthread_mutex_t mutex;
19 pthread_cond_t cond;
20 dispatch_fn fn;
21 void *arg;
22 threadpool parent;
23 } _thread;
24
25 // _threadpool is the internal threadpool structure that is
26 // cast to type "threadpool" before it given out to callers
27 // _threadpool是内部线程池结构,转换成类型“threadpool”在提交给使用者之前
28 typedef struct _threadpool_st {
29 // you should fill in this structure with whatever you need
30 pthread_mutex_t tp_mutex;
31 pthread_cond_t tp_idle;
32 pthread_cond_t tp_full;
33 pthread_cond_t tp_empty;
34 _thread ** tp_list;
35 int tp_index;
36 int tp_max_index;
37 int tp_stop;
38
39 int tp_total;
40 } _threadpool;
41
42 threadpool create_threadpool(int num_threads_in_pool)
43 {
44 _threadpool *pool;
45
46 // sanity check the argument
47 //参数检查
48 if ((num_threads_in_pool <= 0) || (num_threads_in_pool > MAXT_IN_POOL))
49 return NULL;
50
51 pool = (_threadpool *) malloc(sizeof(_threadpool));
52 if (pool == NULL) {
53 fprintf(stderr, "Out of memory creating a new threadpool! ");
54 return NULL;
55 }
56
57 // add your code here to initialize the newly created threadpool
58 pthread_mutex_init( &pool->tp_mutex, NULL );
59 pthread_cond_init( &pool->tp_idle, NULL );
60 pthread_cond_init( &pool->tp_full, NULL );
61 pthread_cond_init( &pool->tp_empty, NULL );
62 pool->tp_max_index = num_threads_in_pool;
63 pool->tp_index = 0;
64 pool->tp_stop = 0;
65 pool->tp_total = 0;
66 pool->tp_list = ( _thread ** )malloc( sizeof( void * ) * MAXT_IN_POOL );
67 memset( pool->tp_list, 0, sizeof( void * ) * MAXT_IN_POOL );
68
69 return (threadpool) pool;
70 }
71
72 int save_thread( _threadpool * pool, _thread * thread )
73 {
74 int ret = -1;
75
76 pthread_mutex_lock( &pool->tp_mutex );
77
78 if( pool->tp_index < pool->tp_max_index ) {
79 pool->tp_list[ pool->tp_index ] = thread;
80 pool->tp_index++;
81 ret = 0;
82
83 pthread_cond_signal( &pool->tp_idle );
84
85 if( pool->tp_index >= pool->tp_total ) {
86 pthread_cond_signal( &pool->tp_full );
87 }
88 }
89
90 pthread_mutex_unlock( &pool->tp_mutex );
91
92 return ret;
93 }
94
95 void * wrapper_fn( void * arg )
96 {
97 _thread * thread = (_thread*)arg;
98 _threadpool * pool = (_threadpool*)thread->parent;
99
100 for( ; 0 == ((_threadpool*)thread->parent)->tp_stop; ) {
101 thread->fn( thread->arg );
102
103 pthread_mutex_lock( &thread->mutex );
104 if( 0 == save_thread( thread->parent, thread ) ) {
105 pthread_cond_wait( &thread->cond, &thread->mutex );
106 pthread_mutex_unlock( &thread->mutex );
107 } else {
108 pthread_mutex_unlock( &thread->mutex );
109 pthread_cond_destroy( &thread->cond );
110 pthread_mutex_destroy( &thread->mutex );
111
112 free( thread );
113 break;
114 }
115 }
116
117 pthread_mutex_lock( &pool->tp_mutex );
118 pool->tp_total--;
119 if( pool->tp_total <= 0 ) pthread_cond_signal( &pool->tp_empty );
120 pthread_mutex_unlock( &pool->tp_mutex );
121
122 return NULL;
123 }
124
125 int dispatch_threadpool(threadpool from_me, dispatch_fn dispatch_to_here, void *arg)
126 {
127 int ret = 0;
128
129 _threadpool *pool = (_threadpool *) from_me;
130 pthread_attr_t attr;
131 _thread * thread = NULL;
132
133 // add your code here to dispatch a thread
134 pthread_mutex_lock( &pool->tp_mutex );
135
136 if( pool->tp_index <= 0 && pool->tp_total >= pool->tp_max_index ) {
137 pthread_cond_wait( &pool->tp_idle, &pool->tp_mutex );
138 }
139
140 if( pool->tp_index <= 0 ) {
141 _thread * thread = ( _thread * )malloc( sizeof( _thread ) );
142 thread->id = 0;
143 pthread_mutex_init( &thread->mutex, NULL );
144 pthread_cond_init( &thread->cond, NULL );
145 thread->fn = dispatch_to_here;
146 thread->arg = arg;
147 thread->parent = pool;
148
149 pthread_attr_init( &attr );
150 pthread_attr_setdetachstate( &attr,PTHREAD_CREATE_DETACHED );
151
152 if( 0 == pthread_create( &thread->id, &attr, wrapper_fn, thread ) ) {
153 pool->tp_total++;
154 printf( "create thread#%ld ", thread->id );
155 } else {
156 ret = -1;
157 printf( "cannot create thread " );
158 pthread_mutex_destroy( &thread->mutex );
159 pthread_cond_destroy( &thread->cond );
160 free( thread );
161 }
162 } else {
163 pool->tp_index--;
164 thread = pool->tp_list[ pool->tp_index ];
165 pool->tp_list[ pool->tp_index ] = NULL;
166
167 thread->fn = dispatch_to_here;
168 thread->arg = arg;
169 thread->parent = pool;
170
171 pthread_mutex_lock( &thread->mutex );
172 pthread_cond_signal( &thread->cond ) ;
173 pthread_mutex_unlock ( &thread->mutex );
174 }
175
176 pthread_mutex_unlock( &pool->tp_mutex );
177
178 return ret;
179 }
180
181 void destroy_threadpool(threadpool destroyme)
182 {
183 _threadpool *pool = (_threadpool *) destroyme;
184
185 // add your code here to kill a threadpool
186 int i = 0;
187
188 pthread_mutex_lock( &pool->tp_mutex );
189
190 if( pool->tp_index < pool->tp_total ) {
191 printf( "waiting for %d thread(s) to finish ", pool->tp_total - pool->tp_index );
192 pthread_cond_wait( &pool->tp_full, &pool->tp_mutex );
193 }
194
195 pool->tp_stop = 1;
196
197 for( i = 0; i < pool->tp_index; i++ ) {
198 _thread * thread = pool->tp_list[ i ];
199
200 pthread_mutex_lock( &thread->mutex );
201 pthread_cond_signal( &thread->cond ) ;
202 pthread_mutex_unlock ( &thread->mutex );
203 }
204
205 if( pool->tp_total > 0 ) {
206 printf( "waiting for %d thread(s) to exit ", pool->tp_total );
207 pthread_cond_wait( &pool->tp_empty, &pool->tp_mutex );
208 }
209
210 for( i = 0; i < pool->tp_index; i++ ) {
211 free( pool->tp_list[ i ] );
212 pool->tp_list[ i ] = NULL;
213 }
214
215 pthread_mutex_unlock( &pool->tp_mutex );
216
217 pool->tp_index = 0;
218
219 pthread_mutex_destroy( &pool->tp_mutex );
220 pthread_cond_destroy( &pool->tp_idle );
221 pthread_cond_destroy( &pool->tp_full );
222 pthread_cond_destroy( &pool->tp_empty );
223
224 free( pool->tp_list );
225 free( pool );
226 }
227 1 class RtThread
2 {
3 public:
4 /// RtThread state flags
5 enum State
6 {
7 RUNNING,
8 STOP,
9 SUSPEND
10 };
11
12 private:
13 #ifdef _WIN32
14 static DWORD WINAPI ThreadProc(void* args);
15 #else
16 static void* ThreadProc(void* args);
17 #endif
18
19 public:
20 /// Working in current thread
21 bool IsThread();
22
23 /// Current thread is running
24 bool IsRunning();
25
26 /// Get the m_isAutoDel field
27 bool IsAutoDelete();
28
29 /// Set the m_isAutoDel field
30 void SetAutoDelete(bool autoDel);
31
32 /// Exit the thread, this method is be called in the thread
33 //void Exit(int code);
34
35 ///
36 void Join();
37
38 /// Get current thread state
39 State GetState();
40
41 /// Get current thread handle
42 const RtThreadHandle& GetHandle() { return m_handle; }
43
44 static void Sleep(int sec);
45
46 public:
47 /// Start a thread
48 virtual void Start();
49
50 /// Stop a thread
51 virtual void Terminate(int code);
52
53 /// Suspend a thread
54 virtual void Suspend();
55
56 /// Resume a thread
57 virtual void Resume();
58
59 public:
60 void LockThread() { m_lock.Lock(); }
61
62 void UnlockThread() { m_lock.Unlock(); }
63
64 protected:
65 /// Initalize the thread when a thread is first created
66 virtual int OnInitThread();
67
68 /// Uninitalize the thread when a thread will exit
69 virtual int OnExitThread(int code);
70
71 /// This method must be overridden to privode
72 /// the thread main loop
73 virtual int OnRun();
74
75
76 public:
77 /// Consructor, initalize fields
78 RtThread();
79
80 /// Destructor, close thread if the thread is running
81 virtual ~RtThread();
82
83 private:
84 #ifdef _WIN32
85 /// Win32 thread id
86 DWORD m_id;
87 #endif
88
89 /// RtThread handle
90 RtThreadHandle m_handle;
91
92 /// Current thread state
93 State m_state;
94
95 /// Delete the instance automatic after the thread exit
96 bool m_isAutoDel;
97
98 /// RtThread state lock
99 RtThreadLock m_lock;
100
101 #ifndef _WIN32
102 ///
103 GWEvent m_suspend;
104 #endif // _WIN32
105 };
106
108
109 } // namespace rt2_coreinline RtThread::State RtThread::GetState()
{
RtThreadLock::AutoLock
lock(m_lock);
return m_state;
}
inline
bool RtThread::IsRunning()
{
RtThreadLock::AutoLock
lock(m_lock);
return m_state == RUNNING;
}
#ifdef _WIN32
inline
void RtThread::Join()
{
if (m_handle)
{
::WaitForSingleObject(m_handle, INFINITE);
::CloseHandle(m_handle);
}
}
inline
void RtThread::Sleep(
int sec)
{
::Sleep(sec * 1000);
}
#elseinline
void RtThread::Join()
{
if (m_handle)
{
pthread_join(m_handle, NULL);
}
}
inline
void RtThread::Sleep(
int sec)
{
::sleep(sec);
}
#endif
http://www.oschina.net/code/snippet_12_1321