看群里有同志老是在找线程池的实现,听说网上曾经发布的都是不正确的,今天我就自己弄了一个,不正确的地方大家指点指点
mutex.hxx 互斥类
1#ifndef INCLUDE_MUTEX_HH
2#define INCLUDE_MUTEX_HH
3#include <pthread.h>
4
5class Mutex
6{
7public:
8 Mutex();
9 virtual ~Mutex();
10 void lock();
11 void unlock();
12 pthread_mutex_t *get_mutex();
13private:
14 pthread_mutex_t mutex;
15};
16
17#endif
18
mutex.cxx互斥实现类
#include "mutex.hxx"
#include "error.hxx"
Mutex::Mutex()
{
if(pthread_mutex_init(&mutex,NULL))
{
perror("pthread_mutex_init error");
throw MutexError("pthread_mutex_init error");
}
}
Mutex::~Mutex()
{
if(pthread_mutex_destroy(&mutex))
{
perror("pthread_mutex_destroy error");
throw MutexError("pthread_mutex_destroy error");
}
}
void Mutex::lock()
{
pthread_mutex_lock(&mutex);
}
void Mutex::unlock()
{
pthread_mutex_unlock(&mutex);
}
pthread_mutex_t *Mutex::get_mutex()
{
return &mutex;
}
error.hxx 异常类型
#ifndef INCLUDE_ERROR_HH
#define INCLUDE_ERROR_HH
#include <stdexcept>
class MutexError:public std::runtime_error
{
public:
MutexError(const std::string& what)
:std::runtime_error(what.c_str())
{}
MutexError(const char* const what)
:std::runtime_error(what)
{}
};
#endif
task.hxx 任务类,所有的任务需要实现此接口
#ifndef INCLUDE_TASK_HH
#define INCLUDE_TASK_HH
#include <string>
#include "mutex.hxx"
//class Mutex;
class Task
{
friend bool operator<(const Task& t1,const Task& t2);
public:
Task(const std::string& taskName=std::string(),int level=0);
virtual ~Task(){};
void setLevel(int level);
std::string taskName()const;
std::string taskName();
void setName(const std::string&);
virtual void run()=0;
private:
Mutex mutex;
int level_;
std::string taskName_;
};
#endif
task.cxx 任务实现代码
#include "task.hxx"
//#include "mutex.hxx"
Task::Task(const std::string& name,int level)
:taskName_(name),level_(level)
{
}
void Task::setLevel(int level)
{
mutex.lock();
level_=level;
mutex.unlock();
}
std::string Task::taskName()const
{
return taskName_;
}
std::string Task::taskName()
{
return taskName_;
}
void Task::setName(const std::string& name)
{
mutex.lock();
taskName_=name;
mutex.unlock();
}
bool operator<(const Task& t1,const Task& t2)
{
return t1.level_<t2.level_;
}
池头文件 pool.hxx
#ifndef INCLUDE_POOL_HH
#define INCLUDE_POOL_HH
#include <pthread.h>
#include <queue>
#include <list>
#include "mutex.hxx"
class Task;
class ThreadPool
:private Mutex
{
public:
ThreadPool(int);
~ThreadPool();
void addTask(Task*);
void wait();
void release(const pthread_t&);
Task* get();
void setTimeout(long t);
private:
typedef std::list<pthread_t>::iterator ThreadIterator;
pthread_cond_t release_cond;
pthread_cond_t task_cond;
static void* threadFunc(void*);
void init(int);
std::priority_queue<Task*> tasks;
std::list<pthread_t> idleThreads;
std::list<pthread_t> busyThreads;
long timeout_second;
};
#endif
池实现文件
#include "pool.hxx"
#include "task.hxx"
#include <algorithm>
#include <ctime>
#include <iostream>
ThreadPool::ThreadPool(int threadNumber)
:timeout_second(10)
{
pthread_cond_init(&release_cond,NULL);
pthread_cond_init(&task_cond,NULL);
init(threadNumber);
}
ThreadPool::~ThreadPool()
{
pthread_cond_destroy(&release_cond);
pthread_cond_destroy(&task_cond);
}
void ThreadPool::init(int threadNumber)
{
for(int i=0;i<threadNumber;i++)
{
pthread_t t;
pthread_create(&t,NULL,threadFunc,this);
busyThreads.push_back(t);
}
}
void ThreadPool::setTimeout(long t)
{
if(t>0)
timeout_second=t;
}
void ThreadPool::addTask(Task* task)
{
lock();
tasks.push(task);
pthread_cond_signal(&task_cond);
unlock();
}
Task* ThreadPool::get()
{
struct timespec timeout;
timeout.tv_sec=time(NULL)+timeout_second;
timeout.tv_nsec=0;
lock();
if(tasks.empty())
{
pthread_cond_timedwait(&task_cond,get_mutex(),&timeout);
}
if(tasks.empty())
{
std::cout<<"empty"<<std::endl;
unlock();
return NULL;
}
Task *task=tasks.top();
tasks.pop();
unlock();
return task;
}
void * ThreadPool::threadFunc(void* args)
{
ThreadPool* pool=static_cast<ThreadPool*>(args);
Task* task;
while((task=pool->get())!=NULL)
{
task->run();
}
pool->release(pthread_self());
}
void ThreadPool::release(const pthread_t& t)
{
lock();
ThreadIterator it;
it=std::find(busyThreads.begin(),busyThreads.end(),t);
if(it!=busyThreads.end())
{
busyThreads.erase(it);
}
idleThreads.push_back(t);
pthread_cond_signal(&release_cond);
unlock();
}
void ThreadPool::wait()
{
lock();
while(!busyThreads.empty())
{
struct timespec timeout;
timeout.tv_sec=time(NULL)+10;
timeout.tv_nsec=0;
pthread_cond_timedwait(&release_cond,get_mutex(),&timeout);
}
for(ThreadIterator it=idleThreads.begin();it!=idleThreads.end();it++)
{
pthread_join(*it,NULL);
}
unlock();
}
测试文件
#include "pool.hxx"
#include "task.hxx"
#include <unistd.h>
#include <iostream>
#include <string>
#include <vector>
#include <sstream>
#include <memory>
#include "mutex.hxx"
class WorkTask
:public Task
{
public:
WorkTask(int level,void *data):Task(std::string(),level)
{
this->data_=data;
}
~WorkTask(){}
virtual void run()
{
std::cout<<taskName()<<(char*)data_<<std::endl;
sleep(2);
std::cout<<taskName()<<" ok"<<std::endl;
}
private:
void *data_;
Mutex mutex;
};
int main(void)
{
ThreadPool pool(5);
char szTemp[]="aaaaaaaaaaaaaaabbbbbbbbbbbccccccccccdddddddddd";
WorkTask task(1,szTemp);
char buf[20];
std::vector<Task*> tasks;
for(int i=0;i<10;i++)
{
snprintf(buf,sizeof(buf),"%s %d","task",i);
task.setName(buf);
std::auto_ptr<Task> t(new WorkTask(task));
pool.addTask(t.get());
tasks.push_back(t.release());
}
pool.wait();
for(std::vector<Task*>::iterator it=tasks.begin();it!=tasks.end();it++)
{
delete *it;
}
return 0;
}
测试的结果
没有注释直接看源码就可解决。。
注:使用本人代码请注明本人的信息