Benjamin

静以修身,俭以养德,非澹薄无以明志,非宁静无以致远。
随笔 - 397, 文章 - 0, 评论 - 196, 引用 - 0
数据加载中……

python websocket使用


  1 # coding: utf-8
  2 import struct
  3 import time
  4 import weakref
  5 from hashlib import md5
  6 
  7 from tornado.websocket import WebSocketHandler, WebSocketClosedError
  8 
  9 from core.player_mgr import player_mgr
 10 from settings import crypt, update_header, sign
 11 from core.session_mgr import session_mgr
 12 from core.logger import logger
 13 from core.blacklist import blacklist
 14 
 15 
 16 # noinspection PyAbstractClass
 17 class WSHandler(WebSocketHandler):
 18 
 19     uuid = None
 20     app_id = None
 21     game_id = None
 22     ver = None
 23     timestamp = None
 24     key = None
 25     last_time = 0
 26     stick_package_stack = None
 27     token = None
 28     player = None
 29     receive_id = 0
 30     send_id = 0
 31 
 32     def bind(self, player):
 33         if player.session:
 34             player.session.close(2)
 35             player.logger("player {0} cancel old session".format(self.uuid))
 36         self.player = weakref.proxy(player)
 37         self.player.session = self
 38         player_mgr.add_player(player)
 39 
 40     def check_origin(self, origin):
 41         return True
 42 
 43     @staticmethod
 44     def str2bytes(string):
 45         return bytes(string.encode("utf8"))
 46 
 47     def xor(self, string):
 48         if not crypt:
 49             return string
 50         value = bytearray()
 51         step = len(self.token)
 52         for i in range(0, len(string), step):
 53             for j in range(step):
 54                 if i + j >= len(string):
 55                     break
 56                 value.append(string[i + j] ^ self.token[j])
 57         return bytes(value)
 58 
 59     def encrypt(self, ostream):
 60         return self.xor(ostream)
 61 
 62     def decrypt(self, istream):
 63         return self.xor(istream)
 64 
 65     def verify_args(self):
 66         try:
 67             #self.app_id = int(self.get_argument("app_id"))
 68             #self.game_id = int(self.get_argument("game_id"))
 69             # self.uuid = self.get_argument("uuid")
 70             # self.ver = [int(i) for i in self.get_argument("version").split(".")]
 71             # self.timestamp = int(self.get_argument("timestamp"))
 72             self.key = self.get_argument("key")
 73         except Exception as e:
 74             logger.error("session argument type incorrect {0} {1}".format(self.request.uri, e))
 75             return False
 76         try:
 77             assert self.key #self.app_id and self.game_id and self.uuid and self.ver and self.timestamp and self.key
 78             return True
 79         except AssertionError:
 80             logger.error("session miss arguments {0}".format(self.request.uri))
 81             return False
 82 
 83     def auth(self):
 84         uri = self.request.uri
 85         last_index = len(uri)#uri.rindex("&")
 86         body = uri[uri.index("?")+1: last_index]
 87         key = uri[last_index+5:]
 88         self.timestamp = 0
 89         t = abs((time.time() * 1000 - self.timestamp) / 1000)
 90         if t > 30 and self.timestamp:
 91             logger.info("session close: auth failed, timestamp expired {0}".format(t))
 92             blacklist(self.request.host_name, self.request.remote_ip, 2)
 93             # return False
 94         md5_key = md5((body+"52LYD50YDs57jmy52Bc49").encode("utf-8")).hexdigest()
 95         # print("key", key, "md5", md5_key)
 96         #if key == md5_key:
 97         if uri.find(md5_key):
 98             return True
 99         else:
