Fork me on GitHub
随笔 - 215  文章 - 13  trackbacks - 0
<2016年12月>
27282930123
45678910
11121314151617
18192021222324
25262728293031
1234567


专注即时通讯及网游服务端编程
------------------------------------
Openresty 官方模块
Openresty 标准模块(Opm)
Openresty 三方模块
------------------------------------
本博收藏大部分文章为转载,并在文章开头给出了原文出处,如有再转,敬请保留相关信息,这是大家对原创作者劳动成果的自觉尊重!!如为您带来不便,请于本博下留言,谢谢配合。

常用链接

留言簿(1)

随笔分类

随笔档案

相册

Awesome

Blog

Book

GitHub

Link

搜索

  •  

积分与排名

  • 积分 - 212050
  • 排名 - 118

最新评论

阅读排行榜

云凤大神的开发笔记:http://blog.codingnow.com/2011/11/dev_note_3.html

http://www.linuxde.net/2013/05/13647.html

首先了解下ZMQ的概念,ØMQ(ZeroMQ)是啥玩意儿?

ZeroMQ是个类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。

ZeroMQ是网络栈中新的一层,它是个可伸缩层,分散在分布式系统间。因此,它可支持任意大的应用程序。ØMQ不是简单的点对点交互,相反,它定义了分布式系统的全局拓扑。ØMQ应用程序没有锁,可并行运行。此外,它可在多个线程、内核和主机盒之间弹性伸缩。

与其他的消息队列相比,ZeroMQ有以下一些特点

1、点对点无中间节点

传统的消息队列都需要一个消息服务器来存储转发消息。而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致。以为消息服务器最终还是转化为服务器对其他节点的点对点消息传输上。ZeroMQ能缓存消息,但是是在发送端缓存。ZeroMQ里有水位设置的相关接口来控制缓存量。当然,ZeroMQ也支持传统的消息队列(通过zmq_device来实现)。

2、强调消息收发模式

在点对点的消息传输上ZeroMQ将通信的模式做了归纳,比如常见的订阅模式(一个消息发多个客户),分发模式(N个消息平均分给X个客户)等等。下面是目前支持的消息模式配对,任何一方都可以做为服务端。

• PUB and SUB
• REQ and REP
• REQ and XREP
• XREQ and REP
• XREQ and XREP
• XREQ and XREQ
• XREP and XREP
• PUSH and PULL
• PAIR and PAIR

3、以统一接口支持多种底层通信方式(线程间通信,进程间通信,跨主机通信)

如果你想把本机多进程的软件放到跨主机的环境里去执行,通常要将IPC接口用套接字重写一遍。非常麻烦。而有了ZeroMQ就方便多了,只要把通信协议从"ipc:///xxx"改为"tcp://*.*.*.*:****"就可以了,其他代码通通不需要改,如果这个是从配置文件里读的话,那么程序就完全不要动了,直接复制到其他机器上就可以了。以为ZeroMQ为我们做了很多。

4、异步,强调性能

ZeroMQ设计之初就是为了高性能的消息发送而服务的,所以其设计追求简洁高效。它发送消息是异步模式,通过单独出一个IO线程来实现,所以消息发送调用之后不要立刻释放相关资源哦,会出错的(以为还没发送完),要把资源释放函数交给ZeroMQ让ZeroMQ发完消息自己释放。

zeromq的官方网站: http://www.zeromq.org/

系统环境约定:

OS:CentOS 6.3 X64
web目录:/htdoc/web
软件临时存放目录:/opt/
zeromq安装目录:/usr/local/webserver/zeromq

一、安装zeromq

Tip:To build on UNIX-like systems,make sure that libtool, autoconf, automake are installed.

[root@iredmail opt]# wget http://download.zeromq.org/zeromq-3.2.3.tar.gz
[root@iredmail opt]# tar -zxvf zeromq-3.2.3.tar.gz
[root@iredmail opt]# cd zeromq-3.2.3
[root@iredmail zeromq-3.2.3]# ./configure --prefix=/usr/local/webserver/zeromq --with-pgm=libpgm-5.1.118~dfsg
(libpgm-5.1.118~dfsg位于zeromq-3.2.3/foreign/openpgm/目录下)
[root@iredmail zeromq-3.2.3]# make && make install
[root@iredmail zeromq-3.2.3]# ldconfig

官方文档:http://www.zeromq.org/area:download

Tip:If you get an error:configure: "error: cannot link with -luuid, install uuid-dev".Please install e2fsprogs-devel

二、安装PHP扩展

[root@iredmail zeromq-3.2.3]# cd..
[root@iredmail opt]# git clone git://github.com/mkoppanen/php-zmq.git
Tip: If you get an error of git command,please install git or use other download tools
[root@iredmail opt]# cd php-zmq
[root@iredmail php-zmq]# /usr/local/webserver/php5318/bin/phpize

