作者:CppExplore 网址:http://www.cppblog.com/CppExplore/
为了后面写的《网络模型(二)》,多写一篇关于线程的。线程使用涉及的主要数据结构以及应用框架可以参考http://www.cppblog.com/CppExplore/archive/2008/01/15/41175.html。本文的主要目的是给出linux下实用的线程消息队列实现。
一、linux上线程相关的操作有下面几种:
(1)pthread_t类型的创建、属性创建设置等。
这类具体可以:man pthread_creat; man pthread_attr_init;man pthread_detach;man pthread_join;等查看
(2)pthread_mutex_t类型的操作。
这类具体可以: man pthread_mutex_init可以看到所有相关的操作。
(3)pthread_cond_t类型的操作。同样:man pthread_cond_init。pthread_cond_t的wait和signal操作一定要和pthread_mutex_t的lock、unlock配合使用。类似于此:
pthread_mutex_t mux=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
//wait操作:
pthread_mutex_lock(&mux);
pthread_cond_wait(&cond,&mux);//睡眠前将内部会执行pthread_mutex_unlock,醒来时内部会执行pthread_mutex_lock
pthread_mutex_unlock(&mux);
//signal操作
pthread_mutex_lock(&mux);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mux);
(4)sem_t类型的操作。同样:man sem_init 这个系列一般是用不到的,太重量级了,也是最强大的一种。
二、linux2.6内核的线程库。2.6内核的默认安装的是redhat公司的NPTL(原生posix线程库),以前内核安装的是LinuxThreads库,两者的简单介绍可以看http://www.ibm.com/developerworks/cn/linux/l-threading.html。不过对于应用者,分析两者的区别和优劣也没什么大意义。这里特别提下NPTL的futex机制。借助该机制,pthread_mutex的性能大大提高,只要不进入竞争态,进程就不会陷入内核态。这点可以自己写示例程序,通过strace -c 跟踪进程的系统调用得以证实,另外还可以证实总是进入内核态的操作有pthread_cond_signal和sem_post。
三、实用的线程消息队列实现。下面设计一个具有线程消息队列的线程封装类。通过上面的分析,我们可以有如下结论:
(1)减少pthread_cond_signal和sem_post的调用,只在有必要的时候调用;
(2)尽量避免pthread_mutex进入竞争态。增大消息队列的大小,可以有效减少竞态条件的出现。
下面给出一个实用的线程消息队列的实现类,这个类也将是以后《网络模型》文章中用到的线程消息队列类,代码注释请看对私有属性的注释:
class CThreadQueue
{
public:
CThreadQueue(int queueSize=1024):
sizeQueue(queueSize),lput(0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
{
pthread_mutex_init(&mux,0);
pthread_cond_init(&condGet,0);
pthread_cond_init(&condPut,0);
buffer=new (void *)[sizeQueue];
}
virtual ~CThreadQueue()
{
delete[] buffer;
}
void * getq()
{
void *data;
pthread_mutex_lock(&mux);
while(lget==lput&&nData==0)//此处循环判断的原因如下:假设2个线程在getq阻塞,然后两者都被激活,而其中一个线程运行比较块,快速消耗了2个数据,另一个线程醒来的时候已经没有新数据可以消耗了。另一点,man pthread_cond_wait可以看到,该函数可以被信号中断返回,此时返回EINTR。为避免以上任何一点,都必须醒来后再次判断睡眠条件。更正:pthread_cond_wait是信号安全的系统调用,不会被信号中断。
{
nEmptyThread++;
pthread_cond_wait(&condGet,&mux);
nEmptyThread--;
}
data=buffer[lget++];
nData--;
if(lget==sizeQueue)
{
lget=0;
}
if(nFullThread) //必要时才进行signal操作,勿总是signal
{
pthread_cond_signal(&condPut);
}
pthread_mutex_unlock(&mux);
return data;
}
void putq(void *data)
{
pthread_mutex_lock(&mux);
while(lput==lget&&nData)
{
nFullThread++;
pthread_cond_wait(&condPut,&mux);
nFullThread--;
}
buffer[lput++]=data;
nData++;
if(lput==sizeQueue)
{
lput=0;
}
if(nEmptyThread)
{
pthread_cond_signal(&condGet);
}
pthread_mutex_unlock(&mux);
}
private:
pthread_mutex_t mux;
pthread_cond_t condGet;
pthread_cond_t condPut;
void * * buffer; //循环消息队列
int sizeQueue; //队列大小
int lput; //location put 放数据的指针偏移
int lget; //location get 取数据的指针偏移
int nFullThread; //队列满,阻塞在putq处的线程数
int nEmptyThread; //队列空,阻塞在getq处的线程数
int nData; //队列中的消息个数,主要用来判断队列空还是满
};
下面给出这个线程消息队列的一个使用举例:
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
CThreadQueue queue;//使用的时候给出稍大的CThreadQueue初始化参数,可以减少进入内核态的操作。
void * produce(void * arg)
{
int i=0;
pthread_detach(pthread_self());
while(i++<100)
{
queue.putq((void *)i);
}
}
void *consume(void *arg)
{
int data;
while(1)
{
data=(int)(queue.getq());
printf("data=%d\n",data)
}
}
int main()
{ pthread_t pid;
int i=0;
while(i++<3)
pthread_create(&pid,0,produce,0);
i=0;
while(i++<3)
pthread_create(&pid,0,consume,0);
sleep(300);
}