功能:market_subscriber 能够向 market_publisher 请求订阅某些类型的消息,当 market_publisher 有该类型的消息时,需要把它推送给订阅服务。
3. market_subscriber 向 market_publisher 发起订阅请求,market_publisher 根据订阅请求参数,长连接 market_subscriber 提供的消息接收端口。
(2)market_publisher 推送订阅消息时,发现连接断开,尝试重连。 考虑 market_publisher 有重启的情况,收到的订阅请求参数需要做持久化。 ==> TODO:增加一个 market_subscriber 到 market_publisher 的取消订阅的请求。
1. market_subscriber.thrift -- market_subscriber 实现的服务接口
2. market_publisher.thrift -- market_publisher 实现的服务接口
3. subscriber_server.cpp
4. pubsher_server.cpp
/*
* Main.cpp
*/#include "../gen-cpp/SubscriberService.h"
#include "../gen-cpp/PublisherService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/concurrency/ThreadManager.h>
#include <boost/thread/thread.hpp>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;
using boost::shared_ptr;
using namespace ::market_publisher;
using namespace ::market_subscriber;
std::list<boost::shared_ptr<SubscriberServiceClient> > g_lstSubscriberServiceClient;
boost::mutex g_mutexSubscriberServiceClient;
class PublisherServiceHandler :
virtual public PublisherServiceIf {
public:
PublisherServiceHandler() {
// Your initialization goes here
}
void subscribeMarket(SubscribeMarketAck& _return,
const SubscribeMarketParam& param) {
// Your implementation goes here
std::cout << "subscribeMarket, ip=" << param.ip << ", port=" << param.port << "." << std::endl;
boost::shared_ptr<TSocket> socket(
new TSocket(param.ip, param.port));
boost::shared_ptr<TTransport> transport(
new TBufferedTransport(socket));
boost::shared_ptr<TProtocol> protocol(
new TBinaryProtocol(transport));
boost::shared_ptr<SubscriberServiceClient> client(
new SubscriberServiceClient(protocol));
int error_code = 1;
// fail to open
try {
transport->open();
error_code = 0;
{
// add to subscribes list
boost::mutex::scoped_lock
lock(g_mutexSubscriberServiceClient);
g_lstSubscriberServiceClient.push_back(client);
}
}
catch (TException& e)
{
std::cout << "Exception: " << e.what() << std::endl;
_return.__set_error_info(e.what());
}
catch (std::exception& e)
{
std::cout << "Exception: " << e.what() << std::endl;
_return.__set_error_info(e.what());
}
catch (
)
{
char buff[100];
snprintf(buff, 99, "fail to open %s:%d.", param.ip.c_str(), param.port);
std::cout << "Exception: " << buff << std::endl;
_return.__set_error_info(buff);
}
_return.__set_error_code(error_code);
}
void getStockBaseInfo(GetStockBaseInfoAck& _return,
const GetStockBaseInfoParam& param) {
// Your implementation goes here
printf("getStockBaseInfo\n");
}
};
int32_t getCurTime()
{
time_t t = time(0);
char tmp[64];
strftime(tmp,
sizeof(tmp), "%H%M%S", localtime(&t));
return atoi(tmp);
}
// send markets to subscribers.
void publisherServiceThread()
{
while (1)
{
std::vector<Snapshot> lstSnapshot;
Snapshot snapshot;
snapshot.nSecurityID = 100000001;
snapshot.nTime = getCurTime() * 1000 + rand() % 1000;
snapshot.nTradingPhaseCode = 2;
snapshot.nPreClosePx = 240500;
snapshot.nOpenPx = 250500;
snapshot.nHighPx = 250800;
snapshot.nLowPx = 240800;
snapshot.nLastPx = 250300;
snapshot.llTradeNum = 15000;
snapshot.llVolume = 6000000;
snapshot.llValue = 15030000000;
lstSnapshot.push_back(snapshot);
boost::mutex::scoped_lock
lock(g_mutexSubscriberServiceClient);
std::list<boost::shared_ptr<SubscriberServiceClient> >::iterator iter = g_lstSubscriberServiceClient.begin();
while (iter != g_lstSubscriberServiceClient.end())
{
try {
(*iter)->sendSnapshot(lstSnapshot);
iter++;
}
catch (TException& e)
{
std::cout << "Exception: " << e.what() << std::endl;
iter = g_lstSubscriberServiceClient.erase(iter);
}
catch (std::exception& e)
{
std::cout << "Exception: " << e.what() << std::endl;
iter = g_lstSubscriberServiceClient.erase(iter);
}
catch (
)
{
std::cout << "Exception: fail to call sendSnapshot()." << std::endl;
iter = g_lstSubscriberServiceClient.erase(iter);
}
}
sleep(3);
}
}
int main(
int argc,
char **argv)
{
int port = 9090;
shared_ptr<PublisherServiceHandler> handler(
new PublisherServiceHandler());
shared_ptr<TProcessor> processor(
new PublisherServiceProcessor(handler));
shared_ptr<TServerTransport> serverTransport(
new TServerSocket(port));
shared_ptr<TTransportFactory> transportFactory(
new TBufferedTransportFactory());
shared_ptr<TProtocolFactory> protocolFactory(
new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory,
protocolFactory);
boost::thread thrd(&publisherServiceThread);
printf("Starting the server
\n");
server.serve();
printf("done.\n");
return 0;
}