随笔-167  评论-8  文章-0  trackbacks-0
这是一个基于ACE的线程库实现,属于半同步半异步类型的线程池,感觉实现得非常优雅,代码是由网上下的好几份代码拼凑而成的(ACE的源码包中的tests目录下有大量的实例,研究这些例子是学习ACE的好办法,只是由于注释都是一堆堆的英文,有时候感觉头疼,就懒得去看它了)。这个线程池由一个线程池管理器管理着五个线程来处理消息,当五个处理线程都在处理消息时,接收新的消息将导致线程管理器被阻塞。消息处理线程处理完发给自己的消息后将被阻塞,其将重新被管理器管理器放入队列中。越发感觉到ACE的强大,只可惜我们的程序用不上。一个原因是我们程序本身处理的数据量并不会太大;另外我们的程序只要求跑在Solaris上面,不会出现异构的平台;最后ACE库本身太繁杂了,很多地方比如网络相关的函数我们是不会用的,不过如果现在我们正在使用的网络库也使用ACE的话,那么使用ACE简直再好不过了。
#include "ace/OS.h"
#include 
"ace/Task.h"
#include  
"ace/Thread.h"
#include 
"ace/Synch.h"

class Worker;

class IManager
{
public:
    
virtual int return_to_work (Worker *worker) = 0;
};

class Worker : public ACE_Task<ACE_MT_SYNCH>
{
public:
    Worker (IManager 
*manager) : manager_(manager) { }

    
//线程启动之后进入本函数
    virtual int svc (void)
    {
        thread_id_ 
= ACE_Thread::self();

        
//工作线程启动之后只有收到MB_HANGUP类型的消息它才会退出
        while (1)
        {
            ACE_Message_Block 
*mb = 0;

            
//如果队列中没有数据,本线程将被阻塞
            if (this->getq(mb) == -1)
                ACE_ERROR_BREAK((LM_ERROR, ACE_TEXT (
"%p "), ACE_TEXT ("getq")));

            
// 如果是MB_HANGUP消息,就结束线程
            if (mb->msg_type() == ACE_Message_Block::MB_HANGUP)
            {
                ACE_DEBUG ((LM_INFO,
                    ACE_TEXT (
"(%t) Shutting down ")));
                    mb
->release();

                
break;
            }

            
// Process the message.
            process_message (mb);

            
// Return to work.
            
// 这里会将自己放到线程池中,并通过workers_cond_来通知manager
            this->manager_->return_to_work (this);
        }

        
return 0;
    }

    ACE_thread_t thread_id(
void)
    {
        
return thread_id_;
    }

private:

    
//处理消息
    void process_message (ACE_Message_Block *mb)
    {
        ACE_TRACE (ACE_TEXT (
"Worker::process_message"));

        
int msgId;

        ACE_OS::memcpy (
&msgId, mb->rd_ptr(), sizeof(int));

        mb
->release();

        ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT (
"(%t) Started processing message %d "),
            msgId));

        ACE_OS::sleep(
3);

        ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT (
"(%t) Finished processing message %d "),
            msgId));
    }

    
//指向线程池管理器
    IManager *manager_;

    
//保存本线程id号
    ACE_thread_t thread_id_;
};

class Manager : public ACE_Task<ACE_MT_SYNCH>public IManager
{
public:
    
enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};

    Manager ()
        : shutdown_(
0), workers_lock_(), workers_cond_(workers_lock_)
    {
        ACE_TRACE (ACE_TEXT (
"Manager::Manager"));
    }

    
/* 线程处理函数 */
    
int svc (void)
    {
        ACE_TRACE (ACE_TEXT (
"Manager::svc"));

        ACE_DEBUG ((LM_INFO, ACE_TEXT (
"(%t) Manager started ")));

        
// Create pool.
        create_worker_pool();

        
while (true)
        {
            ACE_Message_Block 
*mb = 0;
            ACE_Time_Value tv ((
long)MAX_TIMEOUT);
            tv 
+= ACE_OS::time (0);

            
// Get a message request.
            if (this->getq (mb, &tv) < 0)
            {
                shut_down ();
                
break;
            }

            
// Choose a worker.
            Worker *worker = 0;

            
/* 
            这对大括号中的代码从worker线程池中获取一个工作线程,线程池由
            this->workers_lock_互斥体加以保护,如果没有worker可用,manager
            会阻塞在workers_lock_条件变量上,等待某个线程回来工作
            
*/
            {
                ACE_GUARD_RETURN (ACE_Thread_Mutex,
                    worker_mon, 
this->workers_lock_, -1);

                
/* 
                阻塞在workers_lock_.wait()上直到有worker可用,当某个worker回来后
                会把自己放到线程池队列上,同时通过触发workers_cond_来通知manager
                
*/
                
while (this->workers_.is_empty ())
                    workers_cond_.wait ();

                
/* 将获取的worker从线程池队列中删除 */
                
this->workers_.dequeue_head (worker);
            }

            
// Ask the worker to do the job.
            
// 将请求消息放入到worker的消息队列中
            worker->putq (mb);
        }

        
return 0;
    }

    
