#
系统中,用户的消息在移动设备与接入服务器建立的Tcp长连接上传递。这些消息包括:文本,复合文本,位置信息,音频剪辑,图像等等。 发送者传送消息到平台系统内部并将消息写入gridfs,待接收者上线时平台将消息推送至接收者。 考虑到带宽利用,接收者得到的消息将不包含二进制数据,例如: 音频,图像等等。 这要求接收者对平台发起一次获取消息包内指定的音频和图像数据的请求。 移动端向平台请求二进制数据的情况还包含 【离线文件传送】场景 。 二进制数据往往是指那些数据量比较大的对象,这些对象在移动两端交换时,交互通道将不占用与接入服务器的连接通道,而是通过nginx传送到平台内部; 同样接收者获取二进制数据也是通过nginx获取。这种请求是HTTP的。 这里整理的是如何在平台部署 【负载均衡的集群的分布式的文件服务】 nginx : http服务,提供反向代理和负载均衡服务(集群可用DNS或考虑LVS方案) mongodb+gridfs : 用于文件服务提供,其内置gridfs提供了分布式,海量存储的方案 gevent+webpy : nginx直接读取gridfs是不合适的,配置了cgi才能完成特定功能,这里使用webpy,比django更轻更好用。 webpy的作用是接收到上传和下传文件的请求,读写gridfs文件内容给移动端。 gevent是高效的通信框架,虽然单线程工作,但性能非常的好; 用好gevent关键在与外部的io必须全部都是异步的,例如: 数据库,文件磁盘访问等等。 mongodb对gevent已经支持,gevent对webpy,django,psycopg2支持也相当的好,所以要提供webservice服务那就考虑用gevent+webpy或django把,性能是杠杠的,比 apache+mod_wsgi要好很多 ,而且gevent是进程内的不同的HTTP REQUEST可以是共享数据的,这一点非常诱惑(apache+mod_wsgi的REQUEST可是隔离的哦!除非您通过redis的PUB/SUB实现两个REQUEST的通信) 关注的问题: 1.下传大文件时的处理 如果直接用nginx当然没有这个问题 ,但用webpy读取文件返回HttpResponse时问题来了,总不至于读取整个文件,然后再return。 这种方式在php有flush方法,python只能用yield来做 2.上传大文件时的处理 当接收到http的文件POST请求时,文件已经全部缓存到web服务器,如果同时几千个文件上传在进行,服务器就会被挤爆,这也是很多网站不允许大文件上传的缘故吧。关于这个问题,我想就需要修改一下webpy关于文件上传的处理代码了,将接收到的文件数据以流的形式写入到gridfs里去作为临时文件被缓存,等完全接收文件时,才通知到handler代码,这样必定高效很多(新的问题又来了,会不会把gridfs搞爆掉! 处理时考虑延时缓存提交gridfs把)。 BUF_SIZE = 262144 class download: def GET(self): file_name = 'file_name' file_path = os.path.join('file_path', file_name) f = None try: f = open(file_path, "rb") webpy.header('Content-Type','application/octet-stream') webpy.header('Content-disposition', 'attachment; filename=%s.dat' % file_name) while True: c = f.read(BUF_SIZE) if c: yield c else: break except Exception, e: print e yield 'Error' finally: if f: f.close()
links: http://api.mongodb.org/python http://webpy.org/cookbook/storeupload.zh-cn http://webpy.org/cookbook/streaming_large_files http://gevent.org 下份代码 demo很值得看哦 gevent 1.0 由libev 替换了libevent
摘要: 贴代码 Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/--> 1 #--coding:utf-8-- 2 3 ... 阅读全文
多样的文本消息 ----------------- struct MimeText_t{ int type; string text; }; MimeText_t 可以包含普通的文本、图像和音频文件的id 图像和音频数据发送到服务器,服务器并不直接将数据发送到接收者,而是发送 音频和图像的描述uri信息 接收者解释json,显示text文本,读取emoticon编号,显示表情图片; image,audio则显示占位(如果当前wifi可用,则自己自动加载image和audio资源) ,如果非wifi信号则待用户点击此占位,然后从服务器请求image和audio资源到本地。 文本描述: 字体大小,颜色,文本link,表情符号 文本用json组织 , { set:[ text:{text:'this is',bg-color:#ff0000,color:#ffffff,font-name:'arial',font-size:20,bold:true,italic:true}, text:{text:'shanghai',color:#ff0000,font-name:'arial',font-size:20,bold:true,italic:true,link:'http://sw2us.com/images/shanghai.png'}, image:{id:1001,width:200,height:200,uri:'http://sw2us.com/images/bear.png'}, audio:{id:2001,duration:5,uri:'http://sw2us.com/clips/a001.mp3'}, location:{lon:121.221,lat,time,speed,direction,text:'立月路2001号浦星公路口'}, emoticon:{id:201} ], } 属性名简化: --------------------- ----------------------- 1 - text [ 1: text , 2: bg-color , 3: color , 4: font-name, 5:font-size, 6:bold, 7:italic ] 2 - image [ 1: id , 2:width , 3:height , 4:uri] 3 - audio [ 1:id , 2:duration,3:uri] 4 - location [ 1:lon, 2:lat, 3:time, 4:speed, 5:direction, 6:text] 5 - emoticon [ 1: id ] ----------------------- 0 - false 1 - true
接口定义: 1 interface IAuthServer{ 2 CallReturn_t userAuth(string user,string passwd,int device_type); 3 CallReturn_t registerUser(UserRegisterInfo_t reginfo); // tested 4 }; 定义认证服务器接口,userAuth()返回认证用户的token 接口服务实现: 1 import os,os.path,sys,struct,time,traceback,signal,threading,copy,base64 2 import datetime,base64 3 4 from datetime import datetime 5 from base import * 6 import tcelib as tce 7 from showbox import * 8 import utils.misc 9 import utils.config 10 import utils.cipher 11 12 13 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "database.showbox.showbox.settings") 14 15 from django.db import connection 16 from django.db.models import Sum 17 from django.db import transaction 18 # import sns.core.models as cm 19 import database.showbox.core.models as core 20 import database.showbox.nosql.models as nosql 21 22 class AuthServerImpl(IAuthServer): 23 def __init__(self,app): 24 IAuthServer.__init__(self) 25 self.app = app 26 27 def userAuth(self, user, passwd, device_type, ctx): 28 cr = CallReturn_t() 29 try: 30 r = core.User.objects.get(user=user,passwd=passwd) 31 userinfo = { 32 "id":r.id, 33 "user":user, 34 "name":r.name, 35 "login_time":int(time.time()), 36 "user_type":SnsConsts.Authorized_User 37 } 38 token = utils.cipher.encryptToken(userinfo) 39 cr.value = token 40 except: 41 print traceback.format_exc() 42 cr = CallReturn_Error() 43 return cr 44 45 def registerUser(self, reginfo, ctx): 46 return IAuthServer.registerUser(self, reginfo, ctx) 47 48 49 50 class ServerApp: 51 def __init__(self): 52 pass 53 54 def getConfig(self): 55 #return self.app.getConfig() 56 pass 57 58 _handle = None 59 @classmethod 60 def instance(cls): 61 if cls._handle == None: 62 cls._handle = cls() 63 return cls._handle 64 65 def run(self): 66 tce.RpcCommunicator.instance().init('authserver').initMessageRoute('./services.xml') 67 server = tce.RpcCommunicator.instance().currentServer().findEndPointByName('mq_authserver').impl 68 adapter = tce.RpcAdapterEasyMQ.create('server',server) 69 #没有主动发送消息的情形 70 servant = AuthServerImpl(self) 71 adapter.addServant(servant) 72 tce.RpcCommunicator.instance().waitForShutdown() 73 74 if __name__ == '__main__': 75 ServerApp.instance().run() 服务器很简单,实现接口IAuthService的功能函数,定义一个ServerApp,然后运行 客户调用测试: 1 from datetime import datetime 2 from base import * 3 import tcelib as tce 4 from showbox import * 5 import utils.misc 6 import utils.config 7 import utils.cipher 8 9 10 def userAuthResult(result,prx): 11 print result 12 13 # queue:client 必须在调用服务器的write 队列mq 14 communicator =tce.RpcCommunicator.instance().init() 15 conn = tce.RpcConnectionEasyMQ.create('127.0.0.1',12301,'queue:mq_authserver') 16 local = tce.RpcConnectionEasyMQ.create('127.0.0.1',12301,'queue:mq_test_client',tce.AF_READ) 17 conn.setLoopbackMQ(local) 18 19 20 prx = IAuthServerPrx(conn) 21 prx.userAuth_async('test','111111',1,userAuthResult) #异步调用 22 print prx.userAuth('test','111111',1) #同步调用 21,22行分别测试两种调用模式 client与server通过EasyMQ进行传递 easyMQ是个最简单的消息队列实现
在tce构架的平台系统中,采集的用户位置gps信息从网关gatewayserver接收并通过mq_gps消息队列存储到多个位置单元服务器 LocationUnit, 系统中存在若干个LocationServer提供查询功能,当一次位置查询时,LocationServer对集群的LocationUnit进行Map Reduce计算
idl的保留关键字:'byte','bool','short','int','long','float','double','string' ,均不能用于定义module,class,interface和变量名称 定义的变量名称如果包含以下单词:'def','import','from','type','str','int','float','class' , tce生成python代码时自动给添加'_'后缀,比如: struct xx{ string name; string from; } xx结构的from变量名将生成from_ 接口定义: module test{ dictionary<string,string> Properties_t; sequence<string> IpAddressList_t; interface ITerminal{ void onGetServerMessage(string text); } interface Server{ IpAddressList_t getIpAddresses(); Properties_t getProperties(); void ping(string fromhost); string login(string user,string passwd,ctx); }; } struct: tce将结构struct映射为class对象 ,初始化成员变量并创建散列函数 marshall/unmarshall sequence<T>: tce将数组类型直接映射为[] 例如 : dictionary<K,V> tce将字典映射为 {} python实现Server接口的getIpAddresses()方法: def getIpAddresses(): return ['192.168.14.101','192.168.12.50'] 定义服务器接口实现: tce为interface生成接口基类: class Server 我们提供一个实现类 : class ServerImpl(Server): def __init__(self): Server.__init__(self) def getIpAddresses(self,ctx): return [] 在这里我们提供了ServerImpl类,然后编写实现函数getIpAddresses. 每个接口函数都携带ctx参数,ctx携带rpc请求的附属信息,比如: 外带数据(dict),底部的连接对象 等等 。 服务接口被称为一个服务类servant ,接下来演示如何将这个servant装配并提供客户。 tce.RpcCommunicator.instance().init() ep = tce.RpcEndPoint(host='127.0.0.1',port=16005) 定义一个通信端点 adapter = tce.RpcCommunicator.instance().createAdapter('first_server',ep) 创建一个通信适配器 servant = ServerImpl() 创建服务接口对象 adapter.addServant(servant) 添加进适配器 tce.RpcCommunicator.instance().waitForShutdown() 进入通信循环 调用服务: tce.RpcCommunicator.instance().init() prx = test.ServerProxy.create(127.0.0.1,16005) ips = prx.getIpAddresses() 多种呼叫模式: tce将接口函数自动生成 normal,oneway,async三种调用接口方法 ,rpc调用出现异常,底部将抛出异常,所以用户需要异常捕获。 1.normal: 原型: fun_name(参数..,timeout=0,extra=None) 调用函数自动添加timeout,extra参数。timeout默认为0,将自动采用tce默认的30s等待调用返回时间; extra 指此次调用携带的附属数据,extra ={'name':'scott','age':100} extra数据在服务端接口函数的ctx中获取: ctx.msg.extra 函数调用时将阻塞客户线程,直到timeout超时或者服务器数据返回 2. oneway fun_name_oneway(参数...,extra=None) 只有类型void的接口函数才会生成oneway调用方法.oneway调用不会阻塞用户线程,通常用于单向传输的场景,例如 Server接口的ping()函数 3. async fun_name_async(参数,async_callback,extra=None) 异步调用模式不会阻塞客户线程,async_callback指定了rpc调用的返回接收函数 接收函数原型: void fun_name_CallBack(result,proxy) 例如: def getIpAddressesResult(result,proxy): print result #result - IpAddressList_t prx.getIpAddresses_async(getIpAddressesResult) *连接复用 在互联网应用场景,服务器将接入大量的客户端设备,客户端是不能被寻址,所以服务器要完成推送消息给客户端,必须在客户端建立的连接上反向传输。 tce使这个工作变得相当简单: 1. 客户端定义接收消息的接口 ITerminal,定义接收函数onGetServerMessage() class TermnialImpl(ITerminal): ... 2. 创建到服务器的连接代理 tce.RpcCommunicator.instance().init() prx = test.ServerProxy.create(127.0.0.1,16005) 创建但并不马上连接 3. 添加服务类实现 adapter = tce.RpcCommAdapter('adapter') impl = TerminalImpl() adapter.addConnection(prx.conn) adapter.addServant(impl) 加到通信器对象 3. 请求一次调用 prx.login('scott','1234') 4. 服务器端反向调用ITerminal的onGetServerMessage() def login(self,user,passwd,ctx): prx = ITerminalProxy(ctx.conn) prx.onGetServerMessage('server message..') 完成一次对设备端的接口调用
同样在函数中连接pgsql,然后执行500次查询, 测试gevent模式、串行查询、多线程查询 数据如下: multithread_test cost time: 2.45199990273 normal_test cost time: 4.04299998283 gevent_test cost time: 2.12800002098 结果 串行最慢4.4s, 多线程 2.45s ,gevent最快2.12 ,yes! 测试代码: 1 import gevent 2 import gevent.queue 3 4 import psycopg2 5 import psycopg2.extensions 6 7 import psycogreen.gevent 8 9 psycogreen.gevent.patch_psycopg() 10 11 sys.path.insert(0,'../') 12 13 import easymq 14 15 ''' 16 在同一线程中,同一个连接conn上两次创建的cur将会是一样滴,因为是异步wait_read()缘故 17 所以要么每次创建数据库连接,要么使用dbpool 18 ''' 19 20 21 def readThread(): 22 conn = psycopg2.connect(database='postgres',user='postgres',password='111111') 23 24 # cur = conn.cursor(cursor_factory=psycopg2.extensions.DictCursor) 25 cur = conn.cursor(cursor_factory=psycopg2.extensions.cursor) 26 27 # cur.execute("select pg_sleep(%s)", (2,)) 28 for n in range(10): 29 cur.execute("select CURRENT_DATE") 30 # print cur.fetchone() 31 # print 'read end..' 32 conn = None 33 34 35 def gevent_test(): 36 jobs=[] 37 for n in range(100): 38 jobs.append(gevent.spawn(readThread)) 39 gevent.joinall(jobs) 40 41 def normal_test(): 42 for n in range(100): 43 readThread() 44 45 def multithread_test(): 46 threads=[] 47 for n in range(100): 48 thread = threading.Thread(target=readThread) 49 threads.append(thread) 50 thread.start() 51 for thread in threads: 52 thread.join() 53 54 start = time.time() 55 normal_test() 56 end = time.time() 57 print 'normal_test cost time:',end-start 58 59 start = time.time() 60 gevent_test() 61 end = time.time() 62 print 'gevent_test cost time:',end-start 63 64 # start = time.time() 65 # multithread_test() 66 # end = time.time() 67 # print 'multithread_test cost time:',end-start 68
搞了这么久的RPC通信框架TCE,完成java,c++,python,javascript,actionscript之间的互相调来调去,感觉很舒服。
作为移动应用平台,海量并发和高效传输是首要考虑要点。 市面上充值着都差不多的解决技术方案,无非那些 webserver+db ngnix+webserver+mq+logic-server ngnix+gevent-wsgi+db webapi已经被高举到不可超越的地步
而我,不走寻常路,我得另辟捷径 -http的效率根本无法跟socket的长连接媲美 -服务器是需要反向推送消息到移动设备的 -操作接口是简单的易扩展的,屏蔽掉通信细节 -支持htm5的websocket,支持java,支持python,支持python客户端调用
那我的方案是tce为基础的RPC框架平台,抛弃那些xmls,json,让开发者从无尽的网络编解码工作中脱离出来,不用考虑多种通信模式,同步和异步。 font-gate : 前端接入服务器 easymq : 平台服务总线消息队列 logic-service : 不同的逻辑服务器
设想,在android手机上java代码调用函数 whats_yourname(), 这个函数并不在本地,而是存在远端平台内部的一个服务器上,调用并被执行返回'scott'到手机终端,这是多么令人快乐的事情,用户不用关心消息如何被列集,如何被分派,这一切都是透明的。 同样,服务器主动推送商品打折信息到手机上,服务器仅仅需要调用手机接收函数,并填写要传输的参数即可。 其实,这些就是RPC的实现,这样的东东到处都是,DCOM,CORBA,ICE,只是我做得更加灵活
总是想做些令人轻松并快乐的事情!
gevent作为一款优秀的网络通信框架,其出色的性能得到大家一致认可,但在处理并行任务的时候也要注意很多问题,不然您的服务器将变得异常缓慢。 http://blog.163.com/lxl_1995/blog/static/677173392012724103742746/ 这篇博文讲的非常清楚,建议读一下 gevent的特点如下: 1. 单线程执行,所有协程都在同一进程中被模拟和调度分派 2. 可以创建成千上万的 协程,而不会受任何性能影响 3. 由于spawn的协程不是os分配和管理,所以不会有额外的线程资源分配,cpu也不用在这些线程之间调度切换 4. 单线程执行,无需考虑资源互斥 5. 协程之间切换是通过gevent的io阻塞完成,例如 gevent.sleep(0), queue.get/put,event,socket.... 每调用一次gevent 的api,gevent就能获得一次schedule的机会(这很类似操作系统的用户调用中断,由用户态切换到内核态) 以上特点保证gevent的性能非常出色,但当我们的server用到第三方软件包的时候那要非常小心了,特别是这些包内部涉及了io操作。 如果第三方软件包是纯python的那很简单,只需要gevent.monkey_patch(xxx)就okay; 但如果包是扩展clib,那要当心了,monkey_patch 并不能将其相关io操作打上补丁,为了使用这些第三方软件包,要求这些软件包必须支持 协程异步 接口(调用其同步io接口,将阻塞住gevent的执行线程,那gevent就完蛋了)。 gevent的patch对psycopg2无效,因为psycopg2的通信部分是c接口的函数库,还好psycopg2内部支持协程,需要使用 到 psycogreen 这个东东 psycogreen.gevent.patch_psycopg() 支持协程
之后的在gevent的线程中执行sql并等待数据返回时,gevent立马将执行切换到另外的线程 gvent项目中会用到各种诸多的第三方库,必须要求这些库的io接口不能是阻塞的,也就是能支持到gevent异步模式 应用逻辑代码在被执行时(无系统api呼叫),单线程比多线程执行速度更快。循环执行一段计算二次函数代码,由于期间没有系统api调用,os不能进行内核tasklet切换,所以导致cpu的峰值可以攀升到90%,直到硬件、时钟等中断产生,强行切换到其他线程。 多核心cpu表现为单个核始终异常的忙碌,其他几个比较空闲。
easymq 用于替代qpid的消息中间件。通信基础采用tce引擎,提供topic和queue两种队列。 mq服务器启动加载mq条目,建立mq内存对象,提供认证,客户程序连接时指定mq名称和认证口令, 管理程序可以动态增加、删除和监视队列。 mq持久化支持,根据创建参数控制durable。 easymq第一个版利用可以用python实现,之后考虑资源利用和系统会用c++实现 easymq是tce一个很好的应用。 定位够轻,够简单,暂不考虑负载均横和自动路由。 实例化mq服务器 1 def start(self): 2 tce.RpcCommunicator.instance().init('easymq.server') 3 ep = tce.RpcEndPoint(host=self.default_host,port=self.default_port) 4 adapter = tce.RpcCommunicator.instance().createAdapter('first',ep) 5 servant = self 6 adapter.addServant(servant) 7 print 'wait for shutdown..' 8 tce.RpcCommunicator.instance().waitForShutdown() 1 server = Server.instance() 2 print 'easymq server launched..' 3 server.init().start() 接收消息 1 import easymq 2 3 def readThread(conn): 4 while True: 5 m = conn.read( ) 6 print 'got one:',m 7 8 if __name__=='__main__': 9 easymq.init() 10 conn = easymq.Connection(('127.0.0.1',12301),'test',mode=easymq.READWRITE) 11 conn.open() 12 readThread(conn) 发送消息到接收者 1 import easymq 2 3 if __name__=='__main__': 4 easymq.init() 5 6 conn = easymq.Connection(('127.0.0.1',12301),'test',mode=easymq.WRITE) 7 conn.open() 8 for n in range(100): 9 conn.write(str(n)*10) 10 # waitForShutdown() 11 gevent.sleep(2)
|