那谁的技术博客

感兴趣领域:高性能服务器编程,存储,算法,Linux内核
随笔 - 210, 文章 - 0, 评论 - 1183, 引用 - 0
数据加载中……

Linux下面的线程锁,条件变量以及信号量的使用

一) 线程锁
1) 只能用于"锁"住临界代码区域
2) 一个线程加的锁必须由该线程解锁.

锁几乎是我们学习同步时最开始接触到的一个策略,也是最简单, 最直白的策略.

二) 条件变量,与锁不同, 条件变量用于等待某个条件被触发
1) 大体使用的伪码:

// 线程一代码
pthread_mutex_lock(&mutex);
// 设置条件为true
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

// 线程二代码
pthread_mutex_lock(&mutex);
while (条件为false)
    pthread_cond_wait(&cond, &mutex);
修改该条件
pthread_mutex_unlock(&mutex);

需要注意几点:
1) 第二段代码之所以在pthread_cond_wait外面包含一个while循环不停测试条件是否成立的原因是, 在pthread_cond_wait被唤醒的时候可能该条件已经不成立.UNPV2对这个的描述是:"Notice that when pthread_cond_wait returns, we always test the condition again, because spurious wakeups can occur: a wakeup when the desired condition is still not true.".

2) pthread_cond_wait调用必须和某一个mutex一起调用, 这个mutex是在外部进行加锁的mutex, 在调用pthread_cond_wait时, 内部的实现将首先将这个mutex解锁, 然后等待条件变量被唤醒, 如果没有被唤醒, 该线程将一直休眠, 也就是说, 该线程将一直阻塞在这个pthread_cond_wait调用中, 而当此线程被唤醒时, 将自动将这个mutex加锁.
man文档中对这部分的说明是:
pthread_cond_wait atomically unlocks the mutex (as per pthread_unlock_mutex) and waits for the condition variable cond to  be  signaled.  The thread execution is suspended and does not consume any CPU time until the condition variable is
signaled. The mutex must be locked by the calling thread on entrance to pthread_cond_wait.  Before  returning  to  the calling thread, pthread_cond_wait re-acquires mutex (as per pthread_lock_mutex).
也就是说pthread_cond_wait实际上可以看作是以下几个动作的合体:
解锁线程锁
等待条件为true
加锁线程锁.

这里是使用条件变量的经典例子:
http://www.cppblog.com/CppExplore/archive/2008/03/20/44949.html
之所以使用两个条件变量, 是因为有两种情况需要进行保护,使用数组实现循环队列,因此一个条件是在getq函数中判断读写指针相同且可读数据计数为0,此时队列为空没有数据可读,因此获取新数据的条件变量就一直等待,另一个条件是读写指针相同且可读数据计数大于0,此时队列满了不能再添加数据, 因此添加新数据的条件变量就一直等待,而nEmptyThreadNum和nFullThreadNum则是计数, 只有这个计数大于0时才会唤醒相应的条件变量,这样可以减少调用pthread_cond_signal的次数.
为了在下面的叙述方便, 我将这段代码整理在下面, 是一个可以编译运行的代码,但是注意需要在编译时加上-pthread链接线程库:
#include <pthread.h>
#include 
<stdio.h>
#include 
<unistd.h>
#include 
<stdlib.h>

