# -*- coding: utf-8 -*-
# network.py
# bin.zhang@sw2us.com
# 2012.3.8
# revision:
#
"""A small, basic communication framework.

Written in about a day for the company's application needs, to take the
network-communication complexity away from the other modules of the system.

Quick test: ``network.py service`` runs the server and ``network.py client``
runs the client.  The client sends a series of message packets; both sides
react to socket connect and disconnect events.

For performance, and because of the threading needs of the real project,
every connection is served by its own thread.  A hook for select-based
multiplexing is reserved, see ``NetService.selectConnIn()``.
"""

import array
import base64
import gzip
import os
import os.path
import select
import socket
import struct
import sys
import threading
import time
import traceback
import zlib

import message
from message import *

global_compress_type = COMPRESS_ZLIB

# Error codes returned as the second element of NetPacketQueue.dataQueueIn().
NETMSG_ERROR_MAGIC = 1
NETMSG_ERROR_SIZE = 2
NETMSG_ERROR_DECOMPRESS = 3
NETMSG_ERROR_NOTSUPPORTCOMPRESS = 4
NETMSG_ERROR_MESSAGEUNMARSHALL = 5


class NetPacketQueue(object):
    """Split a received byte stream into decoded messages."""

    def __init__(self, conn=None, size=1024):
        self.size = size
        self.outs = {}
        self.ins = {}
        self.user = None
        self.conn = conn
        # Bytes received but not yet decoded.  b'' is the same object type as
        # '' on Python 2 and matches what socket.recv() returns on Python 3.
        self.bf = b''
        self.pktlist = []  # decoded messages waiting to be collected
        self.mtxptks = threading.Condition()
        self.invalid = False

    def clearPackets(self):
        """Drop every decoded message that has not been collected yet."""
        with self.mtxptks:
            self.pktlist = []

    def destroy(self):
        """Mark the queue invalid and wake a thread waiting on the condition."""
        self.invalid = True
        with self.mtxptks:
            self.mtxptks.notify()

    def getMessageList(self):
        """Return all decoded messages and leave the queue empty."""
        with self.mtxptks:
            msgs, self.pktlist = self.pktlist, []
        return msgs

    def dataQueueIn(self, d):
        """Append received bytes and decode every complete packet.

        The header is unpacked with ``'!IIBBI'``: magic, size, compress,
        encrypt and version, in network byte order.  A packet occupies
        ``size + 4`` bytes in total, of which ``size - 10`` bytes follow the
        header as payload (this assumes ``NetMetaPacket.minSize()`` equals
        the 14-byte size of that struct -- TODO confirm in ``message``).

        Returns a ``(ok, code)`` tuple:
          (True, 0)   -- waiting for more header bytes
          (True, 1)   -- waiting for more payload bytes
          (False, NETMSG_ERROR_*) -- dirty data; the caller should drop the
                         connection.
        """
        self.bf += d
        buf = self.bf
        hdrsize = NetMetaPacket.minSize()
        pos = 0  # offset of the first byte that has not been consumed
        try:
            while True:
                avail = len(buf) - pos
                if avail < hdrsize:
                    return True, 0
                magic, size, compress, encrypt, ver = struct.unpack(
                    '!IIBBI', buf[pos:pos + hdrsize])
                if magic != NetMetaPacket.magic4:
                    return False, NETMSG_ERROR_MAGIC
                if size <= 10:
                    return False, NETMSG_ERROR_SIZE
                if avail < size + 4:
                    return True, 1
                start = pos + hdrsize
                end = start + (size - 10)
                s = buf[start:end]
                if compress == message.COMPRESS_ZLIB:
                    try:
                        s = zlib.decompress(s)
                    except zlib.error:
                        return False, NETMSG_ERROR_DECOMPRESS
                elif compress != message.COMPRESS_NONE:
                    return False, NETMSG_ERROR_NOTSUPPORTCOMPRESS
                # Restore the payload to a message object.
                m = MessageBase.unmarshall(s)
                if m is None:
                    return False, NETMSG_ERROR_MESSAGEUNMARSHALL
                pos = end
                with self.mtxptks:
                    self.pktlist.append(m)
                    self.mtxptks.notify()
        finally:
            # Keep only the bytes that were not consumed, so packets that were
            # already delivered are never decoded a second time, even when
            # this call ends with an error.
            self.bf = buf[pos:]


