Khan's Notebook GCC/GNU/Linux Delphi/Window Java/Anywhere

路漫漫,长修远,我们不能没有钱
随笔 - 172, 文章 - 0, 评论 - 257, 引用 - 0
数据加载中……

boost中asio网络库多线程并发处理实现,以及asio在多线程模型中线程的调度情况和线程安全。 (转载)

转载自(http://www.cnblogs.com/lidabo/p/3906055.html)


1、实现多线程方法:

    其实就是多个线程同时调用io_service::run

1         for (int i = 0; i != m_nThreads; ++i)
2         {
3             boost::shared_ptr<boost::thread> pTh(new boost::thread(
4                 boost::bind(&boost::asio::io_service::run,&m_ioService)));
5             m_listThread.push_back(pTh);
6         }


2、多线程调度情况:

    asio规定:只能在调用io_service::run的线程中才能调用事件完成处理器。


注:事件完成处理器就是你async_accept、async_write等注册的句柄,类似于回调的东西。


单线程:

    如果只有一个线程调用io_service::run,根据asio的规定,事件完成处理器也只能在这个线程中执行。也就是说,你所有代码都在同一个线程中运行,因此变量的访问是安全的。


多线程:

    如果有多个线程同时调用io_service::run以实现多线程并发处理。对于asio来说,这些线程都是平等的,没有主次之分。如果你投递的一个请求比如async_write完成时,asio将随机的激活调用io_service::run的线程。并在这个线程中调用事件完成处理器(async_write当时注册的句柄)。如果你的代码耗时较长,这个时候你投递的另一个async_write请求完成时,asio将不等待你的代码处理完成,它将在另外的一个调用io_service::run线程中,调用async_write当时注册的句柄。也就是说,你注册的事件完成处理器有可能同时在多个线程中调用。


当然你可以使用 boost::asio::io_service::strand让完成事件处理器的调用,在同一时间只有一个, 比如下面的的代码:

  socket_.async_read_some(boost::asio::buffer(buffer_),
      strand_.wrap(
        boost::bind(&connection::handle_read, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred)));

...

boost::asio::io_service::strand strand_;

 

    此时async_read_som完成后掉用handle_read时,必须等待其它handle_read调用完成时才能被执行(async_read_som引起的handle_read调用)。

    多线程调用时,还有一个重要的问题,那就是无序化。比如说,你短时间内投递多个async_write,那么完成处理器的调用并不是按照你投递async_write的顺序调用的。asio第一次调用完成事件处理器,有可能是第二次async_write返回的结果,也有可能是第3次的。使用strand也是这样的。strand只是保证同一时间只运行一个完成处理器,但它并不保证顺序。

 


 

代码测试:

服务器:

将下面的代码编译以后,使用cmd命令提示符下传人参数<IP> <port> <threads>调用

比如:test.exe 0.0.0.0 3005 10   

客服端 使用windows自带的telnet

cmd命令提示符:

telnet 127.0.0.1 3005

 

原理:客户端连接成功后,同一时间调用100次boost::asio::async_write给客户端发送数据,并且在完成事件处理器中打印调用序号,和线程ID。

核心代码:


 1     void start()
 2     {
 3         for (int i = 0; i != 100; ++i)
 4         {
 5             boost::shared_ptr<string> pStr(new string);
 6             *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
 7             *pStr += "\r\n";
 8             boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
 9                 boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
10                  boost::asio::placeholders::error,
11                  boost::asio::placeholders::bytes_transferred,
12                  pStr,i)
13                 );
14         }
15     }
16 
17 //去掉 boost::mutex::scoped_lock lock(m_ioMutex); 效果更明显。
18 
19     void HandleWrite(const boost::system::error_code& error
20         ,std::size_t bytes_transferred
21         ,boost::shared_ptr<string> pStr,int nIndex)
22     {
23         if (!error)
24         {
25             boost::mutex::scoped_lock lock(m_ioMutex);
26             cout << "发送序号=" << nIndex << ",线程id=" << boost::this_thread::get_id() << endl;
27         }
28         else
29         {
30             cout << "连接断开" << endl;
31         }
32     }