class CThreadQueue
{
public:
    CThreadQueue(
int queueSize=1024):
        sizeQueue(queueSize),lput(
0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
    {
        pthread_mutex_init(
&mux,0);
        pthread_cond_init(
&condGet,0);
        pthread_cond_init(
&condPut,0);
        buffer
=new void *[sizeQueue];
    }
    
virtual ~CThreadQueue()
    {
        delete[] buffer;
    }
    
void * getq()
    {
        
void *data;
        pthread_mutex_lock(
&mux);
        /*
         此 处循环判断的原因如下:假设2个线程在getq阻塞,然后两者都被激活,而其中一个线程运行比较块,快速消耗了2个数据,另一个线程醒来的时候已经没有新 数据可以消耗了。另一点,man pthread_cond_wait可以看到,该函数可以被信号中断返回,此时返回EINTR。为避免以上任何一点,都必须醒来后再次判断睡眠条件。更 正:pthread_cond_wait是信号安全的系统调用,不会被信号中断。
        */
        while(lget==lput&&nData==0)
        {
            nEmptyThread
++;
            pthread_cond_wait(
&condGet,&mux);
            nEmptyThread
--;     
        }

        data
=buffer[lget++];
        nData
--;
        
if(lget==sizeQueue)
        {
            lget
=0;
        }
        
if(nFullThread) //必要时才进行signal操作,勿总是signal
        {
            pthread_cond_signal(
&condPut);    
        }
        pthread_mutex_unlock(
&mux);
        
return data;
    }
    
void putq(void *data)
    {
        pthread_mutex_lock(
&mux);
        
while(lput==lget&&nData)
        { 
            nFullThread
++;
            pthread_cond_wait(
&condPut,&mux);
            nFullThread
--;
        }
        buffer[lput
++]=data;
        nData
++;
        
if(lput==sizeQueue)
        {
            lput
=0;
        }
        
if(nEmptyThread) //必要时才进行signal操作,勿总是signal
        {
            pthread_cond_signal(
&condGet);
        }
        pthread_mutex_unlock(
&mux);
    }
private:
    pthread_mutex_t mux;
    pthread_cond_t condGet;
    pthread_cond_t condPut;

    
void * * buffer;    //循环消息队列
    int sizeQueue;        //队列大小
    int lput;        //location put  放数据的指针偏移
    int lget;        //location get  取数据的指针偏移
    int nFullThread;    //队列满,阻塞在putq处的线程数
    int nEmptyThread;    //队列空,阻塞在getq处的线程数
    int nData;        //队列中的消息个数,主要用来判断队列空还是满
};

CThreadQueue queue;
//使用的时候给出稍大的CThreadQueue初始化参数,可以减少进入内核态的操作。

void * produce(void * arg)
{
    
int i=0;
    pthread_detach(pthread_self());
    
while(i++<100)
    {
        queue.putq((
void *)i);
    }
}

void *consume(void *arg)
{
    
int data;
    
while(1)
    {
        data
=(int)(queue.getq());
        printf(
"data=%d\n",data);
    }
}

int main()
{    
    pthread_t pid;
    
int i=0;

    
while(i++<3)
        pthread_create(
&pid,0,produce,0);
    i
=0;
    
while(i++<3)
        pthread_create(
&pid,0,consume,0);
    sleep(
5);

    
return 0;
}


三) 信号量
信号量既可以作为二值计数器(即0,1),也可以作为资源计数器.
主要是两个函数:
sem_wait()  decrements  (locks)  the semaphore pointed to by sem.  If the semaphore's value is greater than zero, then
the decrement proceeds, and the function returns, immediately.  If the semaphore currently has the  value  zero,  then
the  call  blocks  until  either  it  becomes possible to perform the decrement (i.e., the semaphore value rises above
zero), or a signal handler interrupts the call.

sem_post()  increments  (unlocks)  the  semaphore  pointed  to  by sem.  If the semaphore's value consequently becomes
greater than zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed  to  lock
the semaphore.

而函数int sem_getvalue(sem_t *sem, int *sval);则用于获取信号量当前的计数.

可以用信号量模拟锁和条件变量:
1) 锁,在同一个线程内同时对某个信号量先调用sem_wait再调用sem_post, 两个函数调用其中的区域就是所要保护的临界区代码了,这个时候其实信号量是作为二值计数器来使用的.不过在此之前要初始化该信号量计数为1,见下面例子中的代码.
2) 条件变量,在某个线程中调用sem_wait, 而在另一个线程中调用sem_post.

我们将上面例子中的线程锁和条件变量都改成用信号量实现以说明信号量如何模拟两者:
#include <pthread.h>
#include 
<stdio.h>
#include 
<unistd.h>
#include 
<stdlib.h>
#include 
<fcntl.h>
#include 
<sys/stat.h>
#include 
<semaphore.h>
#include 
<errno.h>
#include 
<string.h>