class NetConnThread(object):
    """Run a receive loop for one connection on its own thread."""

    def __init__(self, conn, proc=None):
        self.conn = conn
        if not proc:
            proc = self.inner
        self.thread = threading.Thread(target=proc)
        self.thread.start()

    def inner(self):
        """Receive until the peer closes or an error occurs, then clean up."""
        try:
            while True:
                d = self.conn.recvData()
                if not d:  # empty read: the peer closed the connection
                    break
                self.conn.eventDataRecv(d)
        except socket.error:
            # Expected when the socket is reset or closed underneath us.
            pass
        except Exception:
            # A failure in a message handler is a bug; report it instead of
            # swallowing it silently.
            traceback.print_exc()
        finally:
            self.conn.close()
        self.conn.eventDestroyed()
        print('NetConnThread Exiting')


class NetConnectionEvent(object):
    """Event object handed to a connection's ``recvfunc`` callback."""

    EVENT_CONNECTED = 1
    EVENT_DATA = 2
    EVENT_DISCONNECTED = 3

    def __init__(self, type, conn, data=None):
        self.type = type
        self.conn = conn
        self.data = data


class NetConnection(object):
    """One socket connection together with its packet queue."""

    def __init__(self, sock=None, svc=None, recvfunc=None):
        self.service = svc
        self.sock = sock
        self.delta = None
        self.recvfunc = recvfunc
        self.queue = NetPacketQueue(self)

    def getService(self):
        """Return the owning service, or None for a stand-alone connection."""
        return self.service

    def getQueue(self):
        """Return the NetPacketQueue of this connection."""
        return self.queue

    def peer(self):
        """Placeholder; not implemented."""
        pass

    def connect(self, dest):
        """Open a new socket to ``dest``; return True on success."""
        self.sock = socket.socket()
        try:
            self.sock.connect(dest)
        except socket.error:
            # Do not leak the descriptor of a socket that never connected.
            self.sock.close()
            return False
        return True

    def eventDataRecv(self, data):
        """Decode ``data`` and dispatch the resulting messages."""
        # dataQueueIn() returns (ok, code); testing the tuple itself would
        # always be true, so the flag has to be unpacked.
        ok, _code = self.queue.dataQueueIn(data)
        if not ok:
            self.close()  # decoding error: drop the connection right away
            return
        msglist = self.queue.getMessageList()
        if not msglist:
            return
        # Hand the messages straight to the service.  They are delivered on
        # the receiving thread, so the user needs one or more threads of its
        # own if handling is slow.
        if self.service:
            self.service.eventGotMessage(msglist, self)
        # A connection without a service gets its messages through recvfunc,
        # called from the thread that runs the receive loop.
        if self.recvfunc:
            evt = NetConnectionEvent(NetConnectionEvent.EVENT_DATA, self,
                                     msglist)
            self.recvfunc(evt)

    def recvData(self, size=1024):
        """Read up to ``size`` bytes from the socket."""
        return self.sock.recv(size)

    def sendData(self, d):
        """Placeholder; not implemented."""
        pass

    def sendMessage(self, m):
        """Marshall ``m`` and send it; close and return False on failure."""
        try:
            d = NetMetaPacket(msg=m, compress=global_compress_type).marshall()
            self.sock.sendall(d)
        except Exception:
            self.close()
            return False
        return True

    def close(self):
        """Close the socket; safe to call more than once."""
        if self.sock is None:
            return
        try:
            self.sock.close()
        except socket.error:
            pass

    def eventDestroyed(self):
        """Notify the queue, the service and recvfunc that the link is gone."""
        self.queue.destroy()
        if self.service:
            self.service.eventConnDisconnected(self)

        if self.recvfunc:
            evt = NetConnectionEvent(NetConnectionEvent.EVENT_DISCONNECTED,
                                     self)
            self.recvfunc(evt)


