勤能补拙,Expter

成都游戏Coder,记录游戏开发过程的笔记和心得!

一个简单线程池的实现

        以前写线程池是在网络编程的时候,一个线程池处理一个网络套接字,随着连接的增多,效率很低,
        最近主要是在封装一个ipc程序(进程间通信机制) 主要
    涉及的技术:  
    Winsock, 线程池

    因为光是基于线程池的技术效率还是很低,打算重新把其代码整理重新封装通过,在此基础上通过完成端口来封装一个简单的高并发服务器。

    可能涉及的技术
    Winsock:   windows网络通信
    完成端口:   Windows上服务器的大规模连接机制。
    线程池:    高效高利用率的线程机制。

        本文主要实现一个线程池的例子,从基本原理入手,一个线程池会记录每个线程的信息,以及每个线程的处理。
         一般一个简单线程池至少包含下列组成部分。

         1.线程池管理器(ThreadPoolManager):用于创建并管理线程池 
         2.工作线程(WorkThread): 线程池中线程 
         3.任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
         4.任务队列:用于存放没有处理的任务。提供一种缓冲机制。 
         
       

  1#pragma once
  2
  3#include "lock.h"
  4
  5/// 线程池
  6namespace tp_ipc_peer_namespace
  7{
  8    /// 接口
  9    class task_object
 10    {
 11    public:
 12        virtual ~task_object(){}
 13        virtual void exec() = 0;
 14    }
;
 15
 16    template< typename Functor>
 17    class task 
 18        : public task_object
 19    {
 20
 21        /// 禁止操作
 22    private:
 23        task( const task &);
 24        task & operator =const task & );
 25
 26    public:
 27        typedef Functor functor_type;
 28
 29        task( const functor_type & functor)
 30            : functor_( functor )
 31        { }
 32
 33        /// 执行
 34        virtual void exec()
 35        {
 36            functor_();
 37        }

 38
 39    private:
 40        Functor functor_;
 41        
 42    }
;
 43
 44    class ctpool
 45    {
 46        typedef ctpool self_type;
 47        
 48    public:
 49        ctpool(void)
 50            :tpool_running_( true )
 51        
 52            _m_start_threads( 1 );
 53        }

 54        ctpool ( unsigned threadsize )
 55            :tpool_running_(true)
 56        {
 57            _m_start_threads( threadsize );
 58        }

 59
 60        template< typename Function>
 61        void push( const Function & f)
 62        {
 63            /// 枷锁
 64            task_lock_.enter();
 65
 66            task_container_.push_back( new tp_ipc_peer_namespace::task<Function>( f ) );
 67            
 68            task_lock_.leave();
 69
 70        }

 71        
 72        ~ctpool(void){}
 73
 74    private:
 75
 76        /// 创建线程池
 77        void _m_start_threads( unsigned size )
 78        {
 79            if ( size == 0 )
 80                size = 4;
 81
 82            for ( unsigned i = 0 ; i < size ; i++)
 83            {
 84                tinfo_type tinfo;
 85                tinfo.state = 0;
 86                tinfo.handle = (HANDLE)::_beginthreadex( 0 , 0 , _m_work_thread , NULL , NULL ,&(tinfo.tid) );
 87                threads_.push_back(  tinfo );
 88            }

 89        }

 90        
 91        /// 唤醒
 92        void _m_wakeup()
 93        {
 94            HANDLE handle = 0;
 95
 96            /// 对共享区 枷锁
 97            tlock_.enter();
 98            std::vector<tinfo_type>::iterator it = threads_.begin(), end = threads_.end();
 99
100            for ( ; it != end ; ++it )
101            {
102                if ( it->state == 0 ) /// 在等待状态
103                {
104                    handle      =  it->handle ;
105                    it->state = 1;
106                    break;
107                }

108            }

109            tlock_.leave();
110
111            while ( ::ResumeThread( handle ) != 1)
112                ;
113        }

