1: #include <muduo/net/TcpServer.h>
2:
3: #include <muduo/base/Atomic.h>
4: #include <muduo/base/Logging.h>
5: #include <muduo/base/Thread.h>
6: #include <muduo/net/EventLoop.h>
7: #include <muduo/net/InetAddress.h>
8:
9: #include <boost/bind.hpp>
10:
11: #include <utility>
12:
13: #include <stdio.h>
14: #include <unistd.h>
15:
16: using namespace muduo;
17: using namespace muduo::net;
18:
19: int numThreads = 0;
20:
21: class DiscardServer
22: {
23: public:
24: DiscardServer(EventLoop* loop, const InetAddress& listenAddr)
25: : loop_(loop),
26: server_(loop, listenAddr, "DiscardServer"),
27: oldCounter_(0),
28: startTime_(Timestamp::now())
29: {
30: server_.setConnectionCallback(
31: boost::bind(&DiscardServer::onConnection, this, _1));
32: server_.setMessageCallback(
33: boost::bind(&DiscardServer::onMessage, this, _1, _2, _3));
34: server_.setThreadNum(numThreads);
35: loop->runEvery(3.0, boost::bind(&DiscardServer::printThroughput, this));
36: }
37:
38: void start()
39: {
40: LOG_INFO << "starting " << numThreads << " threads.";
41: server_.start();
42: }
43:
44: private:
45: void onConnection(const TcpConnectionPtr& conn)
46: {
47: LOG_TRACE << conn->peerAddress().toHostPort() << " -> "
48: << conn->localAddress().toHostPort() << " is "
49: << (conn->connected() ? "UP" : "DOWN");
50: }
51:
52: void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
53: {
54: size_t len = buf->readableBytes();
55: transferred_.add(len);
56: receivedMessages_.incrementAndGet();
57: buf->retrieveAll();
58: }
59:
60: void printThroughput()
61: {
62: Timestamp endTime = Timestamp::now();
63: int64_t newCounter = transferred_.get();
64: int64_t bytes = newCounter - oldCounter_;
65: int64_t msgs = receivedMessages_.getAndSet(0);
66: double time = timeDifference(endTime, startTime_);
67: printf("%4.3f MiB/s %4.3f Ki Msgs/s %6.2f bytes per msg\n",
68: static_cast<double>(bytes)/time/1024/1024,
69: static_cast<double>(msgs)/time/1024,
70: static_cast<double>(bytes)/static_cast<double>(msgs));
71:
72: oldCounter_ = newCounter;
73: startTime_ = endTime;
74: }
75:
76: EventLoop* loop_;
77: TcpServer server_;
78:
79: AtomicInt64 transferred_;
80: AtomicInt64 receivedMessages_;
81: int64_t oldCounter_;
82: Timestamp startTime_;
83: };
84:
85: int main(int argc, char* argv[])
86: {
87: LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
88: if (argc > 1)
89: {
90: numThreads = atoi(argv[1]);
91: }
92: EventLoop loop;
93: InetAddress listenAddr(2009);
94: DiscardServer server(&loop, listenAddr);
95:
96: server.start();
97:
98: loop.loop();
99: }