完整代码:

  1 #include <boost/bind.hpp>
  2 #include <boost/shared_ptr.hpp>
  3 #include <boost/enable_shared_from_this.hpp>
  4 #include <boost/asio.hpp>
  5 #include <boost/lexical_cast.hpp>
  6 #include <boost/thread.hpp>
  7 #include <boost/thread/mutex.hpp>
  8 #include <string>
  9 #include <iostream>
 10 
 11 
 12 using std::cout;
 13 using std::endl;
 14 using std::string;
 15 using boost::asio::ip::tcp;
 16 
 17 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 18 
 19 
 20 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 21 
 22 class CMyTcpConnection : public boost::enable_shared_from_this<CMyTcpConnection>
 23 {
 24 public:
 25     CMyTcpConnection(boost::asio::io_service &ser)
 26         :m_nSocket(ser)
 27     {
 28     }
 29 
 30     typedef boost::shared_ptr<CMyTcpConnection> CPMyTcpCon;
 31 
 32 
 33     static CPMyTcpCon CreateNew(boost::asio::io_service& io_service) {
 34         return CPMyTcpCon(new CMyTcpConnection(io_service));
 35     }
 36 
 37 
 38    
 39 public:
 40     void start()
 41     {
 42         for (int i = 0; i != 100; ++i) {
 43             boost::shared_ptr<string> pStr(new string);
 44             *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
 45             *pStr += "\r\n";
 46             boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
 47                 boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, pStr,i)
 48                 );
 49         }
 50     }
 51 
 52 
 53     tcp::socket& socket()
 54     {
 55         return m_nSocket;
 56     }
 57 
 58 
 59 private:
 60     void HandleWrite(const boost::system::error_code& error ,std::size_t bytes_transferred ,boost::shared_ptr<string> pStr,int nIndex)
 61     {
 62         if (!error) {
 63             boost::mutex::scoped_lock lock(m_ioMutex);
 64             cout << "发送序号=" << nIndex << ",线程id=" << boost::this_thread::get_id() << endl;
 65         } else {
 66             cout << "连接断开" << endl;
 67         }
 68     }
 69 
 70 public:
 71 
 72 private:
 73     tcp::socket m_nSocket;
 74     boost::mutex m_ioMutex;
 75 };
 76 
 77 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 78 
 79 
 80 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 81 
 82 class CMyService : private boost::noncopyable
 83 {
 84 public:
 85     CMyService(string const &strIP,string const &strPort,int nThreads)
 86         :m_tcpAcceptor(m_ioService)
 87         ,m_nThreads(nThreads)
 88     {
 89         tcp::resolver resolver(m_ioService);
 90         tcp::resolver::query query(strIP,strPort);
 91         tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
 92         boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
 93         m_tcpAcceptor.open(endpoint.protocol());
 94         m_tcpAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
 95         m_tcpAcceptor.bind(endpoint);
 96         m_tcpAcceptor.listen();
 97 
 98 
 99         StartAccept();
100     }
101 
102     ~CMyService(){Stop();}
103 
104 public:
105     void Stop() 
106     { 
107         m_ioService.stop();
108         for (std::vector<boost::shared_ptr<boost::thread>>::const_iterator it = m_listThread.cbegin();
109             it != m_listThread.cend(); ++ it)
110         {
111             (*it)->join();
112         }
113     }
114     void Start() {
115         for (int i = 0; i != m_nThreads; ++i) {
116             boost::shared_ptr<boost::thread> pTh( new boost::thread( boost::bind(&boost::asio::io_service::run, &m_ioService) ) );
117             m_listThread.push_back(pTh);
118         }
119     }
120 private:
121     void HandleAccept(const boost::system::error_code& error ,boost::shared_ptr<CMyTcpConnection> newConnect)
122     {
123         if (!error) {
124             newConnect->start();
125         }
126         StartAccept();
127     }
128 
129 
130     void StartAccept()
131     {
132         CMyTcpConnection::CPMyTcpCon newConnect = CMyTcpConnection::CreateNew(m_tcpAcceptor.get_io_service());
133         m_tcpAcceptor.async_accept( newConnect->socket(), vboost::bind(&CMyService::HandleAccept, this, boost::asio::placeholders::error, newConnect) );
134     }
135 
136 
137 private:
138     boost::asio::io_service m_ioService;
139     boost::asio::ip::tcp::acceptor m_tcpAcceptor;
140     std::vector<boost::shared_ptr<boost::thread>> m_listThread;
141     std::size_t m_nThreads;
142 };
143 
144 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
145 
146 
147 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
148 
149 int main(int argc, char* argv[])
150 {
151     try
152     {
153         if (argc != 4)
154         {
155             std::cerr << "<IP> <port> <threads>\n";
156             return 1;
157         }
158         int nThreads = boost::lexical_cast<int>(argv[3]);
159         CMyService mySer(argv[1], argv[2], nThreads);
160         mySer.Start();
161         getchar();
162         mySer.Stop();
163     } catch (std::exception& e) {
164         std::cerr << "Exception: " << e.what() << "\n";
165     }
166     return 0;
167 }



