|
ZMQ(ZeroMQ)是一个消息队列处理库,常用的模式有以下三种:
1、Request-Reply 模式:客户端发出请求后,服务端必须响应。
2、Publish-Subscribe 模式:向所有 client 广播,没有队列缓存,连接断开期间的数据将永远丢失;client 可以进行数据过滤。
3、Parallel Pipeline 模式:push 进行数据推送,worker 进行数据缓存,pull 竞争获取数据并进行处理。与 Publish-Subscribe 的区别在于存在数据缓存和处理负载:当连接被断开时,数据不会丢失,重连后数据会继续发送到对端。

日志接口及客户端:
import copy
import logging
import time
import traceback
from io import StringIO

import zmq
from tornado.options import options


class ZMQHandler(logging.Handler):
    """Logging handler that ships pickled log records over a ZMQ PUSH socket.

    The socket connects to ``tcp://host:port``; a ``ZMQListener`` bound to the
    same address is expected to pull the records and dispatch them.
    """

    def __init__(self, host, port):
        super().__init__()
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        context = zmq.Context()
        self.zq = context.socket(zmq.PUSH)
        self.zq.connect(self.tcp_url)

    def prepare(self, record):
        """Return a picklable copy of *record*.

        Mirrors ``logging.handlers.QueueHandler.prepare``: the message is
        fully formatted (including any exception text) and the attributes that
        may hold unpicklable objects (``args``, ``exc_info``) are cleared.
        The original record is left untouched for other handlers.
        """
        prepared = copy.copy(record)
        message = self.format(prepared)
        prepared.message = message
        prepared.msg = message
        prepared.args = None
        prepared.exc_info = None
        prepared.exc_text = None
        prepared.stack_info = None
        return prepared

    def emit(self, record):
        """Send *record* without blocking; never raise into the caller."""
        try:
            self.zq.send_pyobj(
                self.prepare(record), flags=zmq.NOBLOCK, protocol=4)
        except Exception:
            # Do NOT report the failure through the logging system: this
            # handler is attached to the root logger below, so logging the
            # error would call emit() again and recurse without bound.
            # handleError() reports on stderr (subject to
            # logging.raiseExceptions) instead.
            self.handleError(record)


class ZMQListener(object):
    """Pull log records from a ZMQ PULL socket and hand them to handlers.

    :param host: interface to bind to.
    :param port: TCP port to bind to.
    :param handlers: ``logging.Handler`` instances that receive each record.
    :param respect_handler_level: when true, a handler only receives records
        whose level is at least the handler's own level.
    """

    def __init__(self, host, port, *handlers, respect_handler_level=False):
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        self.handlers = handlers
        self.respect_handler_level = respect_handler_level
        self.context = None
        self.zq = None
        self.connect()

    def connect(self):
        """Create the context and bind the PULL socket to ``self.tcp_url``."""
        self.context = zmq.Context()
        self.zq = self.context.socket(zmq.PULL)
        self.zq.bind(self.tcp_url)

    def close(self):
        """Close the socket and terminate the context; safe to call twice."""
        if self.zq is not None:
            self.zq.close()
            self.zq = None
        if self.context is not None:
            self.context.term()
            self.context = None

    def handle(self, record):
        """
        Handle a record.

        This just loops through the handlers offering them the record
        to handle.
        """
        for handler in self.handlers:
            if self.respect_handler_level and record.levelno < handler.level:
                continue
            handler.handle(record)

    def run(self):
        """Receive and dispatch records until an ``"EOF"`` record arrives.

        The loop also ends when ``close()`` has been called or the ZMQ context
        has been terminated. Other errors are reported on stderr and the loop
        keeps going (best effort).
        """
        while True:
            socket = self.zq
            if socket is None:
                # close() was called; nothing left to read from.
                break
            try:
                # Wait up to one second for data instead of sleeping blindly,
                # so records are dispatched as soon as they arrive.
                if not socket.poll(timeout=1000):
                    continue
                # SECURITY: recv_pyobj() unpickles whatever arrives on the
                # socket. Only bind to a trusted interface/network.
                record = socket.recv_pyobj(flags=zmq.NOBLOCK)
            except zmq.Again:
                continue
            except zmq.ContextTerminated:
                break
            except zmq.ZMQError:
                traceback.print_exc()
                time.sleep(1)
                continue
            except Exception:
                # e.g. a payload that cannot be unpickled
                traceback.print_exc()
                continue

            try:
                if record.msg == "EOF":
                    break
                self.handle(record)
            except Exception:
                traceback.print_exc()