Tip: 
①/usr/local/webserver/php5318/is my php installation path;
②If you are using php installed from your distribution's package manager the 'phpize' command is usually in php-dev or php-devel package
[root@iredmail php-zmq]# ./configure --with-php-config=/usr/local/webserver/php5318/bin/php-config --with-zmq=/usr/local/webserver/zeromq
Tip: 
①/usr/local/webserver/zeromq is my zeromq installation path;
②If you get an error:"checking libzmq installation... configure: error: Unable to find libzmq installation",please use param"--with-zmq" specify the zeromq installation path.

[root@iredmail php-zmq]# make && make install
Installing shared extensions:     /usr/local/webserver/php5318/lib/php/extensions/no-debug-non-zts-20090626/
表示生成了动态链接库文件zmq.so.这个时候可以查看一下目录里有没有zmq.so 这个文件.

Add the following line to your php.ini:

extension=zmq.so
OR:
If you are using PHP 5.4.x and/or using PHP-FPM, you will need to add a zmq.ini file in /etc/php5/conf.d:
Add the following:
extension=zmq.so

Restart php-fpm

访问phpinfo页面就可以看到zeroMQ的消息了:

官方文档:http://www.zeromq.org/bindings:php
If you need an JAVA language environment,you can get client from:https://github.com/zeromq/jzmq
If you need an Python language environment,you can get client from:https://github.com/zeromq/pyzmq

三、实例测试

ZMQ提供了三个基本的通信模型,分别是“Request-Reply”、“Publisher-Subscriber”、“Parallel Pipeline”,具体介绍可参阅官方提供的《ØMQ(ZeroMQ) PHP Guide》,这里我们使用Request-Reply模型测试

[root@iredmail php-zmq]# cp -a examples /htdoc/web/examples
[root@iredmail php-zmq]# cd /htdoc/web/examples
[root@iredmail examples]# php poll-server.php   //运行服务端
Added object with id o:000000000b8afe2b000000002aec4116
Received message: hello there!

访问http://localhost/examples/client.php,会看到 string(7) "Got it!" 字样,表示写入队列成功.

[root@iredmail examples]# php client.php
string(7) "Got it!"

从zeromq的examples实例代码中,我们可以了解到使用ZMQ写基本的程序的方法,需要注意的是:

1) 服务端和客户端无论谁先启动,效果是相同的,这点不同于Socket。
2) 在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。
3) 服务端收到信息以后,会send一个“World”给客户端。值得注意的是一定是client连接上来以后,send消息给Server,然后Server再rev然后响应client,这种一问一答式的。如果Server先send,client先rev是会报错的。
4) ZMQ通信通信单元是消息,他除了知道Bytes的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等等。
5) 虽然可以使用ZMQ实现HTTP协议,但是,这绝不是他所擅长的。

检查zeroMQ状态

现在可以查看是否监听了 5555 端口

[root@iredmail php-zmq]# lsof -i:5555
COMMAND  PID USER   FD   type DEVICE SIZE/OFF NODE NAME
php-fpm 1047  www   11u  IPv4  93051      0t0  TCP iredmail:58208->iredmail:personal-agent (ESTABLISHED)
php-fpm 1048  www   11u  IPv4  93047      0t0  TCP iredmail:58206->iredmail:personal-agent (ESTABLISHED)
php-fpm 1051  www   11u  IPv4  93049      0t0  TCP iredmail:58207->iredmail:personal-agent (ESTABLISHED)
php     1893 root    9u  IPv4  93046      0t0  TCP iredmail:personal-agent (LISTEN)
php     1893 root   10u  IPv4  93048      0t0  TCP iredmail:personal-agent->iredmail:58206 (ESTABLISHED)
php     1893 root   11u  IPv4  93050      0t0  TCP iredmail:personal-agent->iredmail:58207 (ESTABLISHED)
php     1893 root   12u  IPv4  93052      0t0  TCP iredmail:personal-agent->iredmail:58208 (ESTABLISHED)

[root@iredmail php-zmq]# netstat -anp | grep 5555
tcp        0      0 127.0.0.1:5555              0.0.0.0:*                   LISTEN      1893/php            
tcp        0      0 127.0.0.1:58207             127.0.0.1:5555              ESTABLISHED 1051/php-fpm        
tcp        0      0 127.0.0.1:58206             127.0.0.1:5555              ESTABLISHED 1048/php-fpm        
tcp        0      0 127.0.0.1:58208             127.0.0.1:5555              ESTABLISHED 1047/php-fpm        
tcp        0      0 127.0.0.1:5555              127.0.0.1:58206             ESTABLISHED 1893/php            
tcp        0      0 127.0.0.1:5555              127.0.0.1:58208             ESTABLISHED 1893/php            
tcp        0      0 127.0.0.1:5555              127.0.0.1:58207             ESTABLISHED 1893/php

更多ZeroMQ的帮助文档:

ZMQ API参考手册:http://api.zeromq.org/
PHP使用手册可参考:http://zguide.zeromq.org/php:all
ZeroMQ的学习和研究:http://www.searchtb.com/2012/08/zeromq-primer.html
ZMQ PHP编程参考手册:http://php.zero.mq
这里有大量程序示例可供参考:https://github.com/imatix/zguide

zeromq_传说中最快的消息队列