## 相关函数介绍

### boost::asio::ip::tcp::resolver

iterator resolve(
    const endpoint_type & e,
    boost::system::error_code & ec);
Return Value

A forward-only iterator that can be used to traverse the list of endpoint entries. Returns a default constructed iterator if an error occurs.

Remarks

A default constructed iterator represents the end of the list.

A successful call to this function is guaranteed to return at least one entry.


tcp::resolver一般和tcp::resolver::query结合用,通过query这个词顾名思义就知道它是用来查询socket的相应信息,一般而言我们关心socket的东东有address,port而已,通过 tcp::resolver很容易实现设置和查询,它通过query把字符串格式的ip如192.168.0.200或主机名 http://localhost,端口“8080”等转化成socket内部表示格式,这样我们应用的时候可以直接使用字符串的形式,而且不用再担心 socket的字节顺序转化问题。示例如下:


1     boost::asio::io_service io_service ;  
2     boost::asio::ip::tcp::resolver resolver(io_service);  
3     boost::asio::ip::tcp::resolver::query query("localhost""9000");

还 有要说明的是, boost::asio把通讯双方(server, client)都用endpoint的表示,所以endpoint中的address, port 分别封装了ip和端口。貌似resolver和endpoint不相干,于是乎出现tcp::resolver::iterator了,它是 resolver的迭代器,其实就是endpoint的指针,那么就可以这样:
 1     boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);  
 2     boost::asio::ip::tcp::resolver::iterator end;     //默认构造的resolver::iterator,用来代表失败
 3     boost::system::error_code error = boost::asio::error::host_not_found;  
 4     boost::asio::ip::tcp::endpoint endpoint;  
 5     while (error && endpoint_iterator != end) {  
 6         endpoint = *endpoint_iterator ;    //返回的是迭代器,需要解引用成endpoint
 7         socket.close();    //先close,再connect
 8         socket.connect(endpoint, error);  
 9         endpoint_iterator++ ;  
10     } 

得到endpoint后就好说啦,endpoint.address().to_string()就能够返回string格式的ip地址,endpoint.port()返回端口。

其实endpoint 完全可以自己构造,方法也是很简单的,tcp::endpoint(tcp::v4(), (unsigned short)9000) 这个是server端的用法,tcp::v4()直接返回自己的address,如果用于client那么需要设置server的ip ,实现如下:

1 
2     boost::system::error_code error = boost::asio::error::host_not_found;  
3     boost::asio::ip::address add;  
4     add.from_string("127.0.0.1");  
5     tcp::endpoint endpoint(add, short(9000));  
6     socket.connect(endpoint, error);  
7 

这样不使用resolver也是可以的。
还有更神奇的:

 1 
 2     boost::asio::io_service ioservice ;  
 3     boost::asio::io_service my_io_service ;  
 4     boost::asio::ip::tcp::resolver resolver(my_io_service);  
 5     boost::asio::ip::tcp::resolver::query query("www.google.com""http");  
 6     boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);  
 7     boost::asio::ip::tcp::resolver::iterator end; // End marker.  
 8       
 9     while (iter != end) {  
10       boost::asio::ip::tcp::endpoint endpoint = *iter++;  
11       std::cout << endpoint << std::endl;  
12     }  
13 


