内存消息队列是服务器端常用的基础组件,他使得符合生产者-消费者模型的两个线程或两组线程之间的通讯看起来更加清晰,即生产者将消息压入队列,消费者从队列里面取走消息并处理,具体到网络服务器结构中,生产者线程是网络接收线程,消费者线程是逻辑处理线程,网络线程不停的将接收到的数据放到全局消息队列中,逻辑处理线程不停的从全局消息队列中取走消息并处理。
system v消息队列接口非常简单,主要是msgsnd,msgrcv,每个消息的结构中都包含一个类型信息,这样在msgrcv时就可以选择只读取某个类型的消息,如果类型传为0,则不考虑类型,读取第一个消息。根据消息类型来获取消息是非常有用的,考虑下述简单服务器结构:
client server dbproxy
其中,server是主要的业务处理服务器,dbproxy是数据库代理。以server为例,他需要处理两方面的消息:1。来自client的消息;2。来自dbproxy的消息。定义如下枚举:
enum eQueueType
{
QUEUE_TYPE_CLIENT = 1,
QUEUE_TYPE_WORLD = 2,
};
当网络线程收到来自client的消息时,将消息放到QUEUE_TYPE_CLIENT 类型的队列中,当收到dbproxy的消息时,放到QUEUE_TYPE_WORLD中,然后设置两个或两组线程分别处理
QUEUE_TYPE_CLIENT 队列和QUEUE_TYPE_WORLD队列,结构非常清晰。在具体实现MessageQueue时,是基于ACE的消息队列的,在内部设置了一个消息队列的map,也就是多个消息队列,但在接口上就是一个队列,MessageQueue只提供了根据类型来获取队列中的消息,没有提供获取整个队列组中第一个消息的功能。
考虑使用全局消息队列是我思考了一段时间才决定的,之前的做法是类似Active Object的,在ACE里就是ACE_Task类,该类即具备线程的功能,也包含了一个消息队列,在使用时重载他的svc成员即可。对不熟悉ACE的人来说,ACE_Task在设计上比较复杂,还牵扯到队列的操作,如此以来,提供更友好的接口就显得很有必要,因为线程的使用大家都已经非常熟悉,消息队列在概念上也很清晰,那么思路就是:我可以自由的创建线程,在线程里从全局消息队列中读取消息。看似和ACE_Task没有根本区别,其实全局消息队列的C风格的接口,而不是派生的方式,大大降低了复杂度。
代码贴上,方便以后查阅:
MessageQueue.h
#pragma once
#include <map>
#include <ace/Synch.h>
#include <ace/Message_Queue.h>
#include <ace/Singleton.h>
using std::map;
class MessageQueue
{
public:
MessageQueue(void);
~MessageQueue(void);
public:
static MessageQueue* Instance();
ACE_Message_Block* GetMessage(int nType);
ACE_Message_Block* GetMessage(int nType,int nSeconds );//超时nSeconds秒
bool PutMessage(int nType,ACE_Message_Block* pMsg);
bool PutMessage(int nType,ACE_Message_Block* pMsg,int nSeconds);//超时nSeconds秒
protected:
//获取某类型的消息队列,没有则创建
ACE_Message_Queue<ACE_MT_SYNCH>* GetQueue(int nType);
private:
map<int,ACE_Message_Queue<ACE_MT_SYNCH>* > m_QueueMap;
ACE_Thread_Mutex m_QueueMapMutex;
};
typedef ACE_Singleton<MessageQueue,ACE_Thread_Mutex> MessageQueueSingleton;
MessageQueue.cpp
#include <cassert>
#include <ace/Guard_T.h>
#include "MessageQueue.h"
// MessageQueue.cpp : 定义控制台应用程序的入口点。
//
MessageQueue::MessageQueue(void)
{
}
MessageQueue::~MessageQueue(void)
{
ACE_GUARD(ACE_Thread_Mutex,g,m_QueueMapMutex);
for (map<int,ACE_Message_Queue<ACE_MT_SYNCH>* >::iterator iter = m_QueueMap.begin(); iter != m_QueueMap.end();++iter)
{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = iter->second;
pQueue->close();
delete pQueue;
}
m_QueueMap.clear();
}
MessageQueue* MessageQueue::Instance()
{
return MessageQueueSingleton::instance();
}
ACE_Message_Block* MessageQueue::GetMessage( int nType )
{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);
ACE_Message_Block* pMsg = NULL;
int nRet = pQueue->dequeue(pMsg);
if (nRet != -1)
{
return pMsg;
}
else
{
return NULL;
}
}
ACE_Message_Block* MessageQueue::GetMessage( int nType,int nSeconds )
{
assert( nSeconds > 0);
ACE_Time_Value timeout = ACE_OS::gettimeofday();
timeout += nSeconds;
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);
ACE_Message_Block* pMsg = NULL;
int nRet = pQueue->dequeue(pMsg,&timeout);
if (nRet != -1)
{
return pMsg;
}
else
{
return NULL;
}
}
bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg )
{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);
int nRet = pQueue->enqueue(pMsg);
return nRet != -1;
}
bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg,int nSeconds )
{
assert( nSeconds > 0);
ACE_Time_Value timeout = ACE_OS::gettimeofday();
timeout += nSeconds;
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);
int nRet = pQueue->enqueue(pMsg,&timeout);
return nRet != -1;
}
ACE_Message_Queue<ACE_MT_SYNCH>* MessageQueue::GetQueue( int nType )
{
assert( nType >= 0 );
ACE_GUARD_RETURN(ACE_Thread_Mutex,g,m_QueueMapMutex,NULL);
if (m_QueueMap.find(nType) != m_QueueMap.end())
{
return m_QueueMap[nType];
}
else
{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = new ACE_Message_Queue<ACE_MT_SYNCH>();
m_QueueMap[nType] = pQueue;
return pQueue;
}
}
test.cpp
#include "MessageQueue.h"
#include <iostream>
#include <ace/OS.h>
using namespace std;
enum eQueueType
{
QUEUE_TYPE_CLIENT = 1,
QUEUE_TYPE_WORLD = 2,
};
int main(int argc, char* argv[])
{
MessageQueue* pQueue = MessageQueue::Instance();
ACE_Message_Block* pClientMsg = new ACE_Message_Block(100);
pClientMsg->copy("client msg");
pQueue->PutMessage(QUEUE_TYPE_CLIENT,pClientMsg);
ACE_Message_Block* pWorldMsg = new ACE_Message_Block(100);
pWorldMsg->copy("world msg");
pQueue->PutMessage(QUEUE_TYPE_WORLD,pWorldMsg);
ACE_Message_Block* pTemp = NULL;
pTemp = pQueue->GetMessage(QUEUE_TYPE_CLIENT);
cout << pTemp->rd_ptr() << endl;
pTemp = pQueue->GetMessage(QUEUE_TYPE_WORLD);
cout << pTemp->rd_ptr() << endl;
cout << "begin time : " << ACE_OS::time(NULL) << endl;
pTemp = pQueue->GetMessage(QUEUE_TYPE_CLIENT,10);
cout << "end time : " << ACE_OS::time(NULL) << endl;
if (pTemp == NULL)
{
cout << "time out when get client msg" << endl;
}
return 0;
}