class CThreadQueue
{
public:
    CThreadQueue(
int queueSize=1024):
        sizeQueue(queueSize),lput(
0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
    {
        
//pthread_mutex_init(&mux,0);
        mux = sem_open("mutex", O_RDWR | O_CREAT);
        
get = sem_open("get", O_RDWR | O_CREAT);
        put 
= sem_open("put", O_RDWR | O_CREAT);
    
        sem_init(mux, 
01);

        buffer
=new void *[sizeQueue];
    }
    
virtual ~CThreadQueue()
    {
        delete[] buffer;
        sem_unlink(
"mutex");
        sem_unlink(
"get");
        sem_unlink(
"put");
    }
    
void * getq()
    {
        
void *data;

        
//pthread_mutex_lock(&mux);
        sem_wait(mux);

        
while(lget==lput&&nData==0)
        {
            nEmptyThread
++;
            
//pthread_cond_wait(&condGet,&mux);
            sem_wait(get);
            nEmptyThread
--;     
        }

        data
=buffer[lget++];
        nData
--;
        
if(lget==sizeQueue)
        {
            lget
=0;
        }
        
if(nFullThread) //必要时才进行signal操作,勿总是signal
        {
            
//pthread_cond_signal(&condPut);    
            sem_post(put);
        }

        
//pthread_mutex_unlock(&mux);
        sem_post(mux);

        
return data;
    }
    
void putq(void *data)
    {
        
//pthread_mutex_lock(&mux);
        sem_wait(mux);

        
while(lput==lget&&nData)
        { 
            nFullThread
++;
            
//pthread_cond_wait(&condPut,&mux);
            sem_wait(put);
            nFullThread
--;
        }
        buffer[lput
++]=data;
        nData
++;
        
if(lput==sizeQueue)
        {
            lput
=0;
        }
        
if(nEmptyThread)
        {
            
//pthread_cond_signal(&condGet);
            sem_post(get);
        }

        
//pthread_mutex_unlock(&mux);
        sem_post(mux);
    }
private:
    
//pthread_mutex_t mux;
    sem_t* mux;
    
//pthread_cond_t condGet;
    
//pthread_cond_t condPut;
    sem_t* get;
    sem_t
* put;

    
void * * buffer;    //循环消息队列
    int sizeQueue;        //队列大小
    int lput;        //location put  放数据的指针偏移
    int lget;        //location get  取数据的指针偏移
    int nFullThread;    //队列满,阻塞在putq处的线程数
    int nEmptyThread;    //队列空,阻塞在getq处的线程数
    int nData;        //队列中的消息个数,主要用来判断队列空还是满
};

CThreadQueue queue;
//使用的时候给出稍大的CThreadQueue初始化参数,可以减少进入内核态的操作。

void * produce(void * arg)
{
    
int i=0;
    pthread_detach(pthread_self());
    
while(i++<100)
    {
        queue.putq((
void *)i);
    }
}

void *consume(void *arg)
{
    
int data;
    
while(1)
    {
        data
=(int)(queue.getq());
        printf(
"data=%d\n",data);
    }
}

int main()
{    
    pthread_t pid;
    
int i=0;

    
while(i++<3)
        pthread_create(
&pid,0,produce,0);
    i
=0;
    
while(i++<3)
        pthread_create(
&pid,0,consume,0);
    sleep(
5);

    
return 0;
}


不过, 信号量除了可以作为二值计数器用于模拟线程锁和条件变量之外, 还有比它们更加强大的功能, 信号量可以用做资源计数器, 也就是说初始化信号量的值为某个资源当前可用的数量, 使用了一个之后递减, 归还了一个之后递增, 将前面的例子用资源计数器的形式再次改写如下,注意在初始化的时候要将资源计数进行初始化, 在下面代码中的构造函数中将put初始化为队列的最大数量, 而get为0:
#include <pthread.h>
#include 
<stdio.h>
#include 
<unistd.h>
#include 
<stdlib.h>
#include 
<fcntl.h>
#include 
<sys/stat.h>
#include 
<semaphore.h>

class CThreadQueue
{
public:
    CThreadQueue(
int queueSize=1024):
        sizeQueue(queueSize),lput(
0),lget(0)
    {
        pthread_mutex_init(
&mux,0);
        
get = sem_open("get", O_RDWR | O_CREAT);
        put 
= sem_open("put", O_RDWR | O_CREAT);

        sem_init(
get00);
        sem_init(put, 
0, sizeQueue);

        buffer
=new void *[sizeQueue];
    }
    
virtual ~CThreadQueue()
    {
        sem_unlink(
"get");
        sem_unlink(
"put");
        delete[] buffer;
    }
    
void * getq()
    {
        sem_wait(
get);

        
void *data;

        pthread_mutex_lock(
&mux);

        
/*
        while(lget==lput&&nData==0)
        {
            nEmptyThread++;
            //pthread_cond_wait(&condGet,&mux);
            nEmptyThread--;     
        }
        
*/

        data
=buffer[lget++];
        
//nData--;
        if(lget==sizeQueue)
        {
            lget
=0;
        }
        
/*
        if(nFullThread) //必要时才进行signal操作,勿总是signal
        {
            //pthread_cond_signal(&condPut);    
            sem_post(put);
        }
        
*/
        pthread_mutex_unlock(
&mux);

        sem_post(put);

        
return data;
    }
    
void putq(void *data)
    {
        sem_wait(put);

        pthread_mutex_lock(
&mux);

        
/*
        while(lput==lget&&nData)
        { 
            nFullThread++;
            //pthread_cond_wait(&condPut,&mux);
            sem_wait(put);
            nFullThread--;
        }
        
*/

        buffer[lput
++]=data;
        
//nData++;
        if(lput==sizeQueue)
        {
            lput
=0;
        }
        
/*
        if(nEmptyThread)
        {
            //pthread_cond_signal(&condGet);
            sem_post(get);
        }
        
*/

        pthread_mutex_unlock(
&mux);

        sem_post(
get);
    }
private:
    pthread_mutex_t mux;
    
//pthread_cond_t condGet;
    
//pthread_cond_t condPut;
    sem_t* get;
    sem_t
* put;

    
void * * buffer;    //循环消息队列
    int sizeQueue;        //队列大小
    int lput;        //location put  放数据的指针偏移
    int lget;        //location get  取数据的指针偏移
};

CThreadQueue queue;
//使用的时候给出稍大的CThreadQueue初始化参数,可以减少进入内核态的操作。

void * produce(void * arg)
{
    
int i=0;
    pthread_detach(pthread_self());
    
while(i++<100)
    {
        queue.putq((
void *)i);
    }
}

void *consume(void *arg)
{
    
int data;
    
while(1)
    {
        data
=(int)(queue.getq());
        printf(
"data=%d\n",data);
    }
}

int main()
{    
    pthread_t pid;
    
int i=0;

    
while(i++<3)
        pthread_create(
&pid,0,produce,0);
    i
=0;
    
while(i++<3)
        pthread_create(
&pid,0,consume,0);
    sleep(
5);

    
return 0;
}

可以看见,采用信号量作为资源计数之后, 代码变得"很直白",原来的一些保存队列状态的变量都不再需要了.

信号量与线程锁,条件变量相比还有以下几点不同:
1)锁必须是同一个线程获取以及释放, 否则会死锁.而条件变量和信号量则不必.
2)信号的递增与减少会被系统自动记住, 系统内部有一个计数器实现信号量,不必担心会丢失, 而唤醒一个条件变量时,如果没有相应的线程在等待该条件变量, 这次唤醒将被丢失.

posted on 2009-01-15 10:23 那谁 阅读(15653) 评论(3)  编辑 收藏 引用 所属分类: Linux/Unix

评论

# re: Linux下面的线程锁,条件变量以及信号量的使用  回复  更多评论   

第二段程序模拟线程锁中有bug,sem_wait(get)跟sem_wait(put)前后需要加上解锁加锁动作。

produce 和 consume 的设计,队列大小为1024,测试数据3x100太小。while循环中最好加上usleep让线程休息一会儿,否则consume极可能需要在produce线程跑完了之后才获得锁,对测试程序不利。
2009-01-27 08:57 | Mensch88

# re: Linux下面的线程锁,条件变量以及信号量的使用  回复  更多评论   

关于最后一段对用信号量实现通知/等待机制和用条件变量来实现相同机制的比较,所说的两点都对信号量有利,那条件变量多余了吗?
2009-02-11 11:39 | yqiang

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理