class NetService(object):
    """Listening service that accepts connections on its own thread."""

    def __init__(self, name, addr):
        self.name = name
        self.addr = addr

        self.condexit = threading.Condition()
        self.sock = None
        self.mtxconns = threading.Lock()
        self.conns = []

    def eventConnCreated(self, conn):
        """Record a newly accepted connection."""
        print('conn created')
        with self.mtxconns:
            self.conns.append(conn)

    def eventConnDisconnected(self, conn):
        """Forget a connection that has gone away."""
        print('conn disconnected')
        with self.mtxconns:
            # The connection may never have been registered; list.remove()
            # would raise ValueError in that case.
            if conn in self.conns:
                self.conns.remove(conn)

    def eventGotMessage(self, msglist, conn):
        """Hook: messages received in service mode surface here.

        ``conn`` is the connection the data arrived on.
        """
        pass

    def selectConnIn(self, conn):
        """Placeholder for switching a connection to select mode."""
        pass

    def getConnections(self):
        """Placeholder; not implemented."""
        pass

    def start(self):
        """Bind, listen and start the accept thread; return True on success."""
        try:
            self.sock = socket.socket()
            print('lll %s' % (self.addr,))
            self.sock.bind(self.addr)
            self.sock.listen(5)

            self.thread = threading.Thread(target=self.service_loop)
            self.thread.start()
        except Exception:
            traceback.print_exc()
            if self.sock is not None:
                self.sock.close()  # do not leak a half-initialised socket
            return False
        return True

    def shutdown(self):
        """Close the listening socket and every known connection."""
        if self.sock is not None:
            self.sock.close()
        with self.mtxconns:
            for c in self.conns:
                c.close()

    def service_loop(self):
        """Accept connections until the listening socket is closed."""
        print('service:(%s) thread starting' % self.name)
        while True:
            try:
                infds, _wr, _ex = select.select([self.sock], [], [])
            except (select.error, socket.error, ValueError):
                # select() fails once shutdown() has closed the socket.
                print('service:(%s) thread exiting' % self.name)
                return
            for s in infds:
                if s != self.sock:
                    continue
                # A new connection has arrived.
                try:
                    sock, peer = self.sock.accept()
                except socket.error:
                    # An error here means the socket was forcibly closed.
                    print('service:(%s) thread exiting' % self.name)
                    return
                conn = NetConnection(sock, self)
                # NetworkServer.__init__ installs a class-level ``delta`` dict
                # on socket.socket; it is absent when no NetworkServer was
                # created.  NOTE(review): that dict is shared by all sockets,
                # so it only remembers the most recent connection.
                registry = getattr(sock, 'delta', None)
                if registry is not None:
                    registry['conn'] = conn
                self.eventConnCreated(conn)


class NetworkServer(object):
    """Named container of services."""

    def __init__(self, name=''):
        self.name = name
        self.services = {}
        # Adds class-level attributes to socket.socket (a process-wide side
        # effect); NetService.service_loop() reads ``delta``.
        setattr(socket.socket, 'delta', {})
        setattr(socket.socket, 'conn', None)

    def createService(self, name, addr, port, servicecls=NetService):
        """Create a ``servicecls`` bound to ``(addr, port)`` and register it."""
        svc = servicecls(name, (addr, port))
        self.services[name] = svc
        return svc

    def addService(self, serv):
        """Register an existing service under its own name."""
        self.services[serv.name] = serv


def test_packetqueue():
    """Feed 30 marshalled packets through a NetPacketQueue."""
    q = NetPacketQueue()
    for n in range(3 * 10):
        d = NetMetaPacket(msg=MsgCallReturn(value=[n], bin='A' * (n + 1)),
                          compress=COMPRESS_ZLIB).marshall()
        q.dataQueueIn(d)


class MyService(NetService):
    """Test service: one receive thread per client, prints each message."""

    def __init__(self, name, addr):
        NetService.__init__(self, name, addr)

    def eventConnCreated(self, conn):
        NetService.eventConnCreated(self, conn)
        NetConnThread(conn)

    def eventGotMessage(self, msglist, conn):
        for m in msglist:
            print('%s %s' % (m.attrs, m.bin))


test_dest = ('localhost', 12004)


class MyClient(object):
    """Test client: connects to ``test_dest`` and sends one message a second."""

    def __init__(self):
        conn = NetConnection(recvfunc=self.recvEvent)
        if not conn.connect(test_dest):
            print('connect to %s failed' % (test_dest,))
            return
        NetConnThread(conn)
        for n in range(100):
            if not conn.sendMessage(
                    MsgCallReturn(value=[n], bin='A' * (n + 1))):
                print('serivce lost abord!!')
                break
            time.sleep(1)

    def recvEvent(self, evt):
        """Print received messages and report a lost connection."""
        if evt.type == NetConnectionEvent.EVENT_DATA:
            for m in evt.data:
                print('%s %s' % (m.attrs, m.bin))
        if evt.type == NetConnectionEvent.EVENT_DISCONNECTED:
            print('connection lost!')


def test_service():
    """Start the test service on ``test_dest``."""
    server = NetworkServer('test-server')
    svc = MyService('fileserver', test_dest)
    server.addService(svc)
    svc.start()


def test_client():
    """Run the test client."""
    MyClient()


def _main(argv):
    """Script entry point: ``network.py service`` or ``network.py client``."""
    if len(argv) < 2 or argv[1] not in ('client', 'service'):
        print('usage: network.py service|client')
        sys.exit(1)
    if argv[1] == 'client':
        test_client()
        sys.exit(0)
    test_service()
    time.sleep(100)


if __name__ == '__main__':
    _main(sys.argv)
|