平台服务和脚本服务接口对后端PostgreSQL数据库的使用目前采用短暂连接方式,造成多次调用服务时频繁连接和断开数据库,效率很低。
如果共享数据库连接,则会造成多线程访问数据库时的事务冲突,故必须采用连接池来管理对数据库的并发访问,某一线程连接到数据库使用完毕后,不断开数据库连接,而是把连接归还给连接池。
另一线程访问数据库时会首先向连接池申请已经存在的连接,如果连接池中没有空闲连接,或者申请到得连接已经超时失效,再建立新的连接,使用完毕后同样归还到连接池。
这样连接池中的连接数会随着线程压力的增加逐渐增长,直到所有的线程同时工作,达到最多连接数。
由于一个线程可能同时申请多个连接,故连接数可能会大于线程数。连接池在程序结束时销毁全部连接,或者线程在申请到的某一连接失效时销毁该连接。
1
2 class dbConnection
3 {
4 friend class dbConnectionPool;
5 bool is_using;
6
7 public:
8 pqxx::connection pqxx_conn;
9
10 public:
11 dbConnection(string opt)
12 : pqxx_conn(opt),is_using(false)
13 {}
14 };
15
16 typedef boost::shared_ptr<dbConnection> dbConnection_ptr;
17
18 class dbConnectionPool
19 {
20 string m_opt;
21 size_t m_max_num;
22 std::vector<dbConnection_ptr> m_pool;
23 cppx::thread_mutex m_mutex;
24
25 public:
26 dbConnectionPool(string opt,size_t max_num)
27 : m_opt(opt),m_max_num(max_num)
28 {}
29
30 dbConnection_ptr GetConnection(void){
31 ACE_DEBUG((LM_DEBUG,"(%t) LINE %d : %C\n", __LINE__ ,__FUNCTION__));
32
33 cppx::scoped_lock lock(m_mutex);
34 foreach(dbConnection_ptr ptr,m_pool){
35 if( ptr->is_using == false ){
36 ptr->is_using = true;
37
38 if( ptr->pqxx_conn.is_open() ){
39 ACE_DEBUG((LM_DEBUG,"(%t) 找到连接池空闲连接。\n"));
40 return ptr;
41 }
42 }
43 }
44
45 ACE_DEBUG((LM_DEBUG,"(%t) 没有空闲连接,创建新连接。[%d]\n",m_pool.size()));
46
47 // 没有空闲连接,创建新连接
48 dbConnection_ptr ptr(new dbConnection(m_opt));
49 ptr->is_using = true;
50
51 // 找到一个失效的连接,用新连接覆盖
52 bool cover = false;
53 for(size_t idx = 0; idx < m_pool.size(); idx++){
54 dbConnection_ptr ptr2 = m_pool[idx];
55 if( ptr2->is_using == false && ptr2->pqxx_conn.is_open() == false ){
56 m_pool[idx] = ptr;
57 cover = true;
58 ACE_DEBUG((LM_DEBUG,"(%t) 覆盖失效的连接[%d]。\n",idx));
59 break;
60 }
61 }
62 if( !cover ){
63 m_pool.push_back(ptr);
64 }
65
66 return ptr;
67 }
68
69 void ReleaseConnection(dbConnection_ptr ptr_){
70 ACE_DEBUG((LM_DEBUG,"(%t) LINE %d : %C\n", __LINE__ ,__FUNCTION__));
71 ACE_DEBUG((LM_DEBUG,"(%t) 连接使用完毕,归还到连接池。[%d]\n",m_pool.size()));
72
73 cppx::scoped_lock lock(m_mutex);
74 ptr_->is_using = false;
75 }
76 };
77
78 class dbConnectionUser
79 {
80 dbConnectionPool & pool;
81
82 public:
83 dbConnection_ptr conn;
84
85 public:
86 dbConnectionUser(dbConnectionPool & pool_) : pool(pool_) {
87 conn = pool.GetConnection();
88 }
89 ~dbConnectionUser(void){
90 pool.ReleaseConnection(conn);
91 }
92 };
93
使用连接池的方法,只要在建立数据库连接的类中增加连接池成员即可:
1
2 struct ALEE_PlatformService_iPqxx::pimpl_t
3 {
4 #if defined(USE_POOL_DBLINK) && (USE_POOL_DBLINK == 1)
5
6 public:
7 dbConnectionPool m_pool;
8
9 #else
10
为了方便,定义如下的宏:
1 #define DB_CONNECTION() \
2 dbConnectionUser user(pimpl_->m_pool); \
3 if( !user.conn || !user.conn->pqxx_conn.is_open() ) return false; \
4 pqxx::work X(user.conn->pqxx_conn);
5
需要调用数据库查询时,只需要把上面的宏插入try块中:
1
2 try{
3 DB_CONNECTION();
4
5 fuStringCommand cmd;
6 cmd = pimpl_->m_sqls["apx_get_moi_attr_enum"];
7 cmd << X.quote(moi_hash)
8 << X.quote(attr_code);
9
10 result = new defs::xml_row;
11 return pimpl_->QueryRecord(X, cmd, *result);
12 }
13 catch_pqxx_error();
14
一个简单的数据库连接池就完成了,由于多线程并发时动态申请数据库连接,既可以充分发挥数据库并发的好处有避免了共享数据库连接造成事务冲突。完全满足简单的数据库查询需要。
等灯。等灯。 Cppx Inside
posted on 2011-02-28 13:57
风雷九州 阅读(5689)
评论(0) 编辑 收藏 引用