|
ws_handler 1 # coding: utf-8 2 import struct 3 import time 4 import weakref 5 from hashlib import md5 6 7 from tornado.websocket import WebSocketHandler, WebSocketClosedError 8 9 from core.player_mgr import player_mgr 10 from settings import crypt, update_header, sign 11 from core.session_mgr import session_mgr 12 from core.logger import logger 13 from core.blacklist import blacklist 14 15 16 # noinspection PyAbstractClass 17 class WSHandler(WebSocketHandler): 18 19 uuid = None 20 app_id = None 21 game_id = None 22 ver = None 23 timestamp = None 24 key = None 25 last_time = 0 26 stick_package_stack = None 27 token = None 28 player = None 29 receive_id = 0 30 send_id = 0 31 32 def bind(self, player): 33 if player.session: 34 player.session.close(2) 35 player.logger("player {0} cancel old session".format(self.uuid)) 36 self.player = weakref.proxy(player) 37 self.player.session = self 38 player_mgr.add_player(player) 39 40 def check_origin(self, origin): 41 return True 42 43 @staticmethod 44 def str2bytes(string): 45 return bytes(string.encode("utf8")) 46 47 def xor(self, string): 48 if not crypt: 49 return string 50 value = bytearray() 51 step = len(self.token) 52 for i in range(0, len(string), step): 53 for j in range(step): 54 if i + j >= len(string): 55 break 56 value.append(string[i + j] ^ self.token[j]) 57 return bytes(value) 58 59 def encrypt(self, ostream): 60 return self.xor(ostream) 61 62 def decrypt(self, istream): 63 return self.xor(istream) 64 65 def verify_args(self): 66 try: 67 #self.app_id = int(self.get_argument("app_id")) 68 #self.game_id = int(self.get_argument("game_id")) 69 # self.uuid = self.get_argument("uuid") 70 # self.ver = [int(i) for i in self.get_argument("version").split(".")] 71 # self.timestamp = int(self.get_argument("timestamp")) 72 self.key = self.get_argument("key") 73 except Exception as e: 74 logger.error("session argument type incorrect {0} {1}".format(self.request.uri, e)) 75 return False 76 try: 77 assert self.key #self.app_id and self.game_id and self.uuid and self.ver and self.timestamp and self.key 78 return True 79 except AssertionError: 80 logger.error("session miss arguments {0}".format(self.request.uri)) 81 return False 82 83 def auth(self): 84 uri = self.request.uri 85 last_index = len(uri)#uri.rindex("&") 86 body = uri[uri.index("?")+1: last_index] 87 key = uri[last_index+5:] 88 self.timestamp = 0 89 t = abs((time.time() * 1000 - self.timestamp) / 1000) 90 if t > 30 and self.timestamp: 91 logger.info("session close: auth failed, timestamp expired {0}".format(t)) 92 blacklist(self.request.host_name, self.request.remote_ip, 2) 93 # return False 94 md5_key = md5((body+"52LYD50YDs57jmy52Bc49").encode("utf-8")).hexdigest() 95 # print("key", key, "md5", md5_key) 96 #if key == md5_key: 97 if uri.find(md5_key): 98 return True 99 else: 100 logger.info("session close: auth failed, key incorrect") 101 blacklist(self.request.host_name, self.request.remote_ip, 3) 102 return False 103 104 def open(self, *args, **kwargs): 105 if not self.verify_args(): 106 blacklist(self.request.host_name, self.request.remote_ip, 1) 107 self.close(8) 108 return 109 if not self.auth(): 110 self.close(7) 111 return 112 session_mgr.add(self) 113 logger.debug("proxy {0} remote ip {1}".format(self.request.host_name, self.request.remote_ip)) 114 115 def close(self, code=None, reason=None): 116 #主动断开 117 super().close(code, reason) 118 if self.player: 119 self.player.session = None 120 session_mgr.delete(self) 121 self.player.online_state = 0 122 123 def on_close(self): 124 #收到客户端断开连接触发事件 125 session_mgr.delete(self) 126 logger.debug("session {0} close {1}".format(self.uuid, self.close_code)) 127 try: 128 if self.player and self.player.session: 129 self.player.online_state = 0 130 self.player.offline() 131 self.player.session = None 132 self.player = None 133 except (ReferenceError, AttributeError): 134 pass 135 if isinstance(self.close_code, int) and self.close_code >= 1000: 136 logger.warn("{0} close code {1} bad".format(self.uuid, self.close_code)) 137 138 def unpack_header(self, raw): 139 from protocol.deserialize import parse 140 if update_header: 141 size, seq, app_id, game_id, t, v1, v2, v3, sign1, sign2, sign3, sign4, sign5, sign6, sign7, sign8, sign9, cmd, = struct.unpack('>iihhibbbbbbbbbbbbb', raw[:29]) 142 string, = struct.unpack('>{0}s'.format(size - 29), raw[29:size]) 143 # print("receive_id", seq, self.receive_id) 144 client_sign = chr(sign1) + chr(sign2) + chr(sign3) + chr(sign4) + chr(sign5) + chr(sign6) + chr(sign7) + chr(sign8) + chr(sign9) 145 if seq == self.receive_id and client_sign == sign: 146 self.receive_id += 1 147 parse(cmd, string, self) 148 else: 149 self.close(3) 150 else: 151 size, = struct.unpack('>H',raw[0:2]) 152 cmd, = struct.unpack('>H', raw[2:4]) 153 string, = struct.unpack('>{0}s'.format(size - 4), raw[4:size]) 154 155 # size, seq, self.game_id, cmd, = struct.unpack('>ihh', raw[:0]) 156 # string, = struct.unpack('>{0}s'.format(size - 1), raw[1:size]) 157 # size = len(raw) 158 # cmd = int(raw[0])#struct.unpack('>ihh', raw[:0]) 159 # string = raw[2:size]#struct.unpack('>{0}s'.format(size - 1), raw[1:size]) 160 161 #协议处理 162 parse(cmd, string, self) 163 164 def send(self, cmd, proto, serialize=True): 165 if serialize: 166 result = proto.SerializeToString() 167 else: 168 result = proto 169 result = self.encrypt(result) 170 if update_header: 171 fmt = ">iihhibbbb{0}s".format(len(result)) 172 args = (len(result) + 20, self.send_id, self.app_id, self.game_id, int(time.time()), *self.ver, cmd, result) 173 #else: 174 # fmt = ">ii{0}s".format(len(result)) 175 # args = (len(result) + 8, cmd, result) 176 fmt = ">HH{0}s".format(len(result)) 177 args = (len(result), cmd,result) 178 data = struct.pack(fmt, *args) 179 # data = struct.pack('>BB{0}s',len(result),cmd,len(result),result) 180 try: 181 self.write_message(data, True) 182 if len(data) > 1024: 183 logger.debug("{0} send message size {1}".format(self.uuid, len(data))) 184 except WebSocketClosedError: 185 pass 186 self.send_id += 1 187 188 def on_message(self, buf): 189 self.last_time = time.time() 190 """解析数据流(半包,全包,粘包),并将命令和协议数据解开""" 191 if isinstance(buf, str): 192 buf = buf.encode(encoding='utf-8') 193 self.unpack_header(buf) 194 return 195 #buf = buf.encode(encoding='utf-8') 196 #self.unpack_header(buf) 197 198 try: 199 if len(buf) > 1024: 200 logger.debug("{0} receive message size {1}".format(self.uuid, len(buf))) 201 if self.stick_package_stack: 202 buf = self.stick_package_stack + buf 203 self.stick_package_stack = None 204 #size, = struct.unpack('>i', buf[:4]) 205 size, = struct.unpack('>H',buf[0:2]) 206 #size = buf[0] 207 raw_size = len(buf) 208 if raw_size > size: # 至少有一个包 209 self.unpack_header(buf) 210 rest = buf[size:] 211 self.on_message(rest) 212 elif size > raw_size: # 半包 213 self.stick_package_stack = buf 214 logger.warn("{0} stick package {1}".format(self.uuid, len(buf))) 215 else: 216 self.unpack_header(buf) 217 except Exception as e: 218 logger.exception("message parse failed {0}".format(str(e))) 219 封装微博socket操作,包括解包、发包等,这里采用谷歌protobuf协议格式,具体的解包、组包格式在另外一篇中介绍
|