Fork me on GitHub
随笔 - 215  文章 - 13  trackbacks - 0
<2017年2月>
2930311234
567891011
12131415161718
19202122232425
2627281234
567891011


专注即时通讯及网游服务端编程
------------------------------------
Openresty 官方模块
Openresty 标准模块(Opm)
Openresty 三方模块
------------------------------------
本博收藏大部分文章为转载,并在文章开头给出了原文出处,如有再转,敬请保留相关信息,这是大家对原创作者劳动成果的自觉尊重!!如为您带来不便,请于本博下留言,谢谢配合。

常用链接

留言簿(1)

随笔分类

随笔档案

相册

Awesome

Blog

Book

GitHub

Link

搜索

  •  

积分与排名

  • 积分 - 211162
  • 排名 - 118

最新评论

阅读排行榜

http://blog.csdn.net/shagoo/article/details/6396089
Socket(套接字)一直是网络层的底层核心内容,也是 TCP/IP 以及 UDP 底层协议的实现通道。随着互联网信息时代的爆炸式发展,当代服务器的性能问题面临越来越大的挑战,著名的 C10K 问题(http://www.kegel.com/c10k.html)也随之出现。幸亏通过大牛们的不懈努力,区别于传统的 select/poll 的 epoll/kqueue 方式出现了,目前 linux2.6 以上的内核都普遍支持,这是 Socket 领域一项巨大的进步,不仅解决了 C10K 问题,也渐渐成为了当代互联网的底层核心技术。libevent 库就是其中一个比较出彩的项目(现在非常多的开源项目都有用到,包括 Memcached),感兴趣的朋友可以研究一下。

由于网络上系统介绍这个部分的文章并不多,而涉及 PHP 的就更少了,所以石头君在这里希望通过《Socket深度探究4PHP》这个系列给对这个领域感兴趣的读者们一定的帮助,也希望大家能和我一起对这个问题进行更深入的探讨。首先,解释一下目前 Socket 领域比较易于混淆的概念有:阻塞/非阻塞、同步/异步、多路复用等。

1、阻塞/非阻塞:这两个概念是针对 IO 过程中进程的状态来说的,阻塞 IO 是指调用结果返回之前,当前线程会被挂起;相反,非阻塞指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。

2、同步/异步:这两个概念是针对调用如果返回结果来说的,所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回;相反,当一个异步过程调用发出后,调用者不能立刻得到结果,实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

3、多路复用(IO/Multiplexing):为了提高数据信息在网络通信线路中传输的效率,在一条物理通信线路上建立多条逻辑通信信道,同时传输若干路信号的技术就叫做多路复用技术。对于 Socket 来说,应该说能同时处理多个连接的模型都应该被称为多路复用,目前比较常用的有 select/poll/epoll/kqueue 这些 IO 模型(目前也有像 Apache 这种每个连接用单独的进程/线程来处理的 IO 模型,但是效率相对比较差,也很容易出问题,所以暂时不做介绍了)。在这些多路复用的模式中,异步阻塞/非阻塞模式的扩展性和性能最好。

感觉概念很抽象对吧,“一切答案在于现场”,下面让我们从三种经典的 PHP Socket IO 模型实例来对以上的概念再做一次分析:

1、使用 accept 阻塞的古老模型:属于同步阻塞 IO 模型,代码如下:

socket_server.php
<?php
/**
 * SocketServer Class
 * By James.Huang <shagoo#gmail.com>
*
*/
set_time_limit(0);
class SocketServer 
{
    private static $socket;
    function SocketServer($port
    {
        global $errno$errstr;
        if ($port < 1024) {
            die("Port must be a number which bigger than 1024/n");
        }
        
        $socket = stream_socket_server("tcp://0.0.0.0:{$port}", $errno$errstr);
        if (!$socketdie("$errstr ($errno)");
        
//        stream_set_timeout($socket, -1); // 保证服务端 socket 不会超时,似乎没用:)
        
        while ($conn = stream_socket_accept($socket, -1)) { // 这样设置不超时才油用
            static $id = 0;
            static $ct = 0;
            $ct_last = $ct;
            $ct_data = '';
            $buffer = '';
            $id++; // increase on each accept
            echo "Client $id come./n";
            while (!preg_match('//r?/n/', $buffer)) { // 没有读到结束符,继续读
//                if (feof($conn)) break; // 防止 popen 和 fread 的 bug 导致的死循环

                $buffer = fread($conn, 1024);
                echo 'R'; // 打印读的次数
                $ct += strlen($buffer);
                $ct_data .= preg_replace('//r?/n/', '', $buffer);
            }
            $ct_size = ($ct - $ct_last) * 8;
            echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n";
            fwrite($conn, "Received $ct_size byte data./r/n");
            fclose($conn);
        }
        
        fclose($socket);
    }
}
new SocketServer(2000);
socket_client.php
<?php
/**
 * Socket Test Client
 * By James.Huang <shagoo#gmail.com>
*
*/
function debug ($msg)
{
//    echo $msg;
    error_log($msg, 3, '/tmp/socket.log');
}
if ($argv[1]) {
    
    $socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno$errstr, 30);
    
//    stream_set_blocking($socket_client, 0);
//    stream_set_timeout($socket_client, 0, 100000);

    
    if (!$socket_client) {
        die("$errstr ($errno)");
    } else {
        $msg = trim($argv[1]);
        for ($i = 0; $i < 10; $i++) {
            $res = fwrite($socket_client, "$msg($i)");
            usleep(100000);
            echo 'W'; // 打印写的次数
//            debug(fread($socket_client, 1024)); // 将产生死锁,因为 fread 在阻塞模式下未读到数据时将等待

        }
        fwrite($socket_client, "/r/n"); // 传输结束符
        debug(fread($socket_client, 1024));
        fclose($socket_client);
    }
}
else {
    
//    $phArr = array();
//    for ($i = 0; $i < 10; $i++) {
//        $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');
//    }
//    foreach ($phArr as $ph) {
//        pclose($ph);
//    }

    
    for ($i = 0; $i < 10; $i++) {
        system("php ".__FILE__." '{$i}:test'");
    }
}
首先,解释一下以上的代码逻辑:客户端 socket_client.php 循环发送数据,最后发送结束符;服务端 socket_server.php 使用 accept 阻塞方式接收 socket 连接,然后循环接收数据,直到收到结束符,返回结果数据(接收到的字节数)。虽然逻辑很简单,但是其中有几种情况很值得分析一下:

A> 默认情况下,运行 php socket_client.php test,客户端打出 10 个 W,服务端打出若干个 R 后面是接收到的数据,/tmp/socket.log 记录下服务端返回的接收结果数据。这种情况很容易理解,不再赘述。然后,使用 telnet 命令同时打开多个客户端,你会发现服务器一个时间只处理一个客户端,其他需要在后面“排队”;这就是阻塞 IO 的特点,这种模式的弱点很明显,效率极低。

B> 只打开 socket_client.php 第 26 行的注释代码,再次运行 php socket_client.php test 客户端打出一个 W,服务端也打出一个 R,之后两个程序都卡住了。这是为什么呢,分析逻辑后你会发现,这是由于客户端在未发送结束符之前就向服务端要返回数据;而服务端由于未收到结束符,也在向客户端要结束符,造成死锁。而之所以只打出一个 W 和 R,是因为 fread 默认是阻塞的。要解决这个死锁,必须打开 socket_client.php 第 16 行的注释代码,给 socket 设置一个 0.1 秒的超时,再次运行你会发现隔 0.1 秒出现一个 W 和 R 之后正常结束,服务端返回的接收结果数据也正常记录了。可见 fread 缺省是阻塞的,我们在编程的时候要特别注意,如果没有设置超时,就很容易会出现死锁。

C> 只打开 15 行注释,运行 php socket_client.php test,结果基本和情况 A 相同,唯一不同的是 /tmp/socket.log 没有记录下返回数据。这里可以看出客户端运行在阻塞和非阻塞模式的区别,当然在客户端不在乎接受结果的情况下,可以使用非阻塞模式来获得最大效率。

D> 运行 php socket_client.php 是连续运行 10 次上面的逻辑,这个没什么问题;但是很奇怪的是如果你使用 35 - 41 行的代码,用 popen 同时开启 10 个进程来运行,就会造成服务器端的死循环,十分怪异!后来经调查发现只要是用 popen 打开的进程创建的连接会导致 fread 或者 socket_read 出错直接返回空字串,从而导致死循环,查阅 PHP 源代码后发现 PHP 的 popen 和 fread 函数已经完全不是 C 原生的了,里面都插入了大量的 php_stream_* 实现逻辑,初步估计是其中的某个 bug 导致的 Socket 连接中断所导致的,解决方法就是打开 socket_server.php 中 31 行的代码,如果连接中断则跳出循环,但是这样一来就会有很多数据丢失了,这个问题需要特别注意!

2、使用 select/poll 的同步模型:属于同步非阻塞 IO 模型,代码如下:

select_server.php
<?php
/**
 * SelectSocketServer Class
 * By James.Huang <shagoo#gmail.com>
*
*/
set_time_limit(0);
class SelectSocketServer 
{
    private static $socket;
    private static $timeout = 60;
    private static $maxconns = 1024;
    private static $connections = array();
    function SelectSocketServer($port
    {
        global $errno$errstr;
        if ($port < 1024) {
            die("Port must be a number which bigger than 1024/n");
        }
        
        $socket = socket_create_listen($port);
        if (!$socketdie("Listen $port failed");
        
        socket_set_nonblock($socket); // 非阻塞
        
        while (true
        {
            $readfds = array_merge(self::$connectionsarray($socket));
            $writefds = array();
            
            // 选择一个连接,获取读、写连接通道
            if (socket_select($readfds$writefds$e = null$t = self::$timeout)) 
            {
                // 如果是当前服务端的监听连接
                if (in_array($socket$readfds)) {
                    // 接受客户端连接
                    $newconn = socket_accept($socket);
                    $i = (int) $newconn;
                    $reject = '';
                    if (count(self::$connections) >= self::$maxconns) {
                        $reject = "Server full, Try again later./n";
                    }
                    // 将当前客户端连接放入 socket_select 选择
                    self::$connections[$i] = $newconn;
                    // 输入的连接资源缓存容器
                    $writefds[$i] = $newconn;
                    // 连接不正常
                    if ($reject) {
                        socket_write($writefds[$i], $reject);
                        unset($writefds[$i]);
                        self::close($i);
                    } else {
                        echo "Client $i come./n";
                    }
                    // remove the listening socket from the clients-with-data array
                    $key = array_search($socket$readfds);
                    unset($readfds[$key]);
                }
                
                // 轮循读通道
                foreach ($readfds as $rfd) {
                    // 客户端连接
                    $i = (int) $rfd;
                    // 从通道读取
                    $line = @socket_read($rfd, 2048, PHP_NORMAL_READ);
                    if ($line === false) {
                        // 读取不到内容,结束连接          
                        echo "Connection closed on socket $i./n";
                        self::close($i);
                        continue;
                    }
                    $tmp = substr($line, -1);
                    if ($tmp != "/r" && $tmp != "/n") {
                        // 等待更多数据
                        continue;
                    }
                    // 处理逻辑
                    $line = trim($line);
                    if ($line == "quit") {
                        echo "Client $i quit./n";
                        self::close($i);
                        break;
                    }
                    if ($line) {
                        echo "Client $i >>" . $line . "/n";
                    }
                }
                
                // 轮循写通道
                foreach ($writefds as $wfd) {
                    $i = (int) $wfd;
                    $w = socket_write($wfd, "Welcome Client $i!/n");
                }
            }
        }
    }
    
    function close ($i
    {
        socket_shutdown(self::$connections[$i]);
        socket_close(self::$connections[$i]);
        unset(self::$connections[$i]);
    }
}
new SelectSocketServer(2000);
select_client.php
<?php
/**
 * SelectSocket Test Client
 * By James.Huang <shagoo#gmail.com>
*
*/
function debug ($msg)
{
//    echo $msg;
    error_log($msg, 3, '/tmp/socket.log');
}
if ($argv[1]) {
    
    $socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno$errstr, 30);
    
//    stream_set_timeout($socket_client, 0, 100000);
    
    if (!$socket_client) {
        die("$errstr ($errno)");
    } else {
        $msg = trim($argv[1]);
        for ($i = 0; $i < 10; $i++) {
            $res = fwrite($socket_client, "$msg($i)/n");
            usleep(100000);
//            debug(fread($socket_client, 1024)); // 将产生死锁,因为 fread 在阻塞模式下未读到数据时将等待
        }
        fwrite($socket_client, "quit/n"); // add end token
        debug(fread($socket_client, 1024));
        fclose($socket_client);
    }
}
else {
    
    $phArr = array();
    for ($i = 0; $i < 10; $i++) {
        $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');
    }
    foreach ($phArr as $ph) {
        pclose($ph);
    }
    
//    for ($i = 0; $i < 10; $i++) {
//        system("php ".__FILE__." '{$i}:test'");
//    }

}
以上代码的逻辑也很简单,select_server.php 实现了一个类似聊天室的功能,你可以使用 telnet 工具登录上去,和其他用户文字聊天,也可以键入“quit”命令离开;而 select_client.php 则模拟了一个登录用户连续发 10 条信息,然后退出。这里也分析两个问题:

A> 这里如果我们执行 php select_client.php 程序将会同时打开 10 个连接,同时进行模拟登录用户操作;观察服务端打印的数据你会发现服务端确实是在同时处理这些连接,这就是多路复用实现的非阻塞 IO 模型,当然这个模型并没有真正的实现异步,因为最终服务端程序还是要去通道里面读取数据,得到结果后同步返回给客户端。如果这次你也使用 telnet 命令同时打开多个客户端,你会发现服务端可以同时处理这些连接,这就是非阻塞 IO,当然比古老的阻塞 IO 效率要高多了,但是这种模式还是有局限的,继续看下去你就会发现了~

B> 我在 select_server.php 中设置了几个参数,大家可以调整试试:
$timeout :表示的是 select 的超时时间,这个一般来说不要太短,否则会导致 CPU 负载过高。
$maxconns :表示的是最大连接数,客户端超过这个数的话,服务器会拒绝接收。这里要提到的一点是,由于 select 是通过句柄来读写的,所以会受到系统默认参数 __FD_SETSIZE 的限制,一般默认值为 1024,修改的话需要重新编译内核;另外通过测试发现 select 模式的性能会随着连接数的增大而线性便差(详情见《Socket深度探究4PHP(二)》),这也就是 select 模式最大的问题所在,所以如果是超高并发服务器建议使用下一种模式。

3、使用 epoll/kqueue 的异步模型:属于异步阻塞/非阻塞 IO 模型,代码如下:

epoll_server.php
<?php
/**
 * EpollSocketServer Class (use libevent)
 * By James.Huang <shagoo#gmail.com>
 * 
 * Defined constants:
 * 
 * EV_TIMEOUT (integer)
 * EV_READ (integer)
 * EV_WRITE (integer)
 * EV_SIGNAL (integer)
 * EV_PERSIST (integer)
 * EVLOOP_NONBLOCK (integer)
 * EVLOOP_ONCE (integer)
*
*/
set_time_limit(0);
class EpollSocketServer
{
    private static $socket;
    private static $connections;
    private static $buffers;
    
    function EpollSocketServer ($port)
    {
        global $errno$errstr;
        
        if (!extension_loaded('libevent')) {
            die("Please install libevent extension firstly/n");
        }
        
        if ($port < 1024) {
            die("Port must be a number which bigger than 1024/n");
        }
        
        $socket_server = stream_socket_server("tcp://0.0.0.0:{$port}", $errno$errstr);
        if (!$socket_serverdie("$errstr ($errno)");
        
        stream_set_blocking($socket_server, 0); // 非阻塞
        
        $base = event_base_new();
        $event = event_new();
        event_set($event$socket_server, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
        event_base_set($event$base);
        event_add($event);
        event_base_loop($base);
        
        self::$connections = array();
        self::$buffers = array();
    }
    
    function ev_accept($socket$flag$base
    {
        static $id = 0;
    
        $connection = stream_socket_accept($socket);
        stream_set_blocking($connection, 0);
    
        $id++; // increase on each accept
    
        $buffer = event_buffer_new($connectionarray(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
        event_buffer_base_set($buffer$base);
        event_buffer_timeout_set($buffer, 30, 30);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);
    
        // we need to save both buffer and connection outside
        self::$connections[$id] = $connection;
        self::$buffers[$id] = $buffer;
    }
    
    function ev_error($buffer$error$id
    {
        event_buffer_disable(self::$buffers[$id], EV_READ | EV_WRITE);
        event_buffer_free(self::$buffers[$id]);
        fclose(self::$connections[$id]);
        unset(self::$buffers[$id], self::$connections[$id]);
    }
    
    function ev_read($buffer$id
    {
        static $ct = 0;
        $ct_last = $ct;
        $ct_data = '';
        while ($read = event_buffer_read($buffer, 1024)) {
            $ct += strlen($read);
            $ct_data .= $read;
        }
        $ct_size = ($ct - $ct_last) * 8;
        echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n";
        event_buffer_write($buffer, "Received $ct_size byte data./r/n");
    }
    
    function ev_write($buffer$id
    {
        echo "[$id] " . __METHOD__ . "/n";
    }
}
new EpollSocketServer(2000);
epoll_client.php
<?php
/**
 * EpollSocket Test Client
 * By James.Huang <shagoo#gmail.com>
*
*/
function debug ($msg)
{
//    echo $msg;
    error_log($msg, 3, '/tmp/socket.log');
}
if ($argv[1]) {
    $socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno$errstr, 30);
//    stream_set_blocking($socket_client, 0);
    if (!$socket_client) {
        die("$errstr ($errno)");
    } else {
        $msg = trim($argv[1]);
        for ($i = 0; $i < 10; $i++) {
            $res = fwrite($socket_client, "$msg($i)");
            usleep(100000);
            debug(fread($socket_client, 1024));
        }
        fclose($socket_client);
    }
}
else {
    
    $phArr = array();
    for ($i = 0; $i < 10; $i++) {
        $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');
    }
    foreach ($phArr as $ph) {
        pclose($ph);
    }
    
//    for ($i = 0; $i < 10; $i++) {
//        system("php ".__FILE__." '{$i}:test'");
//    }

}
先说一下,以上的例子是基于 PHP 的 libevent 扩展实现的,需要运行的话要先安装此扩展,参考:http://pecl.php.net/package/libevent。

这个例子做的事情和前面介绍的第一个模型一样,epoll_server.php 实现的服务端也是接受客户端数据,然后返回结果(接收到的字节数)。但是,当你运行 php epoll_client.php 的时候你会发现服务端打印出来的结果和 accept 阻塞模型就大不一样了,当然运行效率也有极大的提升,这是为什么呢?接下来就介绍一下 epoll/kqueue 模型:在介绍 select 模式的时候我们提到了这种模式的局限,而 epoll 就是为了解决 poll 的这两个缺陷而生的。首先,epoll 模式基本没有限制(参考 cat /proc/sys/fs/file-max 默认就达到 300K,很令人兴奋吧,其实这也就是所谓基于 epoll 的 Erlang 服务端可以同时处理这么多并发连接的根本原因,不过现在 PHP 理论上也可以做到了,呵呵);另外,epoll 模式的性能也不会像 select 模式那样随着连接数的增大而变差,测试发现性能还是很稳定的(下篇会有详细介绍)。

epoll 工作有两种模式 LT(level triggered) 和 ET(edge-triggered),前者是缺省模式,同时支持阻塞和非阻塞 IO 模式,虽然性能比后者差点,但是比较稳定,一般来说在实际运用中,我们都是用这种模式(ET 模式和 WinSock 都是纯异步非阻塞模型)。而另外一点要说的是 libevent 是在编译阶段选择系统的 I/O demultiplex 机制的,不支持在运行阶段根据配置再次选择,所以我们在这里也就不细讨论 libevent 的实现的细节了,如果朋友有兴趣进一步了解的话,请参考:http://monkey.org/~provos/libevent/。

到这里,第一部分的内容结束了,相信大家已经了解了 Socket 编程的几个重点概念和一些实战技巧,在下一篇《Socket深度探究4PHP(二) 》我将会对 select/poll/epoll/kqueue 几种模式做一下深入的介绍和对比,另外也会涉及到两种重要的 I/O 多路复用模式:Reactor 和 Proactor 模式。

To be continued ...

http://blog.csdn.net/shagoo/article/details/6531950

上一篇《Socket深度探究4PHP(一)》中,大家应该对 poll/select/epoll/kqueue 这几个 IO 模型有了一定的了解,为了让大家更深入的理解 Socket 的技术内幕,在这个篇幅,我会对这几种模式做一个比较详细的分析和对比;另外,大家可能也同说过 AIO 的概念,这里也会做一个简单的介绍;最后我们会对两种主流异步模式 Reactor 和 Proactor 模式进行对比和讨论。

首先,然我们逐个介绍一下 2.6 内核(2.6.21.1)中的 poll/select/epoll/kqueue 这几个 IO 模型。

> POLL

先说说 poll,poll 和 select 为大部分 Unix/Linux 程序员所熟悉,这俩个东西原理类似,性能上也不存在明显差异,但 select 对所监控的文件描述符数量有限制,所以这里选用 poll 做说明。poll 是一个系统调用,其内核入口函数为 sys_poll,sys_poll 几乎不做任何处理直接调用 do_sys_poll,do_sys_poll 的执行过程可以分为三个部分:

1、将用户传入的 pollfd 数组拷贝到内核空间,因为拷贝操作和数组长度相关,时间上这是一个 O(n) 操作,这一步的代码在 do_sys_poll 中包括从函数开始到调用 do_poll 前的部分。

2、查询每个文件描述符对应设备的状态,如果该设备尚未就绪,则在该设备的等待队列中加入一项并继续查询下一设备的状态。查询完所有设备后如果没有一个设备就绪,这时则需要挂起当前进程等待,直到设备就绪或者超时,挂起操作是通过调用 schedule_timeout 执行的。设备就绪后进程被通知继续运行,这时再次遍历所有设备,以查找就绪设备。这一步因为两次遍历所有设备,时间复杂度也是 O(n),这里面不包括等待时间。相关代码在 do_poll 函数中。

3、将获得的数据传送到用户空间并执行释放内存和剥离等待队列等善后工作,向用户空间拷贝数据与剥离等待队列等操作的的时间复杂度同样是 O(n),具体代码包括 do_sys_poll 函数中调用 do_poll 后到结束的部分。

但是,即便通过 select() 或者 poll() 函数复用事件通知具有突出的优点,不过其他具有类似功能的函数实现也可以达到同样的性能。然而,这些实现在跨平台方面没有实现标准化。你必须在使用这些特定函数实现同丧失可移植性之间进行权衡。我们现在就讨论一下两个替代方法:Solaris 系统下的 /dev/poll 和 FreeBSD 系统下的 kqueue:

1、Solaris 系统下的 /dev/poll:在Solaris 7系统上,Sun引入了/dev/poll设备。在使用 /dev/poll的时候,你首先要打开/dev/poll作为一个普通文件。然后构造pollfd结构,方式同普通的poll()函数调用一样。这些 pollfd结构随后写入到打开的 /dev/poll 文件描述符。在打开句柄的生存周期内, /dev/poll会根据pollfd结构返回事件(注意,pollfd结构内的事件字段中的特定POLLREMOVE将从/dev/poll的列表中删除对应的fd)。通过调用特定的ioctl (DP_POLL) 和dvpoll,程序就可以从/dev/poll获得需要的信息。在使用dvpoll结构的情况下,发生的事件就可以被检测到了。

2、FreeBSD 系统下的 kqueue:在FreeBSD 4.1中推出。FreeBSD的kqueue API设计为比其他对应函数提供更为广泛的事件通知能力。kqueue API提供了一套通用过滤器,可以模仿poll()语法(EVFILT_READ和EVFILT_WRITE)。不过,它还实现了文件系统变化(EVFILT_VNODE)、进程状态变更(EVFILT_PROC)和信号交付(EVFILT_SIGNAL)的有关通知。

> EPOLL


接下来分析 epoll,与 poll/select 不同,epoll 不再是一个单独的系统调用,而是由 epoll_create/epoll_ctl/epoll_wait 三个系统调用组成,后面将会看到这样做的好处。先来看 sys_epoll_create(epoll_create对应的内核函数),这个函数主要是做一些准备工作,比如创建数据结构,初始化数据并最终返回一个文件描述符(表示新创建的虚拟 epoll 文件),这个操作可以认为是一个固定时间的操作。epoll 是做为一个虚拟文件系统来实现的,这样做至少有以下两个好处:

1、可以在内核里维护一些信息,这些信息在多次 epoll_wait 间是保持的,比如所有受监控的文件描述符。

2、epoll 本身也可以被 poll/epoll。

具体 epoll 的虚拟文件系统的实现和性能分析无关,不再赘述。

在 sys_epoll_create 中还能看到一个细节,就是 epoll_create 的参数 size 在现阶段是没有意义的,只要大于零就行。

接着是 sys_epoll_ctl(epoll_ctl对应的内核函数),需要明确的是每次调用 sys_epoll_ctl 只处理一个文件描述符,这里主要描述当 op 为 EPOLL_CTL_ADD 时的执行过程,sys_epoll_ctl 做一些安全性检查后进入 ep_insert,ep_insert 里将 ep_poll_callback 做为回掉函数加入设备的等待队列(假定这时设备尚未就绪),由于每次 poll_ctl 只操作一个文件描述符,因此也可以认为这是一个 O(1) 操作。ep_poll_callback 函数很关键,它在所等待的设备就绪后被系统回掉,执行两个操作:

1、将就绪设备加入就绪队列,这一步避免了像 poll 那样在设备就绪后再次轮询所有设备找就绪者,降低了时间复杂度,由 O(n) 到 O(1)。
 
2、唤醒虚拟的 epoll 文件。

最后是 sys_epoll_wait,这里实际执行操作的是 ep_poll 函数。该函数等待将进程自身插入虚拟 epoll 文件的等待队列,直到被唤醒(见上面 ep_poll_callback 函数描述),最后执行 ep_events_transfer 将结果拷贝到用户空间。由于只拷贝就绪设备信息,所以这里的拷贝是一个 O(1) 操作。

还有一个让人关心的问题就是 epoll 对 EPOLLET 的处理,即边沿触发的处理,粗略看代码就是把一部分水平触发模式下内核做的工作交给用户来处理,直觉上不会对性能有太大影响,感兴趣的朋友欢迎讨论。

> POLL/EPOLL 对比:

表面上 poll 的过程可以看作是由一次 epoll_create,若干次 epoll_ctl,一次 epoll_wait,一次 close 等系统调用构成,实际上 epoll 将 poll 分成若干部分实现的原因正是因为服务器软件中使用 poll 的特点(比如Web服务器):

1、需要同时 poll 大量文件描述符;

2、每次 poll 完成后就绪的文件描述符只占所有被 poll 的描述符的很少一部分。

3、前后多次 poll 调用对文件描述符数组(ufds)的修改只是很小;

传统的 poll 函数相当于每次调用都重起炉灶,从用户空间完整读入 ufds,完成后再次完全拷贝到用户空间,另外每次 poll 都需要对所有设备做至少做一次加入和删除等待队列操作,这些都是低效的原因。

epoll 将以上情况都细化考虑,不需要每次都完整读入输出 ufds,只需使用 epoll_ctl 调整其中一小部分,不需要每次 epoll_wait 都执行一次加入删除等待队列操作,另外改进后的机制使的不必在某个设备就绪后搜索整个设备数组进行查找,这些都能提高效率。另外最明显的一点,从用户的使用来说,使用 epoll 不必每次都轮询所有返回结果已找出其中的就绪部分,O(n) 变 O(1),对性能也提高不少。

此外这里还发现一点,是不是将 epoll_ctl 改成一次可以处理多个 fd(像 semctl 那样)会提高些许性能呢?特别是在假设系统调用比较耗时的基础上。不过关于系统调用的耗时问题还会在以后分析。

> POLL/EPOLL 测试数据对比:


测试的环境:我写了三段代码来分别模拟服务器,活动的客户端,僵死的客户端,服务器运行于一个自编译的标准 2.6.11 内核系统上,硬件为 PIII933,两个客户端各自运行在另外的 PC 上,这两台PC比服务器的硬件性能要好,主要是保证能轻易让服务器满载,三台机器间使用一个100M交换机连接。

服务器接受并poll所有连接,如果有request到达则回复一个response,然后继续poll。

活动的客户端(Active Client)模拟若干并发的活动连接,这些连接不间断的发送请求接受回复。

僵死的客户端(zombie)模拟一些只连接但不发送请求的客户端,其目的只是占用服务器的poll描述符资源。

测试过程:保持10个并发活动连接,不断的调整僵并发连接数,记录在不同比例下使用 poll 与 epoll 的性能差别。僵死并发连接数根据比例分别是:0,10,20,40,80,160,320,640,1280,2560,5120,10240。

下图中横轴表示僵死并发连接与活动并发连接之比,纵轴表示完成 40000 次请求回复所花费的时间,以秒为单位。红色线条表示 poll 数据,绿色表示 epoll 数据。可以看出,poll 在所监控的文件描述符数量增加时,其耗时呈线性增长,而 epoll 则维持了一个平稳的状态,几乎不受描述符个数影响。



但是要注意的是在监控的所有客户端都是活动时,poll 的效率会略高于 epoll(主要在原点附近,即僵死并发连接为0时,图上不易看出来),究竟 epoll 实现比 poll 复杂,监控少量描述符并非它的长处。

> epoll 的优点综述

1、支持一个进程打开大数目的socket描述符(FD):select 最不能忍受的是一个进程所打开的 FD 是有一定限制的,由 FD_SETSIZE 设置,在 Linux 中,这个值是 1024。对于那些需要支持的上万连接数目的网络服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache 方案),不过虽然 linux 上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll 则没有这个限制,它所支持的 FD 上限是最大可以打开文件的数目,这个数字一般远大于 1024,举个例子,在 1GB 内存的机器上大约是 10 万左右,具体数目可以 cat /proc/sys/fs/file-max 察看,一般来说这个数目和系统内存关系很大。

2、IO 效率不随 FD 数目增加而线性下降:传统的 select/poll 另一个致命弱点就是当你拥有一个很大的 socket 集合,不过由于网络延时,任一时间只有部分的 socket 是"活跃"的,但是 select/poll 每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是 epoll 不存在这个问题,它只会对"活跃"的 socket 进行操作---这是因为在内核实现中 epoll 是根据每个 fd 上面的 callback 函数实现的。那么,只有"活跃"的 socket 才会主动的去调用 callback 函数,其他 idle 状态 socket 则不会,在这点上,epoll 实现了一个"伪"AIO,因为这时候推动力在 os 内核。在一些 benchmark 中,如果所有的 socket 基本上都是活跃的 --- 比如一个高速LAN环境,epoll 并不比 select/poll 有什么效率,相反,如果过多使用 epoll_ctl,效率相比还有稍微的下降。但是一旦使用 idle connections 模拟 WAN 环境,epoll 的效率就远在 select/poll 之上了。

3、使用 mmap 加速内核与用户空间的消息传递:这点实际上涉及到 epoll 的具体实现了。无论是 select,poll 还是 epoll 都需要内核把 FD 消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll 是通过内核于用户空间 mmap 同一块内存实现的。而如果你想我一样从 2.5 内核就关注 epoll 的话,一定不会忘记手工 mmap 这一步的。

4、内核微调:这一点其实不算 epoll 的优点了,而是整个 linux 平台的优点。也许你可以怀疑 linux 平台,但是你无法回避 linux 平台赋予你微调内核的能力。比如,内核 TCP/IP 协议栈使用内存池管理 sk_buff 结构,那么可以在运行时期动态调整这个内存 pool(skb_head_pool) 的大小 --- 通过 echo XXXX > /proc/sys/net/core/hot_list_length 完成。再比如 listen 函数的第 2 个参数(TCP 完成 3 次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的 NAPI 网卡驱动架构。

> AIO 和 Epoll


epoll 和 aio(这里的aio是指linux 2.6内核后提供的aio api)的区别:

1、aio 是异步非阻塞的。其实是aio是用线程池实现了异步IO。

2、epoll 在这方面的定义上有点复杂,首先 epoll 的 fd 集里面每一个 fd 都是非阻塞的,但是 epoll(包括 select/poll)在调用时阻塞等待 fd 可用,然后 epoll 只是一个异步通知机制,只是在 fd 可用时通知你,并没有做任何 IO 操作,所以不是传统的异步。

在这方面,Windows 无疑是前行者,当然 Boost C++ 库已经实现了 linux 下 aio 的机制,有兴趣的朋友可以参考:http://stlchina.huhoo.net/twiki/bin/view.pl/Main/WebHome

> Reactor 和 Proactor

一般地,I/O多路复用机制都依赖于一个事件多路分离器(Event Demultiplexer)。分离器对象可将来自事件源的I/O事件分离出来,并分发到对应的read/write事件处理器(Event Handler)。开发人员预先注册需要处理的事件及其事件处理器(或回调函数);事件分离器负责将请求事件传递给事件处理器。两个与事件分离器有关的模式是Reactor和Proactor。Reactor模式采用同步IO,而Proactor采用异步IO。

在Reactor中,事件分离器负责等待文件描述符或socket为读写操作准备就绪,然后将就绪事件传递给对应的处理器,最后由处理器负责完成实际的读写工作。而在Proactor模式中,处理器--或者兼任处理器的事件分离器,只负责发起异步读写操作。IO操作本身由操作系统来完成。传递给操作系统的参数需要包括用户定义的数据缓冲区地址和数据大小,操作系统才能从中得到写出操作所需数据,或写入从socket读到的数据。事件分离器捕获IO操作完成事件,然后将事件传递给对应处理器。比如,在windows上,处理器发起一个异步IO操作,再由事件分离器等待IOCompletion事件。典型的异步模式实现,都建立在操作系统支持异步API的基础之上,我们将这种实现称为“系统级”异步或“真”异步,因为应用程序完全依赖操作系统执行真正的IO工作。

举个例子,将有助于理解Reactor与Proactor二者的差异,以读操作为例(类操作类似)。

在Reactor中实现读:
 - 注册读就绪事件和相应的事件处理器
 - 事件分离器等待事件
 - 事件到来,激活分离器,分离器调用事件对应的处理器。
 - 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。

与如下Proactor(真异步)中的读过程比较:
 - 处理器发起异步读操作(注意:操作系统必须支持异步IO)。在这种情况下,处理器无视IO就绪事件,它关注的是完成事件。
 - 事件分离器等待操作完成事件
 - 在分离器等待过程中,操作系统利用并行的内核线程执行实际的读操作,并将结果数据存入用户自定义缓冲区,最后通知事件分离器读操作完成。
 - 事件分离器呼唤处理器。
 - 事件处理器处理用户自定义缓冲区中的数据,然后启动一个新的异步操作,并将控制权返回事件分离器。

对于不提供异步 IO API 的操作系统来说,这种办法可以隐藏 Socket API 的交互细节,从而对外暴露一个完整的异步接口。借此,我们就可以进一步构建完全可移植的,平台无关的,有通用对外接口的解决方案。上述方案已经由Terabit P/L公司实现为 TProactor (ACE compatible Proactor) :http://www.terabit.com.au/solutions.php。正是因为 linux 对 aio 支持的不完整,所以 ACE_Proactor 框架在 linux 上的表现很差,大部分在 windows 上执行正常的代码,在 linux 则运行异常,甚至不能编译通过。这个问题一直困扰着很大多数 ACE 的用户,现在好了,有一个 TProactor 帮助解决了在 Linux 不完整支持 AIO 的条件下,正常使用(至少是看起来正常)ACE_Proactor。TProactor 有两个版本:C++ 和 Java 的。C++ 版本采用 ACE 跨平台底层类开发,为所有平台提供了通用统一的主动式异步接口。Boost.Asio 库,也是采取了类似的这种方案来实现统一的 IO 异步接口。

以下是一张 TProactor 架构设计图,有兴趣的朋友可以看看:



到这里,第二部分的内容结束了,相信大家对 Socket 的底层技术原理有了一个更深层次的理解,在下一篇《Socket深度探究4PHP(三)》我将会深入 PHP 源代码,探究一下 PHP 在 Socket 这部分的一些技术内幕,然后介绍一下目前在这个领域比较活跃的项目(node.js)。

To be continued ...

http://blog.csdn.net/shagoo/article/details/6647961

看过前两篇文章《Socket深度探究4PHP(一)》和《Socket深度探究4PHP(二)》,大家应该对目前 Socket 技术的底层有了一定的了解。本文我们会对 PHP-5.3.6 的源码中的 Socket 模块进行一定的分析,然后再简单介绍一下目前比较热门的一些相关技术,比如 Node.js 等。

自 PHP4 之后,越来越多的模块都被作为扩展提取出来(可单独编译),都在 PHP 源码的 ext 目录下面,因此我们我需要先进入 ext/sockets/ 目录,做过 PHP 扩展的同学应该都很熟悉下面的一些文件了,这次我们主要分析的是 php_sockets.h 和 sockets.c 这两个 C 源码文件。

ext/sockets/php_sockets.h

这个头文件很简单,我们主要看一下下面列出的几个重点:

32 行:
  1. #ifdef PHP_WIN32  
  2. #include <winsock.h>  
  3. #else  
  4. #if HAVE_SYS_SOCKET_H  
  5. #include <sys/socket.h>  
  6. #endif  
  7. #endif  

以上就是 PHP 对于不同环境 Socket 底层调用的定义了,我们可以看到不管是 Unix 还是 Windows 环境,PHP均调用的是系统标准的 BSD Socket 库。然后我们看下面这个重要的结构体定义:

82 行:
  1. typedef struct {  
  2.     PHP_SOCKET bsd_socket;  
  3.     int        type;  
  4.     int        error;  
  5.     int        blocking;  
  6. } php_socket;  

这个就是 php socket 的存储结构了,此结构体在以下的代码阅读中将会大量出现,里面的几个字段很容易理解:bsd_socket 就是标准的 socket 类型,type 表示 socket 类型(PF_UNIX/AF_UNIX),error 是错误代码,blocking 则表示是否阻塞。

ext/sockets/sockets.c

这个文件比较长,为了直接切入重点,我们会按照《Socket 深度探索 4 PHP (一) 》中 select_server.php 部分代码来按顺序分析一下在最经典的 select 模式中我们用到的主要方法:

>socket_create_listen

859 行:PHP_FUNCTION(socket_create_listen)
这个函数很简单,初始化 php_sock 并获取 socket 需要监听的端口,然后传入下面的 php_open_listen_sock 函数进行加工,最后调用 ZEND_REGISTER_RESOURCE 宏返回 php_sock。

347行:static int php_open_listen_sock(php_socket **php_sock, int port, int backlog TSRMLS_DC)
此函数基本上就是 socket 的标准初始化过程:socket(...) -> bind(...) -> listen(...)(详见 368 行至 391 行)。
  1. sock->bsd_socket = socket(PF_INET, SOCK_STREAM, 0);  
  2. sock->blocking = 1;  
  3. ...  
  4. sock->type = PF_INET;  
  5. ...  
  6. if (bind(sock->bsd_socket, (struct sockaddr *)&la, sizeof(la)) != 0) {  
  7. ...  
  8. }  
  9. if (listen(sock->bsd_socket, backlog) != 0) {  
  10. ...  
  11. }  

>socket_set_nonblock

906 行:PHP_FUNCTION(socket_set_nonblock)
这个函数也很简单,从 ZEND_FETCH_RESOURCE 取出 runtime 中的 php_sock 然后调用 php_set_sock_blocking 函数来设置 sockfd 的阻塞或者非阻塞(此函数可以参考 main/network.c 第 1069 行,我们可以看到 PHP 是使用 fcntl 函数来设置的)。

>socket_select

785 行:PHP_FUNCTION(socket_select)
也是标准的 select 函数调用,过程如下:FD_ZERO(...) -> php_sock_array_to_fd_set(...) -> select(...) -> php_sock_array_from_fd_set(...),可能比较特殊的就是 php_sock_array_from_fd_set() 和 php_sock_array_from_fd_set() 两个函数,这是由于我们要先把 PHP 的 fd 数组转换成原生 fd 集合,才能调用原生的 select 函数,而最后系统还把 fd 集合重新转回到 PHP 的 fd 数组(具体代码参考 799 行至 851 行)。

>socket_accept

881 行:PHP_FUNCTION(socket_accept)
此函数基本上也就是 socket 原生 accept 函数的包装,具体代码可参考 397 行:php_accept_connect 函数中的逻辑,最后调用 ZEND_REGISTER_RESOURCE 宏返回 new_sock,若失败程序会清理使用的 out_socket 资源。

>socket_write

986 行:PHP_FUNCTION(socket_write)
按照以上的思路看这个函数也非常简单,详见 986 行,唯一值得注意的是对于不同操作系统调用的函数有点不同,代码(见 1004 行)如下:
  1. #ifndef PHP_WIN32  
  2.     retval = write(php_sock->bsd_socket, str, MIN(length, str_len));  
  3. #else  
  4.     retval = send(php_sock->bsd_socket, str, min(length, str_len), 0);  
  5. #endif  

>socket_read

1021 行:PHP_FUNCTION(socket_read)
此函数是用于接受 socket 的数据,调用的原生函数是 recv(),不过这里需要注意的是 PHP 为我们提供两种获取方式:
1、PHP_NORMAL_READ
按行读取,具体代码见 419 行:php_read 函数的逻辑,我们注意到此函数在非阻塞模式下会立即返回,否则将会读取直至遇到 \n 或者 \r 字符。
2、PHP_BINARY_READ
代码见 1045 行:retval = recv(php_sock->bsd_socket, tmpbuf, length, 0); 相当原生和“环保”。
最后,如果返回值为 -1 则会进行一些错误记录和系统清理工作。

>socket_close

970 行:PHP_FUNCTION(socket_close)
清理 socket 运行时所用的资源。

>socket_shutdown

1968 行:PHP_FUNCTION(socket_shutdown)
调用原生 shutdown 函数来关闭 socket。

分析下来,PHP 的 socket 模块中绝大部分的代码还是使用的是系统标准的原生 socket 库,其中唯一有可能造成性能隐患的就是 select 中 PHP 的 fd 数组与原生 fd 集合转换,至于其他的一些简单的数据拷贝基本对效率不会有什么影响。总的来说,PHP 的 socket 模块应该效率还是比较高的,但是在使用的时候还是需要注意到一些资源的及时释放,因为毕竟是 Daemon 程序,需要不断运行的,而且 PHP 的数据结构是很占内存(是原生 C 的 4 倍左右)的。

node.js

最后,我们看看现在很流行的 Node.js(http://nodejs.org/),它采用了 JavaScript 的语言引擎,语法非常的简洁,对闭包的完美支持让它特别适合做异步 IO 的代码编写,下面是一个最简单的 HTTP Server,只用仅仅六行代码:
[javascript] view plain copy
  1. var http = require('http');  
  2. http.createServer(function (req, res) {  
  3.   res.writeHead(200, {'Content-Type': 'text/plain'});  
  4.   res.end('Hello World\n');  
  5. }).listen(8000, "127.0.0.1");  
  6. console.log('Server running at http://127.0.0.1:8000/');  

运行起来感受一下,有没有惊艳的感觉啊?事实上用它来写一些简单的服务确实很不错,有兴趣的朋友可以多研究研究(中文社区:http://cnodejs.org/),它有 8000 行 C++ 代码,2000 行 javascript 代码,使用 Google 的 V8 引擎(和 Mongodb 一样),相当的很小巧精悍。下面是我在使用过程总结出中几个要点,大家可以参考:

1、使用 V8 引擎(和 Mongodb 一样),内置 JSON,代码简洁,使用方便。
2、使用单线程非阻塞 I/O 中的 select 方式,比较稳定(但是对于超高并发有点力不从心)。
3、一些第三方应用接口不是很稳定,比如 Mongodb 的接口,并发 200 出现卡死现象,Mysql 接口也比 fast-cgi 差很多。
4、注意使用 try{...}catch{...} 来捕获错误;使用 process.on('uncaughtException', function(err){...}); 来处理未捕获的错误,否则出错会导致整个服务退出。

当然,Node.js 还在不断的更新发展中,虽然目前我在公司的服务架构中还不敢使用它,我还是很希望它能够迅速成长起来,这样子我们开发服务中间件的时候,就会多出一个很棒的选项啦~
posted on 2016-08-30 15:46 思月行云 阅读(1001) 评论(0)  编辑 收藏 引用 所属分类: PHP

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理