to myself 的分类学习日志

做自己想做的事
posts - 232, comments - 6, trackbacks - 0, articles - 0

Boost多线程编程

Posted on 2010-09-22 17:15 kongkongzi 阅读(15913) 评论(0)  编辑 收藏 引用 所属分类: c++ network library
 

Boost多线程编程


背景

       今天互联网应用服务程序普遍使用多线程来提高与多客户链接时的效率;为了达到最大的吞吐量,事务服务器在单独的线程上运行服务程序;

         GUI应用程序将那些费时,复杂的处理以线程的形式单独运行,以此来保证用户界面能够及时响应用户的操作。这样使用多线程的例子还有很多。

       跨平台

 

创建线程

       头文件 <boost/thread/thread.hpp>

namespace boost {

 class thread;

 class thread_group;

}

       thread():构造一个表示当前执行线程的线程对象

       explicit thread(const boost::function0<void>& threadfunc)

注:boost::function0<void>可以简单看为:一个无返回(返回void),无参数的函数。这里的函数也可以是类重载operator()构成的函数。

 

 

第一种方式:最简单方法

       #include <boost/thread/thread.hpp>

       #include <iostream>

        

       void hello()

       {

               std::cout <<

               "Hello world, I''m a thread!"

               << std::endl;

       }

        

       int main(int argc, char* argv[])

       {

               boost::thread thrd(&hello);

               thrd.join();

               return 0;

       }

 

第二种方式:复杂类型对象作为参数来创建线程

       #include <boost/thread/thread.hpp>

       #include <boost/thread/mutex.hpp>

       #include <iostream>

        

       boost::mutex io_mutex;

        

       struct count

       {

               count(int id) : id(id) { }

              

               void operator()()

               {

                       for (int i = 0; i < 10; ++i)

                       {

                               boost::mutex::scoped_lock

                               lock(io_mutex);

                               std::cout << id << ": "

                               << i << std::endl;

                       }

               }

              

               int id;

       };

        

       int main(int argc, char* argv[])

       {

               boost::thread thrd1(count(1));

               boost::thread thrd2(count(2));

               thrd1.join();

               thrd2.join();

               return 0;

       }

 

第三种方式:在类内部创建线程

       1)类内部静态方法启动线程

       #include <boost/thread/thread.hpp>

       #include <iostream>

       class HelloWorld

       {

       public:

       static void hello()

       {

             std::cout <<

             "Hello world, I''m a thread!"

             << std::endl;

       }

       static void start()

       {

        

        boost::thread thrd( hello );

        thrd.join();

       }

      

       };

       int main(int argc, char* argv[])

       {

       HelloWorld::start();

      

       return 0;

       }

       在这里start()hello()方法都必须是static方法。

 

       2)如果要求start()hello()方法不能是静态方法则采用下面的方法创建线程:

       #include <boost/thread/thread.hpp>

       #include <boost/bind.hpp>

       #include <iostream>

       class HelloWorld

       {

       public:

       void hello()

       {

           std::cout <<

           "Hello world, I''m a thread!"

           << std::endl;

       }

       void start()

       {

        boost::function0< void> f = boost::bind(&HelloWorld::hello,this);

        boost::thread thrd( f );

        thrd.join();

       }

      

       };

       int main(int argc, char* argv[])

       {

       HelloWorld hello;

       hello.start();

       return 0;

       }

 

       3)在Singleton模式内部创建线程:

       #include <boost/thread/thread.hpp>

       #include <boost/bind.hpp>

       #include <iostream>

       class HelloWorld

       {

       public:

       void hello()

       {

           std::cout <<

           "Hello world, I''m a thread!"

           << std::endl;

       }

       static void start()

       {

        boost::thread thrd( boost::bind 

                          (&HelloWorld::hello,&HelloWorld::getInstance() ) ) ;

        thrd.join();

       }

       static HelloWorld& getInstance()

       {

        if ( !instance )

             instance = new HelloWorld;

        return *instance;

       }

       private:

       HelloWorld(){}

       static HelloWorld* instance;

      

       };

       HelloWorld* HelloWorld::instance = 0;

       int main(int argc, char* argv[])

       {

       HelloWorld::start();

       return 0;

       }

 

