Fork me on GitHub
随笔 - 215  文章 - 13  trackbacks - 0
<2024年11月>
272829303112
3456789
10111213141516
17181920212223
24252627282930
1234567


专注即时通讯及网游服务端编程
------------------------------------
Openresty 官方模块
Openresty 标准模块(Opm)
Openresty 三方模块
------------------------------------
本博收藏大部分文章为转载,并在文章开头给出了原文出处,如有再转,敬请保留相关信息,这是大家对原创作者劳动成果的自觉尊重!!如为您带来不便,请于本博下留言,谢谢配合。

常用链接

留言簿(1)

随笔分类

随笔档案

相册

Awesome

Blog

Book

GitHub

Link

搜索

  •  

积分与排名

  • 积分 - 210445
  • 排名 - 118

最新评论

阅读排行榜

http://www.tuicool.com/articles/zYfAnea


最近实现一个二维码扫描登录的功能,当用户用移动设备扫描PC端页面的二维码之后,移动设备通过常规HTTP短连接向服务器获取认证数据,认证通过后,服务器向PC浏览器主动推送帐号相关信息以完成PC端页面的登录。

服务器主动向浏览器推送数据,基本上就是ajax轮询、iframe stream、websocket等等,可以参见 《Comet (web技术)》

推送服务器有很多种,当然用强大稳定又顺手的nginx了。 nginx相关的推送插件模块有nginx-push-stream-module、nginx_http_push_module,但是很遗憾,可配置不可编程。又到我们的主角 OpenResty (OpenResty (aka. ngx_openresty) is a full-fledged web application server by bundling the standard Nginx core, lots of 3rd-party Nginx modules, as well as most of their external dependencies)出马的时候了。

1、PC浏览器向php发起ajax请求,获取一个与当前session相关的唯一的二维码URL,和一个唯一的sub订阅URL。

2、PC浏览器显示二维码,并对sub订阅URL发起ajax长连接或者websocket连接,这个请求将直接由nginx来hold住,超时时间由配置参数push_free_timeout决定。

3、手机端扫描并解析二维码,向php发起认证。

4、php收到移动设备的请求后,解出sessionid,向nginx的pub接口发布数据,该数据将被直接投递到对应的sub接口,并回传到浏览器。

5、如果sub接口在push_free_timeout指定的时间内一直没有收到数据,将主动断开与浏览器端的连接。此时,浏览器可以根据业务场景决定是否重新发起连接。

出于性能考虑,使用ngx.shared共享内存存储消息,只能共享于一个ngx实例内,对于10k级别的聊天室并发连接应该是够用了。
使用redis作为外部存储,也是可以的,如果100k的并发,需要注意ngx对nosql发起连接时耗尽socket,当然这个是可以解决的。
更大规模的并发,值得自研推送服务器。

在生产环境某个LB节点上试运行过,openresty跑着waf\fastcgi proxy\http proxy\comet。常态1k并发连接数,load 0.01,40k并发时,load只有0.15。

下面是相关测试代码,如果有空完善了再托管到github上,考虑写一个聊天室的完整demo。

resty.push基础模块(需要使用到ngx.shared共享内存来存储消息,在nginx.conf的http段配置lua_shared_dict push 10m;)

--[[
-- /usr/local/openresty/lualib/resty/push.lua
-- push.lua ,resty.push 基于nginx_lua的push推送方案
-- 支持多对多频道 
-- 支持long-pooling, stream, websocket
--
-- Author: chuyinfeng.com <Liujiaxiong@kingsoft.com> 
-- 2014.03.12
--]]

local _M = {_VERSION = '0.01'}

local function debug(msg)
  --ngx.say(msg)
  --ngx.flush(true)
end

-- 配置信息
_M.config = {
  -- 推送间隔,1s
  ['push_interval'] = 1,
  -- 消息队列最大长度
  ['msglist_len'] = 100,
  -- 消息生存周期
  ['msg_lefttime'] = 3,
  -- 频道空闲超时
  ['channel_timeout'] = 30,
  -- 推送空闲超时,在改时间段内无消息则关闭当前推送连接
  ['push_free_timeout'] = 10,
  -- 共享内存名
  ['store_name'] = 'push',
  -- 频道号
  ['channels'] = {1, 2},
}


-- 频道数量
_M.channels_len = 0
-- 当前读位置
_M.idx_read = 0
-- 共享内存
_M.store = nil


-- cjson 模块
local cjson = require "cjson"

--[[ 
-- 设置
--]]
_M.opt = function(self, k, v)
  local t = type(k)
  if t == 'table' then
    for key, val in pairs(k) do
      self.config[key] = val
    end
  end

  if t == 'string' then
    self.config[k] = v
  end

  self.channels_len = table.maxn(self.config['channels'])
  self.store = ngx.shared[self.config['store_name']]

end


