# -*- coding: utf-8 -*- #!/usr/bin/env python
import SocketServer from SocketServer import StreamRequestHandler as SRH from time import ctime import struct import json import urllib import urllib2 import socket import time import traceback
import threading import globalres
host = '' port = 5004 addr = (host,port)
url = 'http://yoururl'
''' First, you must create a request handler class by subclassing the BaseRequestHandler class and overriding its handle() method; this method will process incoming requests. ''' class TcpNetHandler(SRH): def handle(self): print '[server] got connection from ',self.client_address #self.wfile.write('connection %s:%s at %s succeed!' % (host,port,ctime())) old_buf = '' new_buf = '' all_buf = '' while True: try: new_buf = self.request.recv(1024*1024) if not new_buf: print '[server] lost connection from ',self.client_address break all_buf = old_buf + new_buf if len(all_buf) < 16 : continue processedlen=self.processRecvdBuffer(all_buf) if -1 == processedlen : break remain_len = len(all_buf) if remain_len > processedlen : old_buf = all_buf[processedlen:] print "[server] process:%d ,remain:%d"%(processedlen,remain_len) #print "[server] finish once" except: print 'error in ThreadedTCPRequestHandler :%s' % (traceback.format_exc()) #return def processRecvdBuffer(self,recvbuf): processedlen = 0 total_len= len(recvbuf) remain_buf = recvbuf while True : (retcode,size,cmd) = self.unpackFromBuffer(remain_buf) if -1 == retcode : print '[server] unpackFromBuffer error: ',retcode return -1 if 0 == retcode : print '[server] next time again' break netpacket = remain_buf[:size] if cmd == 0: print "[server] client-keeplive,from:",self.client_address self.request.send(netpacket) elif cmd == 102: globalres.netpackqueue.put(netpacket) #print "102" pass else : print "other ",cmd processedlen = processedlen+size if processedlen >=total_len : break remain_buf=remain_buf[size:] pass return processedlen ''' retcode : -1,failed;0,next time again;1,success, ''' def unpackFromBuffer(self,bytebuf1): packet_header = bytebuf1[:16] (size,cmd,subcmd,magic,version)=struct.unpack('IHHII',packet_header) if cmd == 0 and size==16: #print "[server] client-keeplive" self.request.send(packet_header) return (1,16,0) if size < 16 or size > 4096: print "[server] invalid size",size return (-1,-1,cmd) if len(bytebuf1) < size: print "[server] buf not enough",len(bytebuf1),size return (0,0,cmd) return (1,size,cmd)
class HttpRequestThread(threading.Thread): def __init__(self, queue,htint): threading.Thread.__init__(self) self.queue = queue self.Ht=htint #线程ID def run(self): while True: if self.queue.empty() : time.sleep(0.01) continue netpacket = self.queue.get() #print "[Thread%d] get from qsize:%d "%(self.Ht,self.queue.qsize()) try: self.processNetPacket(0,netpacket) finally: self.queue.task_done() def processNetPacket(self,cmd,netpacket): self.process_cmd102_post_pushmsg(netpacket) #print "[server] dispatchNetPacket",(cmd,netpacket,netpacksize) #if cmd==102 : # self.process_cmd102_post_pushmsg(netpacket) # pass #else: # print "[server] invalid cmd:",cmd # pass def process_cmd102_post_pushmsg(self,netpacket): data = netpacket[16:] datasize = len(netpacket) -16 msgsize = datasize - 26 #fmtstr = "QQQH%ds"%(msgsize) print "[Thread%d] fmtstr:%s"%(self.Ht,fmtstr) (groupid,memberid,senderid,devicetype,message)=struct.unpack(fmtstr,data) #print "[server] fmtstr unpack:", (groupid,memberid,senderid,devicetype,message) dict1=dict() dict1["groupid"]=groupid dict1["memberid"]=memberid dict1["devicetype"]=devicetype dict1["message"]=message content = urllib.urlencode(dict1) print "[Thread%d] urlencode content:%s"%(self.Ht,content) urllib2.socket.setdefaulttimeout(5) request = urllib2.Request(url, content) response = urllib2.urlopen(request) page = response.read() print "[Thread%d] response:%s"%( self.Ht,page.decode("utf-8") ) pass
def prepareWorkThreadPoll(): print "[server] prepareWorkThreadPoll" for i in range(4): t = HttpRequestThread(globalres.netpackqueue,i) t.setDaemon(True) t.start() print "[server] startthread:",i ''' Second, you must instantiate one of the server classes, passing it the server’s address and the request handler class. Finally, call the handle_request() or serve_forever() method of the server object to process one or many requests. ''' def main(): prepareWorkThreadPoll() server = SocketServer.ThreadingTCPServer(addr,TcpNetHandler) print 'server is running in port %d.'%(port) server.serve_forever() pass
main() #test(buf)
#data=buf #print buf #dict1 = json.loads(data) #print "load dict:",type(dict1),dict1
|