100             logger.info("session close: auth failed, key incorrect")
101             blacklist(self.request.host_name, self.request.remote_ip, 3)
102             return False
103 
104     def open(self, *args, **kwargs):
105         if not self.verify_args():
106             blacklist(self.request.host_name, self.request.remote_ip, 1)
107             self.close(8)
108             return
109         if not self.auth():
110             self.close(7)
111             return
112         session_mgr.add(self)
113         logger.debug("proxy {0} remote ip {1}".format(self.request.host_name, self.request.remote_ip))
114 
115     def close(self, code=None, reason=None):
116         #主动断开
117         super().close(code, reason)
118         if self.player:
119             self.player.session = None
120             session_mgr.delete(self)
121             self.player.online_state = 0
122 
123     def on_close(self):
124         #收到客户端断开连接触发事件
125         session_mgr.delete(self)
126         logger.debug("session {0} close {1}".format(self.uuid, self.close_code))
127         try:
128             if self.player and self.player.session:
129                 self.player.online_state = 0
130                 self.player.offline()
131                 self.player.session = None
132                 self.player = None
133         except (ReferenceError, AttributeError):
134             pass
135         if isinstance(self.close_code, int) and self.close_code >= 1000:
136             logger.warn("{0} close code {1} bad".format(self.uuid, self.close_code))
137 
138     def unpack_header(self, raw):
139         from protocol.deserialize import parse
140         if update_header:
141             size, seq, app_id, game_id, t, v1, v2, v3, sign1, sign2, sign3, sign4, sign5, sign6, sign7, sign8, sign9, cmd, = struct.unpack('>iihhibbbbbbbbbbbbb', raw[:29])
142             string, = struct.unpack('>{0}s'.format(size - 29), raw[29:size])
143             # print("receive_id", seq, self.receive_id)
144             client_sign = chr(sign1) + chr(sign2) + chr(sign3) + chr(sign4) + chr(sign5) + chr(sign6) + chr(sign7) + chr(sign8) + chr(sign9)
145             if seq == self.receive_id and client_sign == sign:
146                 self.receive_id += 1
147                 parse(cmd, string, self)
148             else:
149                 self.close(3)
150         else:
151             size, = struct.unpack('>H',raw[0:2])
152             cmd, = struct.unpack('>H', raw[2:4])
153             string, = struct.unpack('>{0}s'.format(size - 4), raw[4:size])
154 
155             # size, seq, self.game_id, cmd, = struct.unpack('>ihh', raw[:0])
156             # string, = struct.unpack('>{0}s'.format(size - 1), raw[1:size])
157             # size = len(raw)
158             # cmd = int(raw[0])#struct.unpack('>ihh', raw[:0])
159             # string = raw[2:size]#struct.unpack('>{0}s'.format(size - 1), raw[1:size])
160 
161             #协议处理
162             parse(cmd, string, self)
163 
164     def send(self, cmd, proto, serialize=True):
165         if serialize:
166             result = proto.SerializeToString()
167         else:
168             result = proto
169         result = self.encrypt(result)
170         if update_header:
171             fmt = ">iihhibbbb{0}s".format(len(result))
172             args = (len(result) + 20, self.send_id, self.app_id, self.game_id, int(time.time()), *self.ver, cmd, result)
173         #else:
174             # fmt = ">ii{0}s".format(len(result))
175             # args = (len(result) + 8, cmd, result)
176         fmt = ">HH{0}s".format(len(result))
177         args = (len(result), cmd,result)
178         data = struct.pack(fmt, *args)
179         # data = struct.pack('>BB{0}s',len(result),cmd,len(result),result)
180         try:
181             self.write_message(data, True)
182             if len(data) > 1024:
183                 logger.debug("{0} send message size {1}".format(self.uuid, len(data)))
184         except WebSocketClosedError:
185             pass
186         self.send_id += 1
187 
188     def on_message(self, buf):
189         self.last_time = time.time()
190         """解析数据流(半包,全包,粘包),并将命令和协议数据解开"""
191         if isinstance(buf, str):
192             buf = buf.encode(encoding='utf-8')
193             self.unpack_header(buf)
194             return
195             #buf = buf.encode(encoding='utf-8')
196             #self.unpack_header(buf)
197 
198         try:
199             if len(buf) > 1024:
200                 logger.debug("{0} receive message size {1}".format(self.uuid, len(buf)))
201             if self.stick_package_stack:
202                 buf = self.stick_package_stack + buf
203                 self.stick_package_stack = None
204             #size, = struct.unpack('>i', buf[:4])
205             size, = struct.unpack('>H',buf[0:2])
206             #size = buf[0]
207             raw_size = len(buf)
208             if raw_size > size:  # 至少有一个包
209                 self.unpack_header(buf)
210                 rest = buf[size:]
211                 self.on_message(rest)
212             elif size > raw_size:  # 半包
213                 self.stick_package_stack = buf
214                 logger.warn("{0} stick package {1}".format(self.uuid, len(buf)))
215             else:
216                 self.unpack_header(buf)
217         except Exception as e:
218             logger.exception("message parse failed {0}".format(str(e)))
219 
封装微博socket操作,包括解包、发包等,这里采用谷歌protobuf协议格式,具体的解包、组包格式在另外一篇中介绍

posted on 2019-09-21 17:31 Benjamin 阅读(778) 评论(0)  编辑 收藏 引用 所属分类: python


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理