这样有发现一个新的用途,通过resolver迭代可以得到多个节点endpoint,比如google 就有好几个ip。
上面这个例子的运行结果:

1         74.125.128.106:80  
2         74.125.128.147:80  
3         74.125.128.99:80  
4         74.125.128.103:80  
5         74.125.128.104:80  
6         74.125.128.105:80




### boost::mutex::scoped_lock

boost::mutex::scoped_lock lock(m_ioMutex);

其依赖RAII机制, 在过了作用域之后对锁进行自动释放和回收.其实现代码如图所示

boost_1_60_0/boost/asio/detail/scoped_lock.hpp:

  1 //
  2 // detail/scoped_lock.hpp
  3 // ~~~~~~~~~~~~~~~~~~~~~~
  4 //
  5 // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6 //
  7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9 //
 10 
 11 #ifndef BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
 12 #define BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
 13 
 14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
 15 # pragma once
 16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
 17 
 18 #include <boost/asio/detail/noncopyable.hpp>
 19 
 20 #include <boost/asio/detail/push_options.hpp>
 21 
 22 namespace boost {
 23 namespace asio {
 24 namespace detail {
 25 
 26 // Helper class to lock and unlock a mutex automatically.
 27 template <typename Mutex>
 28 class scoped_lock
 29   : private noncopyable
 30 {
 31 public:
 32   // Tag type used to distinguish constructors.
 33   enum adopt_lock_t { adopt_lock };
 34 
 35   // Constructor adopts a lock that is already held.
 36   scoped_lock(Mutex& m, adopt_lock_t)
 37     : mutex_(m),
 38       locked_(true)
 39   {
 40   }
 41 
 42   // Constructor acquires the lock.
 43   explicit scoped_lock(Mutex& m)
 44     : mutex_(m)
 45   {
 46     mutex_.lock();
 47     locked_ = true;
 48   }
 49 
 50   // Destructor releases the lock.
 51   ~scoped_lock()
 52   {
 53     if (locked_)
 54       mutex_.unlock();
 55   }
 56 
 57   // Explicitly acquire the lock.
 58   void lock()
 59   {
 60     if (!locked_)
 61     {
 62       mutex_.lock();
 63       locked_ = true;
 64     }
 65   }
 66 
 67   // Explicitly release the lock.
 68   void unlock()
 69   {
 70     if (locked_)
 71     {
 72       mutex_.unlock();
 73       locked_ = false;
 74     }
 75   }
 76 
 77   // Test whether the lock is held.
 78   bool locked() const
 79   {
 80     return locked_;
 81   }
 82 
 83   // Get the underlying mutex.
 84   Mutex& mutex()
 85   {
 86     return mutex_;
 87   }
 88 
 89 private:
 90   // The underlying mutex.
 91   Mutex& mutex_;
 92 
 93   // Whether the mutex is currently locked or unlocked.
 94   bool locked_;
 95 };
 96 
 97 // namespace detail
 98 // namespace asio
 99 // namespace boost
100 
101 #include <boost/asio/detail/pop_options.hpp>
102 
103 #endif // BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
104 


### lexical_cast

lexical_cast是boost中一个非常有用,常用,好用的库,我现在的小数据转换用的都是lexical_cast。
lexical_cast最大的特点是安全,包括长度安全,类型安全。

使用方式:

1 #include <boost/lexical_cast.hpp> 
2 
3 const double PI=3.1415926535;
4 string str;
5 str = lexical_cast<string>(PI); 
6 
7 string str="3.1415926535";
8 double PI=lexical_cast<double>(str); 
9 

如果转换失败, 抛出bad_lexical_cast异常

其实现方式如下:

boost_1_60_0/boost/lexical_cast.hpp

  1 // Copyright Kevlin Henney, 2000-2005.
  2 // Copyright Alexander Nasonov, 2006-2010.
  3 // Copyright Antony Polukhin, 2011-2014.
  4 //
  5 // Distributed under the Boost Software License, Version 1.0. (See
  6 // accompanying file LICENSE_1_0.txt or copy at
  7 // http://www.boost.org/LICENSE_1_0.txt)
  8 //
  9 // what:  lexical_cast custom keyword cast
 10 // who:   contributed by Kevlin Henney,
 11 //        enhanced with contributions from Terje Slettebo,
 12 //        with additional fixes and suggestions from Gennaro Prota,
 13 //        Beman Dawes, Dave Abrahams, Daryle Walker, Peter Dimov,
 14 //        Alexander Nasonov, Antony Polukhin, Justin Viiret, Michael Hofmann,
 15 //        Cheng Yang, Matthew Bradbury, David W. Birdsall, Pavel Korzh and other Boosters
 16 // when:  November 2000, March 2003, June 2005, June 2006, March 2011 - 2014
 17 
 18 #ifndef BOOST_LEXICAL_CAST_INCLUDED
 19 #define BOOST_LEXICAL_CAST_INCLUDED
 20 
 21 #include <boost/config.hpp>
 22 #ifdef BOOST_HAS_PRAGMA_ONCE
 23 #   pragma once
 24 #endif
 25 
 26 #if defined(BOOST_NO_STRINGSTREAM) || defined(BOOST_NO_STD_WSTRING)
 27 #define BOOST_LCAST_NO_WCHAR_T
 28 #endif
 29 
 30 #include <boost/range/iterator_range_core.hpp>
 31 #include <boost/lexical_cast/bad_lexical_cast.hpp>
 32 #include <boost/lexical_cast/try_lexical_convert.hpp>
 33 
 34 namespace boost 
 35 {
 36     template <typename Target, typename Source>
 37     inline Target lexical_cast(const Source &arg)
 38     {
 39         Target result;
 40 
 41         if (!boost::conversion::detail::try_lexical_convert(arg, result)) {
 42             boost::conversion::detail::throw_bad_cast<Source, Target>();
 43         }
 44 
 45         return result;
 46     }
 47 
 48     template <typename Target>
 49     inline Target lexical_cast(const char* chars, std::size_t count)
 50     {
 51         return ::boost::lexical_cast<Target>(
 52             ::boost::iterator_range<const char*>(chars, chars + count)
 53         );
 54     }
 55 
 56     template <typename Target>
 57     inline Target lexical_cast(const unsigned char* chars, std::size_t count)
 58     {
 59         return ::boost::lexical_cast<Target>(
 60             ::boost::iterator_range<const unsigned char*>(chars, chars + count)
 61         );
 62     }
 63 
 64     template <typename Target>
 65     inline Target lexical_cast(const signed char* chars, std::size_t count)
 66     {
 67         return ::boost::lexical_cast<Target>(
 68             ::boost::iterator_range<const signed char*>(chars, chars + count)
 69         );
 70     }
 71 
 72 #ifndef BOOST_LCAST_NO_WCHAR_T
 73     template <typename Target>
 74     inline Target lexical_cast(const wchar_t* chars, std::size_t count)
 75     {
 76         return ::boost::lexical_cast<Target>(
 77             ::boost::iterator_range<const wchar_t*>(chars, chars + count)
 78         );
 79     }
 80 #endif
 81 #ifndef BOOST_NO_CXX11_CHAR16_T
 82     template <typename Target>
 83     inline Target lexical_cast(const char16_t* chars, std::size_t count)
 84     {
 85         return ::boost::lexical_cast<Target>(
 86             ::boost::iterator_range<const char16_t*>(chars, chars + count)
 87         );
 88     }
 89 #endif
 90 #ifndef BOOST_NO_CXX11_CHAR32_T
 91     template <typename Target>
 92     inline Target lexical_cast(const char32_t* chars, std::size_t count)
 93     {
 94         return ::boost::lexical_cast<Target>(
 95             ::boost::iterator_range<const char32_t*>(chars, chars + count)
 96         );
 97     }
 98 #endif
 99 
100 // namespace boost
101 
102 #undef BOOST_LCAST_NO_WCHAR_T
103 
104 #endif // BOOST_LEXICAL_CAST_INCLUDED
105 
106 



posted on 2017-10-14 11:44 Khan 阅读(1712) 评论(0)  编辑 收藏 引用 所属分类: GCC/G++跨平台开发


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理