# Importing this module routes every record logged through the root logger
# (DEBUG and above) to the ZMQ listener expected on 127.0.0.1:9825.
logger = logging.getLogger()
zmq_handler = ZMQHandler("127.0.0.1", 9825)
logger.setLevel(logging.DEBUG)
logger.addHandler(zmq_handler)


def test_zmq():
    """Demo sender: emit a few records through a ZMQHandler on the root logger."""
    zq_handler = ZMQHandler("127.0.0.1", 9825)

    rootLogger = logging.getLogger('')
    # With the root level at WARN, the info/debug calls below are filtered
    # out; only the warning and the error are sent.
    rootLogger.setLevel(logging.WARN)
    # No formatter is needed on the sending side: the listener's handlers
    # format the records.
    rootLogger.addHandler(zq_handler)

    # Now, we can log to the root logger, or any other logger. First the root
    logging.info('Jackdaws love my big sphinx of quartz.')

    # Now, define a couple of other loggers which might represent areas in your
    # application:

    logger1 = logging.getLogger('myapp.area1')
    logger2 = logging.getLogger('myapp.area2')

    logger1.debug('Quick zephyrs blow, vexing daft Jim.')
    logger1.info('How quickly daft jumping zebras vex.')
    logger2.warning('Jail zesty vixen who grabbed pay from quack.')
    logger2.error('The five boxing wizards jump quickly.')


def test_zmq_listener():
    """Demo receiver: print every received record to the console."""
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(name)s: %(message)s')
    handler.setFormatter(formatter)
    # Must bind the same port the handlers above connect to (9825); the
    # original bound 8090 and therefore never received anything.
    zq_listener = ZMQListener("127.0.0.1", 9825, handler)
    zq_listener.run()

# The code above uses the third ZMQ pattern (Parallel Pipeline, PUSH/PULL)
# to handle logs.
# Server side:
import logging
import os
import re
import sys
from logging.handlers import TimedRotatingFileHandler

from tornado.options import options, define

from settings import log_dir

# Command-line options. logger_host is an address string, so its type must be
# str (it was declared as int, which makes --logger_host=... unparsable).
define("logger_host", "127.0.0.1", type=str)
define("logger_port", 9825, type=int)
define("logger_dir", log_dir, type=str)
define("redis_host", "127.0.0.1", type=str)
define("redis_port", 6379, type=int)
define("redis_password", None, type=str)
define("redis_db", 12, type=int)
options.parse_command_line()

formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s]: %(message)s')

# NOTE(review): stream_handler is configured but never passed to the listener
# below, so nothing is echoed to the console - confirm whether that is intended.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

# The file handler opens its file immediately, so the directory must exist.
os.makedirs(options.logger_dir, exist_ok=True)

# One log file per listener port, rotated every 4 hours, 18 backups kept.
file_handler = TimedRotatingFileHandler(
    os.path.join(options.logger_dir, "{0}".format(options.logger_port)),
    when='h', interval=4, backupCount=18)
file_handler.suffix = "%Y-%m-%d_%H-%M-%S.log"
# The handler finds old backups to delete by matching their suffix against
# extMatch; with a custom suffix the default pattern no longer matches and
# backupCount would silently stop pruning, so keep the two in sync.
file_handler.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.log$")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)

# NOTE(review): kept after option parsing, as in the original. Presumably
# core.logger is the client module shown earlier, which attaches a ZMQHandler
# to the root logger at import time - confirm before hoisting this import.
from core.logger import ZMQListener  # noqa: E402

zmq_listener = ZMQListener(options.logger_host, options.logger_port, file_handler)
try:
    zmq_listener.run()
finally:
    # Release the socket and context on shutdown (including Ctrl-C).
    zmq_listener.close()

# Start this server directly with python; log records can then be written
# remotely.
|