int shut_down (void)
    {
         ACE_TRACE (ACE_TEXT (
"ACE_ThreadPool::DestroyPool"));

         ACE_Unbounded_Queue
<Worker* >::ITERATOR iter = this->workers_.begin();

        Worker
** worker_ptr = NULL;

         
do
        {
            iter.next (worker_ptr);

            Worker 
*worker = (*worker_ptr);

            
// Send the hangup message.
            ACE_Message_Block *mb;
            ACE_NEW_RETURN(
                mb,
                ACE_Message_Block(
0,
                ACE_Message_Block::MB_HANGUP),
                
-1);

            worker
->putq(mb);

            
// Wait for the exit.
            worker->wait();

            ACE_ASSERT (worker
->msg_queue()->is_empty ());

            delete worker;
         }
while (iter.advance());

         
return 0;
    };

    ACE_thread_t thread_id (Worker 
*worker);

    
/* 提供给worker的接口,用于在worker完成处理后,将自己放入到线程池队列,并通知manager */
    
virtual int return_to_work (Worker *worker)
    {
        ACE_GUARD_RETURN (ACE_Thread_Mutex,
            worker_mon, 
this->workers_lock_, -1);

        ACE_DEBUG ((LM_DEBUG,
            ACE_TEXT (
"(%t) Worker %u returning to work. "),
            worker
->thr_mgr()->thr_self()));

        
// 将worker放入到线程池队列
        this->workers_.enqueue_tail (worker);

        
// 触发条件变量,通知manager
        this->workers_cond_.signal ();

        
return 0;
    }

private:
// 创建worker线程池
    int create_worker_pool (void)
    {
        ACE_GUARD_RETURN (ACE_Thread_Mutex,
            worker_mon,
            
this->workers_lock_,
            
-1);

        
for (int i = 0; i < POOL_SIZE; i++)
        {
            Worker 
*worker;

            
// 创建worker
            ACE_NEW_RETURN (worker, Worker (this), -1);

            
// 放入线程池队列
            this->workers_.enqueue_tail (worker);

            
// 激活线程,调用该函数后,worker线程被创建,由于worker
            
// 是ACE_Task的子类,线程激活后,从svc函数开始执行
            worker->activate ();
        }

        
return 0;
    }

private:
    
int shutdown_;

    
/* workers_lock_ 线程池队列的互斥体,在对线程池进行操作时,需要通过互斥锁来保护
    所以在所有的线程池队列队列操作前都有这样的语句:
        ACE_GUARD_RETURN (ACE_Thread_Mutex,
        worker_mon, this->workers_lock_, -1);
        
*/
    ACE_Thread_Mutex workers_lock_;
    ACE_Condition
<ACE_Thread_Mutex> workers_cond_;

    
/* 线程池队列 */
    ACE_Unbounded_Queue
<Worker* > workers_;
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
    Manager tp;
    tp.activate ();

    
// Wait for a moment every time you send a message.
    ACE_Time_Value tv;
    tv.msec (
100);

    ACE_Message_Block 
*mb;
    
for (int i = 0; i < 10; i++)
    {
        ACE_NEW_RETURN(mb, ACE_Message_Block(
sizeof(int)), -1);

        ACE_OS::memcpy (mb
->wr_ptr(), &i, sizeof(int));

        ACE_OS::sleep(tv);

        
// Add a new work item.
        
// 这里将请求消息首先发到了manager线程,由manager线程负责分发
        tp.putq (mb);
    }

    
// 主线程等待子线程结束
    ACE_Thread_Manager::instance()->wait();

    
return 0;
}


posted on 2009-11-02 18:13 老马驿站 阅读(2174) 评论(0)  编辑 收藏 引用 所属分类: ACE