随笔-167  评论-8  文章-0  trackbacks-0

本例示范Linux信号量的基本用法。该范例使用了两个线程分别对一个公用队列进行入队和出队操作,并用信号量进行控制,当队列空时出队操作可以被阻塞,当队列满时入队操作可以被阻塞。

主要用到的信号量函数有:
sem_init:初始化信号量sem_t,初始化的时候可以指定信号量的初始值,以及是否可以在多进程间共享。
sem_wait:一直阻塞等待直到信号量>0。
sem_timedwait:阻塞等待若干时间直到信号量>0。
sem_post:使信号量加1。
sem_destroy:释放信号量。和sem_init对应。
关于各函数的具体参数请用man查看。如man sem_init可查看该函数的帮助。

下面看具体的代码:

//--------------------------msgdequeue.h开始-------------------------------------
//实现可控队列
#ifndef MSGDEQUEUE_H
#define MSGDEQUEUE_H
#include 
"tmutex.h"
#include 
<iostream>
#include 
<errno.h>
#include 
<time.h>
#include 
<semaphore.h>
#include 
<deque>
using namespace std;

template
<typename T,typename MUTEX_TYPE = ThreadMutex>
class CMessageDequeue
{
public:
        CMessageDequeue(size_t MaxSize) : m_MaxSize( MaxSize )
        
{
                sem_init( 
&m_enques,0, m_MaxSize ); //入队信号量初始化为MaxSize,最多可容纳MaxSize各元素
                sem_init( &m_deques,0,0 ); //队列刚开始为空,出队信号量初始为0
        }


        
~CMessageDequeue()
        
{
                sem_destroy(
&m_enques);
                sem_destroy(
&m_deques);
        }


        
int sem_wait_i( sem_t *psem, int mswait )
        
{//等待信号量变成>0,mswait为等待时间,若mswait<0则无穷等待,否则等待若干mswait毫秒。
                if( mswait < 0 )
                
{
                        
int rv = 0;                          
                        
while( ((rv = sem_wait(psem) ) != 0 ) && (errno == EINTR
) );    
//等待信号量,errno==EINTR屏蔽其他信号事件引起的等待中断
                        return rv;    
                }
                                            
                
else                                         
                
{                                            
                        timespec ts;                         
                        clock_gettime(CLOCK_REALTIME, 
&ts );    //获取当前时间
                        ts.tv_sec += (mswait / 1000 );        //加上等待时间的秒数
                        ts.tv_nsec += ( mswait % 1000 ) * 1000//加上等待时间纳秒数
                        int rv = 0;                          
                        
while( ((rv=sem_timedwait( psem, &ts ))!=0&& (errno ==
EINTR) );   
//等待信号量,errno==EINTR屏蔽其他信号事件引起的等待中断
                        return rv;   
                }
                                            
                                                             
        }
                                                    
        
bool push_back( const T &item, int mswait = -1 )     
        
//等待mswait毫秒直到将item插入队列,mswait为-1则一直等待                                                   
                if-1 == sem_wait_i( &m_enques, mswait ))   
                
{                                            
                        
return false;                        
                }


                  
//AUTO_GUARD:定界加锁,见Linux多线程及临界区编程例解的tmutex.h文件定义。                             
                AUTO_GUARD( g, MUTEX_TYPE, m_lock );
                
try                                          
                
{                                            
                        m_data.push_back( item );            
                        cout 
<< "push " << item << endl;     
                        sem_post( 
&m_deques );               
                        
return true;                         
                }
                                            
                
catch(...)                                   
                
{                                            
                        
return false;                        
                }
                                            
        }
        

      
bool pop_front( T &item, bool bpop = trueint mswait = -1 )      
        
//等待mswait毫秒直到从队列取出元素,mswait为-1则一直等待                                                     
                if-1 == sem_wait_i( &m_deques, mswait ) )  
                
{                                            
                        
return false;                        
                }
           
                 
//AUTO_GUARD:定界加锁,见Linux多线程及临界区编程例解的tmutex.h文件定义。                   
                AUTO_GUARD( g, MUTEX_TYPE, m_lock );         
                
try                                          
                
{                                            
                        item 
= m_data.front();               
                        
if( bpop )                           
                        
{                                    
                                m_data.pop_front();          
                                cout 
<< "pop " << item << endl;
                        }
                                    
                                                             
                        sem_post( 
&m_enques );               
                        
return true;                         
                }
                                            
                
catch(...)                                   
                
{                                            
                        
return false;                        
                }
                                            
        }
                                                    
        inline size_t size()                                 
        
{                                                    
                
return m_data.size();                        
        }
     

private:                                                     
        MUTEX_TYPE m_lock;                                   
        deque
<T> m_data;                                     
        size_t m_MaxSize;                                    
        sem_t m_enques;                                      
        sem_t m_deques;                                      
}
;                                                           
                                                             
#endif                         

//--------------------------msgdequeue.h结束-------------------------------------

//--------------------------test.cpp开始-------------------------------------
//主程序文件

#include 
"msgdequeue.h"
#include 
<pthread.h>
#include 
<iostream>
using namespace std;

CMessageDequeue
<int> qq(5);

void *get_thread(void *parg);
void *put_thread(void *parg);

void *get_thread(void *parg)
{
        
while(true)
        
{
                
int a = -1;
                
if!qq.pop_front( a,true1000 ) )
                
{
                        cout 
<< "pop failed. size=" << qq.size() << endl;
                }

        }

        
return NULL;
}


void *put_thread(void *parg)
{
        
for(int i=1; i<=30; i++)
        
{
                qq.push_back( i, 
-1 );
        }


        
return NULL;                                         
}
                                                            
                                                             
int main()                                                   
{                                                          
        pthread_t pget,pput;                                 
        pthread_create( 
&pget,NULL,get_thread,NULL);         
        pthread_create( 
&pput, NULL, put_thread,NULL);       
                                                             
        pthread_join( pget,NULL );                           
        pthread_join( pput,NULL );                           
                                                             
        
return 0;                                            
}
      

//--------------------------test.cpp结束-------------------------------------

    编译程序:g++ msgdequeue.h test.cpp -lpthread -lrt -o test
    -lpthread链接pthread库。-ltr链接clock_gettime函数相关库。

    编译后生成可执行文件test。输入./test执行程序。

    线程get_thread每隔1000毫秒从队列取元素,线程put_thread将30个元素依次入队。两个线程模拟两条入队和出队的流水线。因我们在 CMessageDequeue<int> qq(5)处定义了队列最多可容纳5个元素,入队线程每入队到队列元素满5个后需阻塞等待出队线程将队列元素出队才能继续。测试时可调整队列可容纳最大元素个数来观察运行效果。

posted on 2011-09-22 10:06 老马驿站 阅读(975) 评论(0)  编辑 收藏 引用 所属分类: linux