我的第一本C++书

游历C++世界的地图

C++小品:她来听我的演唱会——C++11中的随机数、线程(thread)、互斥(mutex)和条件变量(condition)

源文来自: http://imcc.blogbus.com/logs/172675220.html
 

在新颁布的C++新标准C++11中,最令人激动人心的,我想不是auto关键字,也不是Lambda表达式,而是其中的对并行计算的支持——新的线程库(thread)的加入。

多核心CPU的普及应用,C++的主要应用领域,服务器程序,高性能计算等等,都对并行计算提出了新的要求,而这次C++中全新添加的线程库,就是 对这一趋势的应对。现在,C++程序员可以轻松地编写多线程的程序,而无需借助系统API或者是第三方程序库的支持。线程库的加入给C++带来的变化,无 异于 194,翻身的程序员们把歌唱。

C++11中的线程库,很大程度上直接来自boost这块C++的试验田,其基本架构和组件都完全相同,如果你是一个boost线程库的使用者,那 么在C++11中,你会感觉到是回到了老家一样,到处都是熟人。而如果你是一个完全的新手,也不要紧,C++11中的线程库非常简单,任何人都可以轻松上 手,我就是这样,但是要深究,还得好好学习。

下面是一个简单的例子,用到了线程库中的线程(thread),互斥(mutex),条件变量(condition),来模拟一个演唱会的入场检票的场景,另外,为了模拟观众,用到了C++11中的新的随机数的产生,模拟一个正态分布的访客人群。不说了,还是看代码:

#include <iostream>
#include <queue>
#include <vector>
// 随机数
#include <random>
// 这里,我使用了boost实现的线程库,如果你的编译器已经支持C++11,则使用<thread>是一样的
#include <boost\thread.hpp>
#include <boost\thread\locks.hpp>
#include <boost\thread\condition.hpp>

using namespace std;
using namespace boost;

// 共享资源和互斥对象
mutex mtx;
bool finish = false;  // 表示观众到来是否结束

// 观众,主要是为了表示检票过程中的检票耗费时间
class viewer
{
public:
    void check()
    {
        // 线程等待
        posix_time::milliseconds worktime(400); 
        this_thread::sleep(worktime);   
    }
    void arrival(int t)
    {
        posix_time::seconds arrtime(t); 
        this_thread::sleep(arrtime);   
    }
};
// 检票口
// 它有一个队列,用于保存到来的观众,并且用一个线程来处理队列中的观众
class gate
{
    typedef boost::mutex::scoped_lock scoped_lock;
public:
    gate():count(0),no(0){};
    // 启动线程
    void start(int n)
    {
        no = n;
        t = thread(&gate::check,this);
    }

    // 检票
    void check()
    {
        // 无限循环,知道观众数为0且不会有新的观众到来
        while(true)
        {
            viewer v;
            {
                // 锁定互斥对象,开始访问对列
                scoped_lock lock(m);
                if(0==vque.size())  // 如果队列为空
                {
                    {
                    // 判断是否还会有新的观众到来,也即是表示到达的线程是否结束
                    scoped_lock finlk(mtx);
                    if(finish)
                        return; // 如果已经结束,检票也同样结束
                    }
                    // 如果观众数为0,则等待新的观众的到来
                    while(0 == vque.size())
                    {   
                          // 这里的wait()是条件变量的关键,它会先是否lock所锁定的互斥对象m一定时间,
                          // 然后再次锁定,接着进行(0==vque.size())的判断。如此往复,知道size不等于0,
                          // 循环条件无法满足而结束循环,这里表达的条件就是,只有size!=0,也就是队列中有
                          // 观众才继续向下。
                          cond.wait(lock);
                    }
                }
                // 从对列中获得观众,对其进行检票
                v = vque.front();
                vque.pop();
                cond.notify_one(); // 这里是通知添加观众的进程,表示队列已经有空位置了,可以添加新的观众
            }
            v.check();
            ++count;
        }
    }
    // 将观众添加到队列
    void add(viewer v)
    {

        // 同样运用条件变量,判断队列是否已经满了
        // 只有在队列尚未满的情况下才向下继续
        scoped_lock lock(m);
        while(vque.size() >= 15 )
        {
            cond.wait(lock);
        }
        vque.push(v); // 将观众添加到队列
        cond.notify_one();  // 通知检票进程,新的观众进入队列,这样在size=0时等待的条件可以更新
    }
    int getcount()
    {
        return count;
    }
    int getno()
    {
        return no;
    }
    // 等待线程执行完毕返回
    void join()
    {
        t.join();
    }
private:
    thread t;
    mutex m;
    condition cond;
    queue<viewer> vque;
    int count;
    int no;
};