2012-03-03 08:42 by 轩脉刃, 17282 阅读, 0 评论, 收藏, 编辑

Zeromq的资源:

Zeromq模式:

http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

zeromq主页:

http://www.zeromq.org/

Zeromq Guild:

http://zguide.zeromq.org/page:all#Fixing-the-World

Zeromq 中文简介:

http://blog.csdn.net/program_think/article/details/6687076

Zero wiki:

http://en.wikipedia.org/wiki/%C3%98MQ

zeromq系列:

http://iyuan.iteye.com/blog/972949

Zeromq资源阅读:

ØMQ(Zeromq) 是一个更为高效的传输层

优势是:

1 程序接口库是一个并发框架

2 在集群和超级计算机上表现得比TCP更快

3 通过inproc, IPC, TCP, 和 multicast进行传播消息

4 通过发散,订阅,流水线,请求的方式连接

5 对于不定规模的多核消息传输应用使用异步IO

6 有非常大并且活跃的开源社区

7 支持30+的语言

8 支持多种系统

 

Zeromq定义为“史上最快的消息队列”

从网络通信的角度看,它处于会话层之上,应用层之下。

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.


Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

 

Pub_Sub模式。

the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

Zeromq示例:

1 获取例子

git clone --depth=1 git://github.com/imatix/zguide.git

2 服务器端:

(当服务器收到消息的时候,服务器回复“World”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
    /*
    *  Hello World server
    *  Binds REP socket to tcp://*:5555
    *  Expects "Hello" from client, replies with "World"
    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context = new ZMQContext(1);
     
    //  Socket to talk to clients
    $responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
    $responder->bind("tcp://*:5555");
     
    while(true) {
        //  Wait for next request from client
        $request = $responder->recv();
        printf ("Received request: [%s]\n", $request);
     
        //  Do some 'work'
        sleep (1);
     
        //  Send reply back to client
        $responder->send("World");   
 
}

3 客户端:

(客户端发送消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
    /*
    *  Hello World client
    *  Connects REQ socket to tcp://localhost:5555
    *  Sends "Hello" to server, expects "World" back
    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context = new ZMQContext();
     
    //  Socket to talk to server
    echo "Connecting to hello world server…\n";
    $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
    $requester->connect("tcp://localhost:5555");
     
    for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
        printf ("Sending request %d…\n", $request_nbr);
        $requester->send("Hello");
         
        $reply = $requester->recv();
        printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
 
}
1
 

天气气候订阅系统:(pub-sub)

1 server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?php
    /*
    *  Weather update server
    *  Binds PUB socket to tcp://*:5556
    *  Publishes random weather updates
    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    //  Prepare our context and publisher
    $context = new ZMQContext();
    $publisher = $context->getSocket(ZMQ::SOCKET_PUB);
    $publisher->bind("tcp://*:5556");
    $publisher->bind("ipc://weather.ipc");
     
    while (true) {
        //  Get values that will fool the boss
        $zipcode     = mt_rand(0, 100000);
        $temperature = mt_rand(-80, 135);
        $relhumidity = mt_rand(10, 60);
     
        //  Send message to all subscribers
        $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity);
        $publisher->send($update);
    }

2 client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
    /*
    *  Weather update client
    *  Connects SUB socket to tcp://localhost:5556
    *  Collects weather updates and finds avg temp in zipcode
    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context = new ZMQContext();
     
    //  Socket to talk to server
    echo "Collecting updates from weather server…", PHP_EOL;
    $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
    $subscriber->connect("tcp://localhost:5556");
     
    //  Subscribe to zipcode, default is NYC, 10001
    $filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001";
    $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
     
    //  Process 100 updates
    $total_temp = 0;
    for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) {
        $string = $subscriber->recv();
        sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity);
        $total_temp += $temperature;
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
        $filter, (int) ($total_temp / $update_nbr));
1
------------------------
1
pub-sub的proxy模式:
1
图示是:

clip_image001_thumb[2]

Proxy节点的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php
    /*
    *  Weather proxy device
    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context = new ZMQContext();
     
    //  This is where the weather server sits
    $frontend = new ZMQSocket($context, ZMQ::SOCKET_SUB);
    $frontend->connect("tcp://192.168.55.210:5556");
     
    //  This is our public endpoint for subscribers
    $backend = new ZMQSocket($context, ZMQ::SOCKET_PUB);
    $backend->bind("tcp://10.1.1.0:8100");
     
    //  Subscribe on everything
    $frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
     
    //  Shunt messages out to our own subscribers
    while(true) {
        while(true) {
            //  Process all parts of the message
            $message = $frontend->recv();
            $more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
            $backend->send($message, $more ? ZMQ::SOCKOPT_SNDMORE : 0);
            if(!$more) {
                break; // Last message part
            }
        }
 
}
其实就是proxy同时是作为pub又作为sub的

    ----------------------

作者:yjf512(轩脉刃)

出处:http://www.cnblogs.com/yjf512/

本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明


posted on 2016-09-02 17:30 思月行云 阅读(1606) 评论(0)  编辑 收藏 引用 所属分类: PHP

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理