1: #include <muduo/net/TcpServer.h>
   2:  
   3: #include <muduo/base/Atomic.h>
   4: #include <muduo/base/Logging.h>
   5: #include <muduo/base/Thread.h>
   6: #include <muduo/net/EventLoop.h>
   7: #include <muduo/net/InetAddress.h>
   8:  
   9: #include <boost/bind.hpp>
  10:  
  11: #include <utility>
  12:  
  13: #include <stdio.h>
  14: #include <unistd.h>
  15:  
  16: using namespace muduo;
  17: using namespace muduo::net;
  18:  
  19: int numThreads = 0;
  20:  
  21: class DiscardServer
  22: {  23:  public:
  24:   DiscardServer(EventLoop* loop, const InetAddress& listenAddr)
  25:     : loop_(loop),
  26:       server_(loop, listenAddr, "DiscardServer"),
  27:       oldCounter_(0),
  28:       startTime_(Timestamp::now())
  29:   {  30:     server_.setConnectionCallback(
  31:         boost::bind(&DiscardServer::onConnection, this, _1));
  32:     server_.setMessageCallback(
  33:         boost::bind(&DiscardServer::onMessage, this, _1, _2, _3));
  34:     server_.setThreadNum(numThreads);
  35:     loop->runEvery(3.0, boost::bind(&DiscardServer::printThroughput, this));
  36:   }
  37:  
  38:   void start()
  39:   {  40:     LOG_INFO << "starting " << numThreads << " threads.";
  41:     server_.start();
  42:   }
  43:  
  44:  private:
  45:   void onConnection(const TcpConnectionPtr& conn)
  46:   {  47:     LOG_TRACE << conn->peerAddress().toHostPort() << " -> "
  48:         << conn->localAddress().toHostPort() << " is "
  49:         << (conn->connected() ? "UP" : "DOWN");
  50:   }
  51:  
  52:   void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
  53:   {  54:     size_t len = buf->readableBytes();
  55:     transferred_.add(len);
  56:     receivedMessages_.incrementAndGet();
  57:     buf->retrieveAll();
  58:   }
  59:  
  60:   void printThroughput()
  61:   {  62:     Timestamp endTime = Timestamp::now();
  63:     int64_t newCounter = transferred_.get();
  64:     int64_t bytes = newCounter - oldCounter_;
  65:     int64_t msgs = receivedMessages_.getAndSet(0);
  66:     double time = timeDifference(endTime, startTime_);
  67:     printf("%4.3f MiB/s %4.3f Ki Msgs/s %6.2f bytes per msg\n",  68:         static_cast<double>(bytes)/time/1024/1024,
  69:         static_cast<double>(msgs)/time/1024,
  70:         static_cast<double>(bytes)/static_cast<double>(msgs));
  71:  
  72:     oldCounter_ = newCounter;
  73:     startTime_ = endTime;
  74:   }
  75:  
  76:   EventLoop* loop_;
  77:   TcpServer server_;
  78:  
  79:   AtomicInt64 transferred_;
  80:   AtomicInt64 receivedMessages_;
  81:   int64_t oldCounter_;
  82:   Timestamp startTime_;
  83: };
  84:  
  85: int main(int argc, char* argv[])
  86: {  87:   LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  88:   if (argc > 1)
  89:   {  90:     numThreads = atoi(argv[1]);
  91:   }
  92:   EventLoop loop;
  93:   InetAddress listenAddr(2009);
  94:   DiscardServer server(&loop, listenAddr);
  95:  
  96:   server.start();
  97:  
  98:   loop.loop();
  99: }