// 一共有10个检票口
vector<gate> vgates(10);

// 用随机数模拟观众到达
void arrival()
{   
    default_random_engine re{}; // 产生一个均值为31的正态分布的随机数
    normal_distribution<double> nd(31,8);

    // 将随机数引擎和分布绑定一个函数对象
    auto norm = std::bind(nd, re);
    // 保存随机数的容器
    vector<int> mn(64);
   
    // 产生随机数
    for(int i = 0;i<700;++i)
        ++mn[round(norm())];
   
    int secs = 100;
    // 产生0到9的随机数,表示观众随机地到达某一个检票口
    uniform_int_distribution<int>  index{0,9};
     
    // 进入检票口队列
    for(auto i:mn)
    {
        cout<<i<<endl;
        for(auto vi = 1; vi <= i; ++vi)
        {
            // 将观众添加到某个gate的队列中
             (vgates[index(re)]).add(viewer());
            // 等待一段时间
            int t = round(secs/(float)(i+1));
            this_thread::sleep(
            posix_time::milliseconds(t));
        }
    }
    // 观众已经全部到达,进入队列
     cout<<"finish"<<endl;
    mtx.lock();
    finish = true;
    mtx.unlock();
    //cout<<"unlock"<<endl;
}

int main()
{
    int i = 1;
    // 启动检票线程
    for(gate& g:vgates)
    {
        g.start(i);
        ++i;
    }
    // 启动到达线程,看看,在C++11中新线程的创建就这么简单
    thread arr = thread(arrival);
    // 等待线程结束
    arr.join();
    int total = 0;
    // 等待检票线程结束,并输出处理的人数
    for(gate& g:vgates)
    {
        g.join();
        total += g.getcount();
        cout<<"gate "<<g.getno()
            <<" processed "<<g.getcount()<<" viewers."<<endl;
    }
    cout<<"there are "<<total<<"viewers in total."<<endl;
    return 0;
}
这就是一个线程库的简单应用,模拟了非常复杂的场景。

因为自己对多线程开发还不太熟悉,这个程序在某些特定条件下会产生了死锁,还有待进一步完善

posted on 2011-11-12 10:29 陈良乔——《我的第一本C++书》 阅读(3395) 评论(3)  编辑 收藏 引用

Feedback

# re: C++小品:她来听我的演唱会——C++11中的随机数、线程(thread)、互斥(mutex)和条件变量(condition) 2011-11-13 17:16 一念天堂

我是很久没用 boost thread 了,但是 scoped lock 能跟 condition variable 一起用么?  回复  更多评论   

# re: C++小品:她来听我的演唱会——C++11中的随机数、线程(thread)、互斥(mutex)和条件变量(condition) 2012-08-15 10:18 walfud

请问你是在什么平台下编译并运行的? 我在 fedora 17 上 thread 会报异常, 说 Operation not permitted.
windows g++ 4.6 下找不到 std::thread...  回复  更多评论   

# re: C++小品:她来听我的演唱会——C++11中的随机数、线程(thread)、互斥(mutex)和条件变量(condition) 2012-08-15 10:19 walfud

@walfud
哦, 我错了, 我以为你是用的 C++11 thread 呢.  回复  更多评论   



只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理