http://www.tuicool.com/articles/zYfAnea
最近实现一个二维码扫描登录的功能,当用户用移动设备扫描PC端页面的二维码之后,移动设备通过常规HTTP短连接向服务器获取认证数据,认证通过后,服务器向PC浏览器主动推送帐号相关信息以完成PC端页面的登录。
服务器主动向浏览器推送数据,基本上就是ajax轮询、iframe stream、websocket等等,可以参见 《Comet (web技术)》
推送服务器有很多种,当然用强大稳定又顺手的nginx了。 nginx相关的推送插件模块有nginx-push-stream-module、nginx_http_push_module,但是很遗憾,可配置不可编程。又到我们的主角 OpenResty (OpenResty (aka. ngx_openresty) is a full-fledged web application server by bundling the standard Nginx core, lots of 3rd-party Nginx modules, as well as most of their external dependencies)出马的时候了。
1、PC浏览器向php发起ajax请求,获取一个与当前session相关的唯一的二维码URL,和一个唯一的sub订阅URL。
2、PC浏览器显示二维码,并对sub订阅URL发起ajax长连接或者websocket连接,这个请求将直接由nginx来hold住,超时时间由配置参数push_free_timeout决定。
3、手机端扫描并解析二维码,向php发起认证。
4、php收到移动设备的请求后,解出sessionid,向nginx的pub接口发布数据,该数据将被直接投递到对应的sub接口,并回传到浏览器。
5、如果sub接口在push_free_timeout指定的时间内一直没有收到数据,将主动断开与浏览器端的连接。此时,浏览器可以根据业务场景决定是否重新发起连接。
出于性能考虑,使用ngx.shared共享内存存储消息,只能共享于一个ngx实例内,对于10k级别的聊天室并发连接应该是够用了。
使用redis作为外部存储,也是可以的,如果100k的并发,需要注意ngx对nosql发起连接时耗尽socket,当然这个是可以解决的。
更大规模的并发,值得自研推送服务器。
在生产环境某个LB节点上试运行过,openresty跑着waf\fastcgi proxy\http proxy\comet。常态1k并发连接数,load 0.01,40k并发时,load只有0.15。
下面是相关测试代码,如果有空完善了再托管到github上,考虑写一个聊天室的完整demo。
resty.push基础模块(需要使用到ngx.shared共享内存来存储消息,在nginx.conf的http段配置lua_shared_dict push 10m;)
--[[
-- /usr/local/openresty/lualib/resty/push.lua
-- push.lua ,resty.push 基于nginx_lua的push推送方案
-- 支持多对多频道
-- 支持long-pooling, stream, websocket
--
-- Author: chuyinfeng.com <Liujiaxiong@kingsoft.com>
-- 2014.03.12
--]]
local _M = {_VERSION = '0.01'}
local function debug(msg)
--ngx.say(msg)
--ngx.flush(true)
end
-- 配置信息
_M.config = {
-- 推送间隔,1s
['push_interval'] = 1,
-- 消息队列最大长度
['msglist_len'] = 100,
-- 消息生存周期
['msg_lefttime'] = 3,
-- 频道空闲超时
['channel_timeout'] = 30,
-- 推送空闲超时,在改时间段内无消息则关闭当前推送连接
['push_free_timeout'] = 10,
-- 共享内存名
['store_name'] = 'push',
-- 频道号
['channels'] = {1, 2},
}
-- 频道数量
_M.channels_len = 0
-- 当前读位置
_M.idx_read = 0
-- 共享内存
_M.store = nil
-- cjson 模块
local cjson = require "cjson"
--[[
-- 设置
--]]
_M.opt = function(self, k, v)
local t = type(k)
if t == 'table' then
for key, val in pairs(k) do
self.config[key] = val
end
end
if t == 'string' then
self.config[k] = v
end
self.channels_len = table.maxn(self.config['channels'])
self.store = ngx.shared[self.config['store_name']]
end
--[[
-- 向频道写入消息
--
-- @param ngx.shared.dict, 共享内存
-- @param string channel_id,可用ngx.crc32_long生成
-- @param int channel_timeout, 频道空闲超时时间
-- @param string msg,消息内容 必须为字符串
-- @param int msg_lefttime, 消息生存周期
-- @param int msglist_len, 消息队列长度
-- @return boolean
--]]
local function _write(store, channel_id, channel_timeout, msg, msg_lefttime, msglist_len)
local idx, ok, err
-- 消息当前读取位置计数器+1
idx, err = store:incr(channel_id, 1)
-- 如果异常,则新建频道
if err then
ok, err = store:set(channel_id, 1, channel_timeout)
if err then return 0 end
idx = 1
else
store:replace(channel_id, idx, channel_timeout)
end
-- 写入消息
debug("write " .. channel_id .. idx .. " , lefttime: " .. msg_lefttime.. " , msg: " .. msg)
ok, err = store:set('m' .. channel_id .. idx, msg, msg_lefttime)
if err then return 0 end
-- 清除队列之前的旧消息
if idx > msglist_len then
store:delete('m' .. channel_id .. (idx - msglist_len))
end
return idx
end
--[[
-- 从频道读取消息
--
-- @param int channel_id, 必须为整形,可用ngx.crc32_long生成
-- @param int offset,历史偏移量,最小为0
-- @return int len, 剩余消息数量
-- @return string msg, 消息
--]]
local _read = function (store, channel_id, msglist_len, idx_read)
local idx_msg, err, msg
-- 获取最新消息的位置
idx_msg, _ = store:get(channel_id)
idx_msg = idx_msg or 0
if idx_msg == 0 then
idx_read = 0
end
if idx_msg - idx_read > msglist_len then
idx_read = idx_msg - msglist_len
end
if idx_read < idx_msg then
idx_read = idx_read + 1
msg, _ = store:get('m' .. channel_id .. idx_read)
end
-- 返回读的位置和消息的最大位置,以及消息
return idx_read, idx_msg, msg
end
--[[
-- 推送消息
-- @param callback wrapper, 消息包装回调函数
--]]
_M.push = function(self, wrapper)
local flag_work = true
local flag_read = true
local idx_read, idx_msg, msg, err
local time_last_msg = ngx.time()
while flag_work do
for i = 1, self.channels_len do
-- 循环读取当前频道,直到EOF
flag_read = true
while flag_read do
debug("read from idx_read: " .. self.idx_read)
self.idx_read, idx_msg, msg = _read(self.store, self.config['channels'][i], self.config['msglist_len'], self.idx_read)
if msg ~= nil then
debug("got msg and wrapper msg: " .. msg)
time_last_msg = ngx.time()
wrapper(msg)
end
debug("idx_read: " .. self.idx_read .. ", idx_msg: " .. idx_msg)
if self.idx_read == idx_msg then flag_read = false end
end
end
debug("push_free: " .. ngx.time() - time_last_msg)
if ngx.time() - time_last_msg >= self.config['push_free_timeout'] then
debug("push_timeout: " .. " last: " .. time_last_msg .. " , now: " .. ngx.time())
flag_work = false
end
debug("sleep: " .. self.config['push_interval'])
ngx.sleep(self.config['push_interval'])
end
end
--[[
-- 发送消息到指定频道
--]]
_M.send = function(self, msg)
local idx = 0
for i = 1, self.channels_len do
idx = _write(self.store, self.config['channels'][i], self.config['channel_timeout'], msg, self.config['msg_lefttime'], self.config['msglist_len'])
end
return true
end
--[[
-- jsonp格式化
--]]
_M.jsonp = function(self, data, cb)
if cb then
return cb .. "(" .. cjson.encode(data) .. ");"
else
return cjson.encode(data)
end
end
--[[
-- 公开成员
--]]
_M.new = function(self)
return setmetatable({}, { __index = _M })
end
return _M
public发布
local push = require "resty.push"
local function exit(is_ws)
if is_ws == nil then ngx.eof() end
ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end
local pub = push:new()
pub:opt({
['channels'] = 123,
['push_interval'] = 0.1,
['push_free_timeout'] = 27,
})
ngx.req.read_body()
local body, err = ngx.req.get_post_args()
if err then exit() end
if pub:send(body['data'] or '') then
ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
ngx.status = ngx.HTTP_OK
ngx.say(pub:jsonp({['status'] = 1}, args['callback']))
end
subscribe 订阅接口
local push = require "resty.push"
local function exit(is_ws)
if is_ws == nil then ngx.eof() end
ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end
local args = ngx.req.get_uri_args()
local sub = push:new()
sub:opt({
['channels'] = 123,
['push_interval'] = 0.1,
['push_free_timeout'] = 27,
})
local wrapper = function(msg)
ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
ngx.status = ngx.HTTP_OK
ngx.say(sub:jsonp(msg, args['callback']))
exit()
end
sub:push(wrapper)
wrapper(sub:jsonp({['status'] = 1, ['tips'] = 'timeout'}))
posted on 2016-08-04 08:55
思月行云 阅读(4335)
评论(0) 编辑 收藏 引用 所属分类:
Nginx\Openresty