lua

thead

http://blog.csdn.net/hzyong_c/article/details/8012963   
  1 /**
  2  * threadpool.c
  3  *
  4  * This file will contain your implementation of a threadpool.
  5  * 此文件包含线路池的具体实现
  6  */
  7 
  8 #include <stdio.h>
  9 #include <stdlib.h>
 10 #include <unistd.h>
 11 #include <pthread.h>
 12 #include <string.h>
 13 
 14 #include "threadpool.h"
 15 
 16 typedef struct _thread_st {
 17     pthread_t id;
 18     pthread_mutex_t mutex;
 19     pthread_cond_t cond;
 20     dispatch_fn fn;
 21     void *arg;
 22     threadpool parent;
 23 } _thread;
 24 
 25 // _threadpool is the internal threadpool structure that is
 26 // cast to type "threadpool" before it given out to callers
 27 // _threadpool是内部线程池结构,转换成类型“threadpool”在提交给使用者之前
 28 typedef struct _threadpool_st {
 29     // you should fill in this structure with whatever you need
 30     pthread_mutex_t tp_mutex;
 31     pthread_cond_t tp_idle;
 32     pthread_cond_t tp_full;
 33     pthread_cond_t tp_empty;
 34     _thread ** tp_list;
 35     int tp_index;
 36     int tp_max_index;
 37     int tp_stop;
 38 
 39     int tp_total;
 40 } _threadpool;
 41 
 42 threadpool create_threadpool(int num_threads_in_pool)
 43 {
 44     _threadpool *pool;
 45 
 46     // sanity check the argument
 47         //参数检查
 48     if ((num_threads_in_pool <= 0) || (num_threads_in_pool > MAXT_IN_POOL))
 49         return NULL;
 50 
 51     pool = (_threadpool *) malloc(sizeof(_threadpool));
 52     if (pool == NULL) {
 53         fprintf(stderr, "Out of memory creating a new threadpool! ");
 54         return NULL;
 55     }
 56 
 57     // add your code here to initialize the newly created threadpool
 58     pthread_mutex_init( &pool->tp_mutex, NULL );
 59     pthread_cond_init( &pool->tp_idle, NULL );
 60     pthread_cond_init( &pool->tp_full, NULL );
 61     pthread_cond_init( &pool->tp_empty, NULL );
 62     pool->tp_max_index = num_threads_in_pool;
 63     pool->tp_index = 0;
 64     pool->tp_stop = 0;
 65     pool->tp_total = 0;
 66     pool->tp_list = ( _thread ** )malloc( sizeofvoid * ) * MAXT_IN_POOL );
 67     memset( pool->tp_list, 0, sizeofvoid * ) * MAXT_IN_POOL );
 68 
 69     return (threadpool) pool;
 70 }
 71 
 72 int save_thread( _threadpool * pool, _thread * thread )
 73 {
 74     int ret = -1;
 75 
 76     pthread_mutex_lock( &pool->tp_mutex );
 77 
 78     if( pool->tp_index < pool->tp_max_index ) {
 79         pool->tp_list[ pool->tp_index ] = thread;
 80         pool->tp_index++;
 81         ret = 0;
 82 
 83         pthread_cond_signal( &pool->tp_idle );
 84 
 85         if( pool->tp_index >= pool->tp_total ) {
 86             pthread_cond_signal( &pool->tp_full );
 87         }
 88     }
 89 
 90     pthread_mutex_unlock( &pool->tp_mutex );
 91 
 92     return ret;
 93 }
 94 
 95 void * wrapper_fn( void * arg )
 96 {
 97     _thread * thread = (_thread*)arg;
 98     _threadpool * pool = (_threadpool*)thread->parent;
 99 
100     for( ; 0 == ((_threadpool*)thread->parent)->tp_stop; ) {
101         thread->fn( thread->arg );
102 
103         pthread_mutex_lock( &thread->mutex );
104         if( 0 == save_thread( thread->parent, thread ) ) {
105             pthread_cond_wait( &thread->cond, &thread->mutex );
106             pthread_mutex_unlock( &thread->mutex );
107         } else {
108             pthread_mutex_unlock( &thread->mutex );
109             pthread_cond_destroy( &thread->cond );
110             pthread_mutex_destroy( &thread->mutex );
111 
112             free( thread );
113             break;
114         }
115     }
116 
117     pthread_mutex_lock( &pool->tp_mutex );
118     pool->tp_total--;
119     if( pool->tp_total <= 0 ) pthread_cond_signal( &pool->tp_empty );
120     pthread_mutex_unlock( &pool->tp_mutex );
121 
122     return NULL;
123 }
124 
125 int dispatch_threadpool(threadpool from_me, dispatch_fn dispatch_to_here, void *arg)
126 {
127     int ret = 0;
128 
129         _threadpool *pool = (_threadpool *) from_me;
130     pthread_attr_t attr;
131     _thread * thread = NULL;
132 
133     // add your code here to dispatch a thread
134     pthread_mutex_lock( &pool->tp_mutex );
135 
136     if( pool->tp_index <= 0 && pool->tp_total >= pool->tp_max_index ) {
137         pthread_cond_wait( &pool->tp_idle, &pool->tp_mutex );
138     }
139 
140     if( pool->tp_index <= 0 ) {
141         _thread * thread = ( _thread * )malloc( sizeof( _thread ) );
142         thread->id = 0;
143         pthread_mutex_init( &thread->mutex, NULL );
144         pthread_cond_init( &thread->cond, NULL );
145         thread->fn = dispatch_to_here;
146         thread->arg = arg;
147         thread->parent = pool;
148 
149         pthread_attr_init( &attr );
150         pthread_attr_setdetachstate( &attr,PTHREAD_CREATE_DETACHED );
151 
152         if( 0 == pthread_create( &thread->id, &attr, wrapper_fn, thread ) ) {
153             pool->tp_total++;
154             printf( "create thread#%ld ", thread->id );
155         } else {
156             ret = -1;
157             printf( "cannot create thread " );
158             pthread_mutex_destroy( &thread->mutex );
159             pthread_cond_destroy( &thread->cond );
160             free( thread );
161         }
162     } else {
163         pool->tp_index--;
164         thread = pool->tp_list[ pool->tp_index ];
165         pool->tp_list[ pool->tp_index ] = NULL;
166 
167         thread->fn = dispatch_to_here;
168         thread->arg = arg;
169         thread->parent = pool;
170 
171         pthread_mutex_lock( &thread->mutex );
172         pthread_cond_signal( &thread->cond ) ;
173         pthread_mutex_unlock ( &thread->mutex );
174     }
175 
176     pthread_mutex_unlock( &pool->tp_mutex );
177 
178     return ret;
179 }
180 
181 void destroy_threadpool(threadpool destroyme)
182 {
183     _threadpool *pool = (_threadpool *) destroyme;
184 
185     // add your code here to kill a threadpool
186     int i = 0;
187 
188     pthread_mutex_lock( &pool->tp_mutex );
189 
190     if( pool->tp_index < pool->tp_total ) {
191         printf( "waiting for %d thread(s) to finish ", pool->tp_total - pool->tp_index );
192         pthread_cond_wait( &pool->tp_full, &pool->tp_mutex );
193     }
194 
195     pool->tp_stop = 1;
196 
197     for( i = 0; i < pool->tp_index; i++ ) {
198         _thread * thread = pool->tp_list[ i ];
199 
200         pthread_mutex_lock( &thread->mutex );
201         pthread_cond_signal( &thread->cond ) ;
202         pthread_mutex_unlock ( &thread->mutex );
203     }
204 
205     if( pool->tp_total > 0 ) {
206         printf( "waiting for %d thread(s) to exit ", pool->tp_total );
207         pthread_cond_wait( &pool->tp_empty, &pool->tp_mutex );
208     }
209 
210     for( i = 0; i < pool->tp_index; i++ ) {
211         free( pool->tp_list[ i ] );
212         pool->tp_list[ i ] = NULL;
213     }
214 
215     pthread_mutex_unlock( &pool->tp_mutex );
216 
217     pool->tp_index = 0;
218 
219     pthread_mutex_destroy( &pool->tp_mutex );
220     pthread_cond_destroy( &pool->tp_idle );
221     pthread_cond_destroy( &pool->tp_full );
222     pthread_cond_destroy( &pool->tp_empty );
223 
224     free( pool->tp_list );
225     free( pool );
226 }
227   1 class RtThread
  2 {
  3 public:
  4     /// RtThread state flags    
  5     enum State
  6     {        
  7         RUNNING,        
  8         STOP,        
  9         SUSPEND
 10     };
 11 
 12 private:
 13 #ifdef _WIN32    
 14     static DWORD WINAPI    ThreadProc(void* args);
 15 #else
 16     static void*        ThreadProc(void* args);
 17 #endif
 18 
 19 public:
 20     /// Working in current thread    
 21     bool                IsThread();
 22 
 23     /// Current thread is running    
 24     bool                IsRunning();
 25 
 26     /// Get the m_isAutoDel field    
 27     bool                IsAutoDelete();
 28 
 29     /// Set the m_isAutoDel field    
 30     void                SetAutoDelete(bool autoDel);
 31 
 32     /// Exit the thread, this method is be called in the thread    
 33     //void                Exit(int code);
 34 
 35     ///
 36     void                Join();
 37 
 38     /// Get current thread state    
 39     State                 GetState();
 40 
 41     /// Get current thread handle
 42     const RtThreadHandle&    GetHandle() { return m_handle; }
 43 
 44     static void            Sleep(int sec);    
 45 
 46 public:
 47     /// Start a thread    
 48     virtual void         Start();
 49 
 50     /// Stop a thread
 51     virtual void         Terminate(int code);
 52 
 53     /// Suspend a thread
 54     virtual void         Suspend();
 55 
 56     /// Resume a thread    
 57     virtual void         Resume();
 58 
 59 public:
 60     void                LockThread() { m_lock.Lock(); }
 61 
 62     void                UnlockThread() { m_lock.Unlock(); }
 63 
 64 protected:
 65     /// Initalize the thread when a thread is first created    
 66     virtual int            OnInitThread();
 67 
 68     /// Uninitalize the thread when a thread will exit    
 69     virtual int            OnExitThread(int code);
 70 
 71     /// This method must be overridden to privode
 72     /// the thread main loop    
 73     virtual int         OnRun();    
 74     
 75 
 76 public:
 77     /// Consructor, initalize fields
 78     RtThread();
 79 
 80     /// Destructor, close thread if the thread is running
 81     virtual ~RtThread();
 82 
 83 private:
 84 #ifdef _WIN32
 85     /// Win32 thread id
 86     DWORD                m_id;
 87 #endif
 88 
 89     /// RtThread handle
 90     RtThreadHandle        m_handle;    
 91 
 92     /// Current thread state
 93     State                m_state;
 94 
 95     /// Delete the instance automatic after the thread exit
 96     bool                m_isAutoDel;
 97 
 98     /// RtThread state lock
 99     RtThreadLock        m_lock;
100 
101 #ifndef _WIN32
102     ///    
103     GWEvent                m_suspend;
104 #endif // _WIN32
105 };
106 
108 
109 } // namespace rt2_coreinline RtThread::State RtThread::GetState()
{
    RtThreadLock::AutoLock lock(m_lock);

    return m_state;
}

inline bool RtThread::IsRunning()
{
    RtThreadLock::AutoLock lock(m_lock);

    return m_state == RUNNING;
}

#ifdef _WIN32
inline void RtThread::Join()
{
    if (m_handle)
    {
        ::WaitForSingleObject(m_handle, INFINITE);
        ::CloseHandle(m_handle);
    }
}

inline void RtThread::Sleep(int sec)
{
    ::Sleep(sec * 1000);
}
#else
inline void RtThread::Join()
{
    if (m_handle)
    {
        pthread_join(m_handle, NULL);
    }
}

inline void RtThread::Sleep(int sec)
{
    ::sleep(sec);
}
#endif

http://www.oschina.net/code/snippet_12_1321

posted on 2013-09-29 23:31 chib 阅读(482) 评论(0)  编辑 收藏 引用


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理


<2009年5月>
262728293012
3456789
10111213141516
17181920212223
24252627282930
31123456

导航

统计

常用链接

留言簿(1)

随笔档案

牛人录

时政史料

投资管理

源码库

搜索

最新评论

阅读排行榜

评论排行榜