dvr项目中flex驻留在浏览器,而影响播放程序是独立的进程,sandbox的安全问题导致flex的代码无法直接与播放进程IPC通信,那只有通过公网IP的主机进行桥接。 很多年以前用过foundstone系列的工具,也有socket转向的功能,包括在5173时做过lsp的底层转向软件(仿sockcap),原理当然是相当简单,python是首选工具。 代码接收两端建立socket进来,并根据相同的id号来进行socket配对,之后两个socket之间就实现互相转发(技术同之前写的http代理服务器 ) 1 # -- coding:utf-8 -- 2 3 import socket,traceback,os,os.path,sys,time,struct,base64,gzip,array,threading 4 import select,json 5 6 7 ''' 8 {'id','type'} 9 10 type - 'mapshow','imageplay' 11 id - 一次会话的编号 12 13 imageplay 与xbridge建立socket连接,并注册一个会话编号(随机产生) 14 imageplay启动mapshow,并将会话编号传递给mapshow,mapshow建立xbridge的连接,并提交会话编号 15 xbridge将双向传递相同会话编号的数据到对方 16 17 sock1的客户必须等sock2连接进入之后发送数据,否则将sock1数据转发给sock2时将产生异常 18 ''' 19 20 class ConnectionPair: 21 def __init__(self,app): 22 self.app = app 23 self.id = '' 24 self.sock1=None #imageplay上来的连接 25 self.sock2=None #第二个连接上来的对象mapdemo 26 27 def start(self): 28 t = threading.Thread(target=self.threadRecv) 29 t.start() 30 31 def onLostConnection(self): 32 try: 33 print 'connection pair lost..' 34 self.sock1.close() 35 self.sock2.close() 36 self.app.onConnectionPairBroken(self) 37 except: 38 traceback.print_exc() 39 40 def threadRecv(self): 41 print 'service threading entering' 42 import select 43 while True: 44 fds = [] 45 if self.sock1: 46 fds.append(self.sock1) 47 if self.sock2: 48 fds.append(self.sock2) 49 #fds = [self.sock1,self.sock2] 50 try: 51 #sock2未连接进来前,将不接收sock1上产生数据 52 #print 'fds:',len(fds),fds 53 rds,wds,eds = select.select(fds,[],[],1) 54 if not rds:#timeout 55 continue 56 57 for s in rds: 58 d = s.recv(1024) 59 #print d 60 if not d: 61 raise 'any jump' 62 63 to = self.sock2 64 if s == self.sock2: 65 to = self.sock1 66 #print 'redirect data:',d 67 to.sendall(d) 68 except: 69 traceback.print_exc() 70 self.onLostConnection() 71 break 72 73 print 'ConnThread Exiting' 74 75 76 77 78 class XBridge: 79 def __init__(self,addr=('',12788)): 80 self.sock = None 81 self.addr = addr 82 self.conns={} #{id} 83 self.mtxconns = threading.Lock() 84 85 def onConnectionPairBroken(self,cp): 86 self.mtxconns.acquire() 87 del self.conns[cp.id] 88 print 'onConnectionPairBroken(),removed:',cp.id 89 self.mtxconns.release() 90 91 def start(self): 92 try: 93 94 self.sock = socket.socket() 95 #print 'lll',self.addr 96 self.sock.bind( tuple(self.addr) ) 97 self.sock.listen(5) 98 99 self.thread = threading.Thread(target=self.service_loop) 100 self.thread.start() 101 print 'xbridge started!' 102 self.thread.join() 103 except: 104 traceback.print_exc() 105 return False 106 107 def shutdown(self): 108 self.sock.close() 109 110 111 def service_loop(self): 112 113 while True: 114 fdr = [] 115 fdr.append(self.sock) 116 infds,wr,e = select.select(fdr,[],[]) 117 if e: 118 print 'service thread exit' 119 break 120 for s in infds: 121 if s == self.sock: #新连接到达 122 sock = None 123 try: 124 sock,peer = self.sock.accept() #异常产生表示self.sock被强行关闭 125 print 'new client incoming',peer 126 except: 127 return 128 thread = threading.Thread(target=self.threadNewClient,args=(sock,)) 129 thread.start() 130 131 def threadNewClient(self,sock): 132 #等待注册信息进入 ,5 秒超时 133 try: 134 fdr = [sock,] 135 print 'enter select ' 136 infds,wr,e = select.select(fdr,[],[],5) 137 138 if not infds: 139 sock.close() 140 print 'client register timeout' 141 return #接收超时 142 d = sock.recv(1024) 143 d = json.loads(d) 144 id,type = d['id'],d['type'] 145 connpair = None 146 print id,type 147 self.mtxconns.acquire() 148 if type == 'imageplay': 149 150 cp = ConnectionPair(self) 151 cp.id = id 152 cp.sock1 = sock 153 self.conns[id] = cp 154 cp.start() 155 156 elif type =='mapshow': 157 connpair = self.conns.get(id,None) 158 if connpair == None: #没找到imageplay 159 sock.close() 160 print 'mapshow cannt found imageplay..' 161 else: 162 print 'mapclient matched!' 163 connpair.sock2 = sock 164 else: 165 print 'unknown command id:',id,type 166 167 self.mtxconns.release() 168 except: 169 sock.close() 170 traceback.print_exc() 171 172 if __name__=='__main__': 173 XBridge().start() #default '',12788 174
|