第四种方法:用类内部函数在类外部创建线程

       #include <boost/thread/thread.hpp>

       #include <boost/bind.hpp>

       #include <string>

       #include <iostream>

       class HelloWorld

       {

       public:

       void hello(const std::string& str)

       {

               std::cout <<str<< std::endl;

       }

       };

        

       int main(int argc, char* argv[])

       {

       HelloWorld obj;

       boost::thread thrd( boost::bind(&HelloWorld::hello,&obj,"Hello

                                      world, I''m a thread!" ) ) ;

       thrd.join();

       return 0;

       }

如果线程需要绑定的函数有参数则需要使用boost::bind。比如想使用 boost::thread创建一个线程来执行函数:void f(int i)

如果这样写:boost::thread thrd(f)是不对的,因为thread构造函数声明接受的是一个没有参数且返回类型为void的型别,而且

不提供参数i的值f也无法运行,这时就可以写:boost::thread thrd(boost::bind(f,1))。涉及到有参函数的绑定问题基本上都

boost::threadboost::functionboost::bind结合起来使用。

 

互斥体

       一个互斥体一次只允许一个线程访问共享区。当一个线程想要访问共享区时,首先要做的就是锁住(lock)互斥体。

       Boost线程库支持两大类互斥体,包括简单互斥体(simple mutex)和递归互斥体(recursive mutex)

有了递归互斥体,单个线程就可以对互斥体多次上锁,当然也必须解锁同样次数来保证其他线程可以对这个互斥体上锁。

       Boost线程库提供的互斥体类型:

        boost::mutex,

        boost::try_mutex,

        boost::timed_mutex,

        boost::recursive_mutex,

        boost::recursive_try_mutex,

        boost::recursive_timed_mutex,

        boost::shared_mutex

       mutex类采用Scope Lock模式实现互斥体的上锁和解锁。即构造函数对互斥体加锁,析构函数对互斥体解锁。

       对应现有的几个mutex导入了scoped_lock,scoped_try_lock,scoped_timed_lock.

       scoped系列的特色就是析构时解锁,默认构造时加锁,这就很好的确定在某个作用域下某线程独占某段代码。

 

mutex+scoped_lock

       #include <boost/thread/thread.hpp>

       #include <boost/thread/mutex.hpp>

       #include <boost/bind.hpp>

       #include <iostream>

       boost::mutex   io_mutex;

       void count(int id)

       {

              for (int i = 0; i < 10; ++i)

              {

                      boost::mutex::scoped_lock    lock(io_mutex);

                      std::cout << id << ": " << i << std::endl;

              }

       }

       int main(int argc, char* argv[])

       {

              boost::thread thrd1(boost::bind(&count, 1));

              boost::thread thrd2(boost::bind(&count, 2));

              thrd1.join();

              thrd2.join();

              return 0;

       }

 

try_mutex+scoped_try_lock

       void loop(void)

       {

              bool running = true;

              while (running)

              {

                      static boost::try_mutex iomutex;

                      {

                              boost::try_mutex::scoped_try_lock    lock(iomutex);//锁定mutex

                              if (lock.owns_lock())

                              {

                                     std::cout << "Get lock." << std::endl;

                              }

                              else

                              {

                                     // To do

                                     std::cout << "Not get lock." << std::endl;

                                     boost::thread::yield(); //释放控制权

                                     continue;

                              }

                      } //lock析构,iomutex解锁

              }

       }

 

timed_mutex+scoped_timed_mutex

       void loop(void)

       {

              bool running = true;

              while (running)

              {

                      typedef boost::timed_mutex MUTEX;

                      typedef MUTEX::scoped_timed_lock LOCK;

                      static MUTEX iomutex;

                      {

                              boost::xtime xt;

                              boost::xtime_get(&xt,boost::TIME_UTC);

                              xt.sec += 1; //超时时间秒

                              LOCK lock(iomutex, xt); //锁定mutex

                              if (lock.owns_lock())

                              {

                                     std::cout << "Get lock." << std::endl;

                              }

                              else

                              {

                                     std::cout << "Not get lock." << std::endl;

                                     boost::thread::yield(); //释放控制权

                              }

                              //::sleep(10000); //长时间

                      } //lock析构,iomutex解锁

                      //::sleep(250);

              }

       }

 

shared_mutex

       应用boost::threadshared_mutex实现singled_write/multi_read的简单例子

       #include <iostream>

       #include <boost/thread/thread.hpp>

       #include <boost/thread/shared_mutex.hpp>

       using namespace std;

       using namespace boost;

       boost::shared_mutex shr_mutex;

       /// 这个是辅助类,能够保证log_info被完整的输出

       class safe_log {

       public:

           static void log(const std::string& log_info) {

               boost::mutex::scoped_lock lock(log_mutex);

               cout << log_info << endl;

           }

       private:

           static boost::mutex log_mutex;

       };

       boost::mutex safe_log::log_mutex;

       void write_process() {

           shr_mutex.lock();

           safe_log::log("begin of write_process");

           safe_log::log("end of write_process");

           shr_mutex.unlock();

       }

       void read_process() {

           shr_mutex.lock_shared();

           safe_log::log("begin of read_process");

           safe_log::log("end of read_process");

           shr_mutex.unlock_shared();

       }

       int main() {

           thread_group threads;

           for (int i = 0; i < 10; ++ i) {

               threads.create_thread(&write_process);

               threads.create_thread(&read_process);

           }

           threads.join_all();

           ::system("PAUSE");

           return 0;

       }

 

条件变量

       有的时候仅仅依靠锁住共享资源来使用它是不够的。有时候共享资源只有某些状态的时候才能够使用。

         比方说,某个线程如果要从堆栈中读取数据,那么如果栈中没有数据就必须等待数据被压栈。这种情

         况下的同步使用互斥体是不够的。另一种同步的方式--条件变量,就可以使用在这种情况下。

       boost::condition  

typedef condition_variable_any condition;

void wait(unique_lock<mutex>& m);

       boost::condition_variable

template<typename lock_type>

void wait(lock_type& m);

 

       #include <boost/thread/thread.hpp>

       #include <boost/thread/mutex.hpp>

       #include <boost/thread/condition.hpp>

       #include <iostream>

       const int BUF_SIZE = 10;

       const int ITERS = 100;

       boost::mutex io_mutex;

       class buffer

       {

       public:

              typedef boost::mutex::scoped_lock scoped_lock;

              buffer()

              : p(0), c(0), full(0)

              {

              }

              void put(int m)

              {

                      scoped_lock lock(mutex);

                      if (full == BUF_SIZE)

                      {

                              {

                                     boost::mutex::scoped_lock lock(io_mutex);

                                     std::cout << "Buffer is full. Waiting..." << std::endl;

                              }

                              while (full == BUF_SIZE)

                                     cond.wait(lock);

                      }

                      buf[p] = m;

                      p = (p+1) % BUF_SIZE;

                      ++full;

                      cond.notify_one();

              }

              int get()

              {

                      scoped_lock lk(mutex);

                      if (full == 0)

                      {

                              {

                                     boost::mutex::scoped_lock lock(io_mutex);

                                     std::cout << "Buffer is empty. Waiting..." << std::endl;

                              }

                              while (full == 0)

                                     cond.wait(lk);

                      }

                      int i = buf[c];

                      c = (c+1) % BUF_SIZE;

                      --full;

                      cond.notify_one();

                      return i;

              }

       private:

                      boost::mutex mutex;

                      boost::condition cond;

                      unsigned int p, c, full;

                      int buf[BUF_SIZE];

       };

       buffer buf;

       void writer()

       {

              for (int n = 0; n < ITERS; ++n)

              {

                      {

                              boost::mutex::scoped_lock lock(io_mutex);

                              std::cout << "sending: " << n << std::endl;

                      }

                      buf.put(n);

              }

       }

       void reader()

       {

              for (int x = 0; x < ITERS; ++x)

               {

                      int n = buf.get();

                      {

                              boost::mutex::scoped_lock lock(io_mutex);

                              std::cout << "received: " << n << std::endl;

                      }

              }

       }

       int main(int argc, char* argv[])

       {

              boost::thread thrd1(&reader);

              boost::thread thrd2(&writer);

              thrd1.join();

              thrd2.join();

              return 0;

       }

 

线程局部存储

       函数的不可重入。

       Boost线程库提供了智能指针boost::thread_specific_ptr来访问本地存储线程(thread local storage)。

       #include <boost/thread/thread.hpp>

       #include <boost/thread/mutex.hpp>

       #include <boost/thread/tss.hpp>

       #include <iostream>

       boost::mutex io_mutex;

       boost::thread_specific_ptr<int> ptr;

       struct count

       {

              count(int id) : id(id) { }

              void operator()()

              {

                      if (ptr.get() == 0)

                              ptr.reset(new int(0));

                      for (int i = 0; i < 10; ++i)

                      {

                              (*ptr)++; // 往自己的线程上加

                              boost::mutex::scoped_lock lock(io_mutex);

                              std::cout << id << ": " << *ptr << std::endl;

                      }

              }

              int id;

       };

       int main(int argc, char* argv[])

       {

              boost::thread thrd1(count(1));

              boost::thread thrd2(count(2));

              thrd1.join();

              thrd2.join();

              return 0;

       }

 

仅运行一次的例程

       如何使得初始化工作(比如说构造函数)也是线程安全的。

       一次实现once routine)。一次实现在一个应用程序只能执行一次。

       Boost线程库提供了boost::call_once来支持一次实现,并且定义了一个标志boost::once_flag及一个初始化这个标志的宏 BOOST_ONCE_INIT

       #include <boost/thread/thread.hpp>

       #include <boost/thread/once.hpp>

       #include <iostream>

       int i = 0;

       boost::once_flag flag = BOOST_ONCE_INIT;

       void init()

       {

              ++i;

       }

       void thread()

       {

              boost::call_once(&init, flag);

       }

       int main(int argc, char* argv[])

       {

              boost::thread thrd1(&thread);

              boost::thread thrd2(&thread);

              thrd1.join();

              thrd2.join();

              std::cout << i << std::endl;

              return 0;

       }

 

Boost线程库的未来

       Boost线程库正在计划加入一些新特性。其中包括boost::read_write_mutex,它可以让多个线程同时从共享区中读取数据,

         但是一次只可能有一个线程向共享区写入数据;boost::thread_barrier,它使得一组线程处于等待状态,知道所有得线程

         都都进入了屏障区;boost::thread_pool,他允许执行一些小的routine而不必每一都要创建或是销毁一个线程。 

       Boost线程库已经作为标准中的类库技术报告中的附件提交给C++标准委员会,它的出现也为下一版C++标准吹响了第一声号角。

         委员会成员对 Boost线程库的初稿给予了很高的评价,当然他们还会考虑其他的多线程库。他们对在C++标准中加入对多线程的

         支持非常感兴趣。从这一点上也可以看出,多线程在C++中的前途一片光明。