--[[
-- 向频道写入消息
--
-- @param ngx.shared.dict, 共享内存
-- @param string channel_id,可用ngx.crc32_long生成
-- @param int channel_timeout, 频道空闲超时时间
-- @param string msg,消息内容 必须为字符串
-- @param int msg_lefttime, 消息生存周期
-- @param int msglist_len, 消息队列长度
-- @return boolean
--]]
local function _write(store, channel_id, channel_timeout, msg, msg_lefttime, msglist_len)
  local idx, ok, err

  -- 消息当前读取位置计数器+1
  idx, err = store:incr(channel_id, 1)

  -- 如果异常,则新建频道
  if err then
    ok, err = store:set(channel_id, 1, channel_timeout)
    if err then return 0 end
    idx = 1
  else
    store:replace(channel_id, idx, channel_timeout)
  end


  -- 写入消息
  debug("write " .. channel_id .. idx .. " , lefttime: " .. msg_lefttime.. " , msg: " .. msg)
  ok, err = store:set('m' .. channel_id .. idx, msg,  msg_lefttime)
  if err then return 0 end

  -- 清除队列之前的旧消息
  if idx > msglist_len then
    store:delete('m' .. channel_id .. (idx - msglist_len))
  end

  return idx
end

--[[
-- 从频道读取消息
--
-- @param int channel_id, 必须为整形,可用ngx.crc32_long生成
-- @param int offset,历史偏移量,最小为0
-- @return int len, 剩余消息数量
-- @return string msg, 消息  
--]]
local _read = function (store, channel_id, msglist_len, idx_read)
  local idx_msg, err, msg

   

  -- 获取最新消息的位置
  idx_msg, _ = store:get(channel_id)
  idx_msg = idx_msg or 0

  if idx_msg == 0 then
    idx_read = 0
  end
  
  if idx_msg - idx_read > msglist_len then
    idx_read = idx_msg - msglist_len
  end
  
  if idx_read < idx_msg then
    idx_read = idx_read + 1
    msg, _ = store:get('m' .. channel_id .. idx_read) 
  end

  -- 返回读的位置和消息的最大位置,以及消息
  return idx_read, idx_msg, msg
end

--[[
-- 推送消息
-- @param callback wrapper, 消息包装回调函数
--]]
_M.push = function(self, wrapper)
  local flag_work = true
  local flag_read = true
  local idx_read, idx_msg, msg, err
  local time_last_msg = ngx.time()
  
  while flag_work do
     for i = 1, self.channels_len do
      -- 循环读取当前频道,直到EOF
      flag_read = true
      while flag_read do
        debug("read from idx_read: " .. self.idx_read)
        self.idx_read, idx_msg, msg = _read(self.store, self.config['channels'][i], self.config['msglist_len'], self.idx_read)
        if msg ~= nil then
          debug("got msg and wrapper  msg: " .. msg)
          time_last_msg = ngx.time()
          wrapper(msg)
        end
        debug("idx_read: " .. self.idx_read .. ", idx_msg: " .. idx_msg)
        
        if self.idx_read == idx_msg then flag_read = false end
      end
      
    end

    debug("push_free: " .. ngx.time() - time_last_msg)
    if ngx.time() - time_last_msg  >= self.config['push_free_timeout'] then
      debug("push_timeout: " .. " last: " .. time_last_msg .. " , now: " .. ngx.time())
      flag_work = false
    end
    

    debug("sleep: " .. self.config['push_interval'])
    ngx.sleep(self.config['push_interval'])
  end

end

--[[
-- 发送消息到指定频道
--]]
_M.send = function(self, msg)
  local idx = 0
  for i = 1, self.channels_len do
    idx = _write(self.store, self.config['channels'][i], self.config['channel_timeout'], msg, self.config['msg_lefttime'], self.config['msglist_len'])
  end
  return true
end

--[[
-- jsonp格式化
--]]
_M.jsonp = function(self, data, cb)
    if cb then
        return cb .. "(" .. cjson.encode(data)  .. ");"
    else
        return cjson.encode(data)
    end
end


--[[
-- 公开成员
--]]

_M.new = function(self)
  return setmetatable({}, { __index = _M })
end

return _M

public发布

local push = require "resty.push"

local function exit(is_ws)
  if is_ws == nil then ngx.eof() end
  ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end

local pub = push:new()

pub:opt({
  ['channels'] = 123,
  ['push_interval'] = 0.1,
  ['push_free_timeout'] = 27,
})

ngx.req.read_body()
local body, err = ngx.req.get_post_args()
if err then exit() end
if pub:send(body['data'] or '') then
  ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
  ngx.status = ngx.HTTP_OK
  ngx.say(pub:jsonp({['status'] = 1}, args['callback']))
end

subscribe 订阅接口

local push = require "resty.push"

local function exit(is_ws)
  if is_ws == nil then ngx.eof() end
  ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end

local args = ngx.req.get_uri_args()

local sub = push:new()

sub:opt({
  ['channels'] = 123,
  ['push_interval'] = 0.1,
  ['push_free_timeout'] = 27,
})

local wrapper = function(msg)
  ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
  ngx.status = ngx.HTTP_OK
  ngx.say(sub:jsonp(msg, args['callback']))
  exit()
end

sub:push(wrapper)
wrapper(sub:jsonp({['status'] = 1, ['tips'] = 'timeout'}))
posted on 2016-08-04 08:55 思月行云 阅读(4318) 评论(0)  编辑 收藏 引用 所属分类: Nginx\Openresty

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理