看群里有同志老是在找线程池的实现,听说网上曾经发布的都是不正确的,今天我就自己弄了一个,不正确的地方大家指点指点
mutex.hxx 互斥类
1
#ifndef INCLUDE_MUTEX_HH
2
#define INCLUDE_MUTEX_HH
3
#include <pthread.h>
4
5
class Mutex
6

{
7
public:
8
Mutex();
9
virtual ~Mutex();
10
void lock();
11
void unlock();
12
pthread_mutex_t *get_mutex();
13
private:
14
pthread_mutex_t mutex;
15
};
16
17
#endif
18
mutex.cxx互斥实现类
#include "mutex.hxx"
#include "error.hxx"
Mutex::Mutex()


{
if(pthread_mutex_init(&mutex,NULL))

{
perror("pthread_mutex_init error");
throw MutexError("pthread_mutex_init error");
}
}
Mutex::~Mutex()


{
if(pthread_mutex_destroy(&mutex))

{
perror("pthread_mutex_destroy error");
throw MutexError("pthread_mutex_destroy error");
}
}

void Mutex::lock()


{
pthread_mutex_lock(&mutex);
}

void Mutex::unlock()


{
pthread_mutex_unlock(&mutex);
}

pthread_mutex_t *Mutex::get_mutex()


{
return &mutex;
}

error.hxx 异常类型
#ifndef INCLUDE_ERROR_HH
#define INCLUDE_ERROR_HH
#include <stdexcept>

class MutexError:public std::runtime_error


{
public:
MutexError(const std::string& what)
:std::runtime_error(what.c_str())

{}
MutexError(const char* const what)
:std::runtime_error(what)

{}
};

#endif

task.hxx 任务类,所有的任务需要实现此接口
#ifndef INCLUDE_TASK_HH
#define INCLUDE_TASK_HH

#include <string>
#include "mutex.hxx"
//class Mutex;
class Task


{
friend bool operator<(const Task& t1,const Task& t2);
public:
Task(const std::string& taskName=std::string(),int level=0);

virtual ~Task()
{};
void setLevel(int level);
std::string taskName()const;
std::string taskName();
void setName(const std::string&);
virtual void run()=0;

private:
Mutex mutex;
int level_;
std::string taskName_;

};
#endif

task.cxx 任务实现代码
#include "task.hxx"
//#include "mutex.hxx"

Task::Task(const std::string& name,int level)
:taskName_(name),level_(level)


{
}

void Task::setLevel(int level)


{
mutex.lock();
level_=level;
mutex.unlock();
}
std::string Task::taskName()const


{
return taskName_;
}
std::string Task::taskName()


{
return taskName_;
}
void Task::setName(const std::string& name)


{
mutex.lock();
taskName_=name;
mutex.unlock();
}
bool operator<(const Task& t1,const Task& t2)


{
return t1.level_<t2.level_;
}

池头文件 pool.hxx
#ifndef INCLUDE_POOL_HH
#define INCLUDE_POOL_HH
#include <pthread.h>
#include <queue>
#include <list>
#include "mutex.hxx"
class Task;
class ThreadPool
:private Mutex


{
public:
ThreadPool(int);
~ThreadPool();
void addTask(Task*);
void wait();
void release(const pthread_t&);
Task* get();
void setTimeout(long t);
private:
typedef std::list<pthread_t>::iterator ThreadIterator;
pthread_cond_t release_cond;
pthread_cond_t task_cond;
static void* threadFunc(void*);
void init(int);
std::priority_queue<Task*> tasks;
std::list<pthread_t> idleThreads;
std::list<pthread_t> busyThreads;
long timeout_second;
};

#endif

池实现文件
#include "pool.hxx"
#include "task.hxx"
#include <algorithm>
#include <ctime>
#include <iostream>

ThreadPool::ThreadPool(int threadNumber)
:timeout_second(10)


{
pthread_cond_init(&release_cond,NULL);
pthread_cond_init(&task_cond,NULL);
init(threadNumber);
}

ThreadPool::~ThreadPool()


{
pthread_cond_destroy(&release_cond);
pthread_cond_destroy(&task_cond);
}

void ThreadPool::init(int threadNumber)


{
for(int i=0;i<threadNumber;i++)

{
pthread_t t;
pthread_create(&t,NULL,threadFunc,this);
busyThreads.push_back(t);
}
}

void ThreadPool::setTimeout(long t)


{
if(t>0)
timeout_second=t;
}

void ThreadPool::addTask(Task* task)


{
lock();
tasks.push(task);
pthread_cond_signal(&task_cond);
unlock();
}

Task* ThreadPool::get()


{
struct timespec timeout;
timeout.tv_sec=time(NULL)+timeout_second;
timeout.tv_nsec=0;
lock();
if(tasks.empty())

{
pthread_cond_timedwait(&task_cond,get_mutex(),&timeout);
}
if(tasks.empty())

{
std::cout<<"empty"<<std::endl;
unlock();
return NULL;
}
Task *task=tasks.top();
tasks.pop();
unlock();
return task;
}

void * ThreadPool::threadFunc(void* args)


{
ThreadPool* pool=static_cast<ThreadPool*>(args);
Task* task;
while((task=pool->get())!=NULL)

{
task->run();
}
pool->release(pthread_self());
}

void ThreadPool::release(const pthread_t& t)


{
lock();
ThreadIterator it;
it=std::find(busyThreads.begin(),busyThreads.end(),t);
if(it!=busyThreads.end())

{
busyThreads.erase(it);
}
idleThreads.push_back(t);
pthread_cond_signal(&release_cond);
unlock();
}
void ThreadPool::wait()


{
lock();
while(!busyThreads.empty())

{
struct timespec timeout;
timeout.tv_sec=time(NULL)+10;
timeout.tv_nsec=0;

pthread_cond_timedwait(&release_cond,get_mutex(),&timeout);
}

for(ThreadIterator it=idleThreads.begin();it!=idleThreads.end();it++)

{
pthread_join(*it,NULL);
}
unlock();
}

测试文件
#include "pool.hxx"
#include "task.hxx"
#include <unistd.h>
#include <iostream>
#include <string>
#include <vector>
#include <sstream>
#include <memory>
#include "mutex.hxx"
class WorkTask
:public Task


{
public:
WorkTask(int level,void *data):Task(std::string(),level)

{
this->data_=data;
}

~WorkTask()
{}
virtual void run()

{
std::cout<<taskName()<<(char*)data_<<std::endl;
sleep(2);
std::cout<<taskName()<<" ok"<<std::endl;
}
private:
void *data_;
Mutex mutex;
};

int main(void)


{
ThreadPool pool(5);
char szTemp[]="aaaaaaaaaaaaaaabbbbbbbbbbbccccccccccdddddddddd";
WorkTask task(1,szTemp);
char buf[20];
std::vector<Task*> tasks;
for(int i=0;i<10;i++)

{
snprintf(buf,sizeof(buf),"%s %d","task",i);
task.setName(buf);
std::auto_ptr<Task> t(new WorkTask(task));
pool.addTask(t.get());
tasks.push_back(t.release());
}
pool.wait();
for(std::vector<Task*>::iterator it=tasks.begin();it!=tasks.end();it++)

{
delete *it;
}
return 0;
}



测试的结果

没有注释直接看源码就可解决。。
注:使用本人代码请注明本人的信息