Fork me on GitHub
随笔 - 215  文章 - 13  trackbacks - 0
<2016年8月>
31123456
78910111213
14151617181920
21222324252627
28293031123
45678910


专注即时通讯及网游服务端编程
------------------------------------
Openresty 官方模块
Openresty 标准模块(Opm)
Openresty 三方模块
------------------------------------
本博收藏大部分文章为转载,并在文章开头给出了原文出处,如有再转,敬请保留相关信息,这是大家对原创作者劳动成果的自觉尊重!!如为您带来不便,请于本博下留言,谢谢配合。

常用链接

留言簿(1)

随笔分类

随笔档案

相册

Awesome

Blog

Book

GitHub

Link

搜索

  •  

积分与排名

  • 积分 - 212050
  • 排名 - 118

最新评论

阅读排行榜

  • https://www.aliyun.com/jiaocheng/647017.html
    https://github.com/chenxiaofa/p
    前言

    作为一个游戏从业者不可能不使用推方案,以前一直使用 nginx-push-stream-module这个模块的 Forever Iframe模式来实现推方案。

    最近决定研究下 lua-resty-websocket来实现一个更加高效好用推方案

    不推荐使用的场景

    由于 OpenResty目前还不能做到跨 worker通信,所以想到实现指定推送需要中转一次,效率上可能不如其他语言如 golang等

    过于复杂的业务逻辑 频繁的指定推送(单对单、组、Tag等) 广播过多 一起工作的好基友们

    想要推的优雅以下几个基友的帮忙是不可或缺的

    ngx.semaphore

    它可以让你在想要发消息的地方优雅的进入到发送阶段,也可以让你来优雅的控制一个链接超时的关闭。

    ngx.shared

    由于目前我们无法做到跨 worker的通信,所以必须借助共享内存来中转不属于当前 worker的消息。

    lua-resty-websocket

    由于贪图方便还是直接使用了现成的库,喜欢折腾的小伙伴请移步 stream-lua-nginx-module

    大概的思路

    由于不能跨 worker通信所以我给每个 worker申请了一个 shared共享内存来保存消息。

    理论上 shared的数量等于 worker的数量最佳。

    然后每个 worker启动一个 timer来判断当前 worker的 message id和 shared中的 message id是否有变化。

    这里为什么不用 shared的有序列表来做,容我先卖个关子。

    当发生变化时,判断消息的目标是否在自己的 session hash中,如果在则发之。

    开始准备工作 修改配置文件

    首先修改 nginx.conf配置,增加以下设置

    lua_shared_dict message_1 10m;
    lua_shared_dict message_2 10m;
    lua_shared_dict message_n 10m;
    init_worker_by_lua_file scripts/init_worker_by_lua.lua;
    init_worker_by_lua
    local ngx = ngx
    local ngx_log = ngx.log
    local ngx_ERR = ngx.ERR
    local ngx_timer_at = ngx.timer.at
    local require = require
    local socketMgr = require("socketMgr")
    local delay = 1
    local loopMessage
    loopMessage = function(premature)
    if premature then
    ngx_log(ngx_ERR, "timer was shut: ", err)
    return
    end
    socketMgr:loopMessages()
    local ok, err = ngx_timer_at(delay, loopMessage)
    if not ok then
    ngx_log(ngx_ERR, "failed to create the timer: ", err)
    return
    end
    end
    loopMessage()
    loopMessages

    判断 local message id和 shared message id是否不等。

    随后每次 local message id+ 1 从 shared拉取数据,进行消息推送逻辑。

    建立连接

    不做过多说明,自行查看 lua-resty-websocket的 wiki

    当连接监听好之后,要进行一系列的管理。如:

    session id和 user id的双向映射 session id和 group name的双向映射

    后面再详细说明

    生成 session id

    我是用 ( worker id+ 1) * 100000 + worker's local incr id来生成唯一 session id比较简陋,但是够用。

    这么做的原因是,通过对 session id进行取余可以很方便的得知 worker id,可以方便的给 shared写消息。

    local ngx_worker_id = ngx.worker.id()local _incr_id = 0local _gen_session_id = function()_incr_id = _incr_id + 1return (ngx_worker_id + 1) * 100000 + _incr_idend 设置消息映射

    这个可以用于收到当前 worker所属的 shared message判断是否在当前进程。

    _messages[session_id] = {}_semaphores[session_id] = semaphore.new(0) 接收消息&;发送消息

    代码和 官方例子类同不做过多说明,只说我改了什么。

    在接收消息中管理了一个变量即 close_flag用于管理 send message轻线程的退出。

    以下是一段伪代码,含义的话请联系上下文。

    local session_id = sessionMgr:gen_session_id()
    local send_semaphore = sessionMgr:get_semaphore(session_id)
    local close_flag = false
    local function _push_thread_function()
    while close_flag == false do
    local ok, err = send_semaphore:wait(300)
    if ok then
    local messages = socketMgr:getMessages(session_id)
    while messages and #messages > 0 do
    local message = messages[1]
    table_remove(messages, 1)
    --- your send message function handler
    end
    end
    if close_flag then
    socketMgr:destory(session_id)
    break
    end
    end
    end
    local push_thread = ngx_thread_spawn(_push_thread_function)
    while true do
    local data, typ, err = wbsocket:recv_frame()
    while err == "again" do
    local cut_data cut_data, _, err = wbsocket:recv_frame()
    data = data .. cut_data
    end
    if not data then
    close_flag = true
    send_semaphore:post(1)
    break
    elseif typ == 'close' then
    close_flag = true
    send_semaphore:post(1)
    break
    elseif typ == 'ping' then
    local bytes, err = wbsocket:send_pong(data)
    if not bytes then
    close_flag = true
    send_semaphore:post(1)
    break
    end
    elseif typ == 'pong' then
    elseif typ == 'text' then
    -- your receive function handler
    elseif typ == 'continuation' then
    elseif typ == 'binary' then
    end
    end
    ngx_thread_wait(push_thread)
    wbsocket:send_close() 消息推送

    现在说说为什么不用 shared的有序列表来存储消息,我是使用了 shared的 set方法中的 flag属性来存放 session id。

    这样在获得一个消息的时候,能很方便的知道消息是发给哪个 session id的。

    继续一段伪代码。

    local ngx_shared = ngx.shared
    local _shared_dicts = {ngx_shared.message_1,ngx_shared.message_2,ngx_shared.message_n,}
    local current_shared = _shared_dicts[ngx_worker_id + 1]
    local current_message_id = 1
    --- 如果在当前进程
    if _messages[session_id] then
    table.insert(_messages[session_id], "message data")
    _semaphores[session_id]:post(1)
    --- 会进入到,上述 _push_thread_function 方法中,进行发送逻辑
    else
    local shared_id = session_id % 100000
    local message_shared = _shared_dicts[shared_id]
    local message_id = message_shared:incr("id", 1, 0)
    message_shared:set("message." .. message_id, "message data", 60, session_id)
    end
    其他

    https://github.com/chenxiaofa/p

    借鉴了这位同学的设计思路,实现了额外逻辑。如:

    加入、退出、销毁组 各 worker之间的 cmd内部命令执行 热更新的特殊处理 等
posted on 2018-05-04 12:05 思月行云 阅读(3152) 评论(0)  编辑 收藏 引用 所属分类: Nginx\Openresty

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理