114
115        /// 挂起某个线程
116        void _m_suspend()
117        {
118            unsigned tid = ::GetCurrentThreadId();
119            HANDLE   handle = 0;
120
121            tlock_.enter();
122
123            /// 对共享区 枷锁
124            tlock_.enter();
125            std::vector<tinfo_type>::iterator it = threads_.begin(), end = threads_.end();
126
127            for ( ; it != end ; ++it )
128            {
129                if ( it->tid == tid ) /// 运行ID
130                {
131                    handle      =  it->handle ;
132                    it->state = 0;
133                    break;
134                }

135            }

136            tlock_.leave();
137
138            /// 挂起
139            if ( handle)
140            {
141                ::SuspendThread( handle );
142            }

143        }

144
145        /// 获取task
146        tp_ipc_peer_namespace::task_object * _m_read_task()
147        {
148            while( tpool_running_ )
149            {
150                tp_ipc_peer_namespace::task_object * task = NULL;
151                
152                /// 对共享区 枷锁
153                task_lock_.enter();
154                if ( task_container_.size() )
155                {
156                    task = *( task_container_.begin() );
157                    task_container_.erase( task_container_.begin() );
158                }

159                task_lock_.leave();
160
161                if ( task )
162                {
163                    return task;
164                }

165                else
166                    _m_suspend();
167            }

168            return NULL;
169        }

170
171    private:
172        static  unsigned __stdcall _m_work_thread(void * arg)
173        {
174            
175            self_type & self = *reinterpret_cast<self_type*>(arg);
176            tp_ipc_peer_namespace::task_object * task = 0;
177
178            ::SuspendThread(::GetCurrentThread());
179
180            whiletrue )
181            {
182                task = self._m_read_task();
183                if ( task )
184                {
185                    task->exec();
186
187                    delete task ;
188                    task = 0;
189                }

190                else
191                    break;
192            }

193            
194            ::_endthreadex( 0 );
195            return 0;
196        }

197
198    private:
199        /// 线程信息
200        struct tinfo_type
201        {
202            HANDLE            handle;
203            unsigned        tid;
204            unsigned long    state;  // 0 = sleep;
205        }
;
206
207        /// user define var
208    private:
209        /// 线程运行状态
210        volatile bool                tpool_running_;
211        /// 一个临界区类
212        sync::csectionlock            tlock_;                
213        /// 线程信息
214        std::vector<tinfo_type>        threads_;        
215        /// 
216        sync::csectionlock            task_lock_;
217        /// 一个回调函数
218        std::vector<task_object* > task_container_;
219
220    }
;
221
222
223}





       备注:在设计ipc的时候参考  http://man.chinaunix.net/tech/lyceum/linuxK/ipc/ipc.html
                线程池设计                  http://www.ibm.com/developerworks/cn/java/l-threadPool/

posted on 2009-08-09 18:10 expter 阅读(4172) 评论(8)  编辑 收藏 引用

评论

# re: 一个简单线程池的实现[未登录] 2009-08-10 01:57 fox

不用线程,只是select轮询又如何?不见得性能很差。  回复  更多评论   

# re: 一个简单线程池的实现 2009-08-10 09:19 abettor

@fox
select的话,如果进程同时占用的socket超过100,效率将显著下降。
另外select模型毕竟还是串行操作,无法做到真正的recv/send并行。  回复  更多评论   

# re: 一个简单线程池的实现[未登录] 2009-08-10 17:57 欲三更

你一个线程select几十个不就行了?  回复  更多评论   

# re: 一个简单线程池的实现 2009-08-10 19:43 expter

@fox
select模型 确实能提高效率,我这里只是介绍一个线程池,他是我在处理IPC机制的一个模型  回复  更多评论   

# re: 一个简单线程池的实现 2010-02-25 14:11 tcpcoder

都是epoll的年代了,还select-_-!  回复  更多评论   

# re: 一个简单线程池的实现 2010-04-07 23:00 asuran

如果你只需要1~2个socket 连接select 还是很不错的, 比如客户端

如果需要handle大量的链接, 如服务器端, 用 IOCP /EPOLL/BOOST。ASIO
所以需要case by case, 不能一概而论  回复  更多评论   

# re: 一个简单线程池的实现 2011-10-26 17:47 wishper

这代码 应该不是你写的吧.只是把人家的代码拿来该了几个namespace
  回复  更多评论   

# re: 一个简单线程池的实现 2013-01-30 10:27 clement

这线程池写的不咋地。能运行吗?  回复  更多评论   


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理