PhP-WebSocket Server

в 19:56, , рубрики: php, websocket server, websockets, Веб-разработка, самопал, метки: , , ,

Всем привет!

Не так давно я начал переписывать один интересный проект, своего рода чат, в котором до моего вмешательства использовался комет-сервер на вебсокеты. Рассказывать о том, что такое WebSockets я не стану, т.к я думаю, что это всем уже известно, да и пост не об этом. Поэтому под катом просто немного PhP-кода.

Да, я знаю, что готовых решений полным-полно, однако меня они по тем или иным причинам не устроили и я решил написать свой PhP WebSocket Server. Плюс ко всему, это интересно :) Так как до этого я не сталкивался с сокетами, чтобы понять на деле, что это за блюдо и с чем его едят, пришлось покурить немного мануалов. Так же я разобрал и изучил принцип работы некоторых готовых решений, что позволило мне лучше узнать их. Сам код я выложу в конце статьи, а перед этим хотелось бы сказать пару слов об его использовании.

Сразу скажу, что это первая версия сервера (так сказать 1.0) и ее функционал еще немного сыроват.

Для того, чтобы запустить наш сервер необходимо сделать следущее:

// Инклудим наш сервер
require_once "./WebSocketApi.php";
//..
$wsApi = new WebSocketApi();
// Запускаем !
$wsApi->startWServer("127.0.0.7", 8000);

Ничего сложного, не правда ли?
В сервере есть 3 event'a.

  • onOpen — подключение клиента к серверу
  • onMsg — получение сообщения от клиента
  • onClose — закрытие соединения с клиентом

Эти event'ы позволяют нам вызывать пользовательские функции при определенных событиях.

Например:

// Инклудим наш сервер
require_once "./WebSocketApi.php";
//..
$wsApi = new WebSocketApi();
// Вызываем наши функции при соединении с клиентом
$wsApi->events['onOpen'] = array(
    "myFunction", "myFunction_2"
);
// Запускаем !
$wsApi->startWServer("127.0.0.7", 8000);

Или так:

// Инклудим наш сервер
require_once "./WebSocketApi.php";
//..
$wsApi = new WebSocketApi();
// Вызываем наши функции при соединении с клиентом
$wsApi->events['onMsg'] = array(
    "function_1" => array("param_1", "param_2"),
    "function_2" => array("param_1")
);
// Запускаем !
$wsApi->startWServer("127.0.0.7", 8000);

А вот и сам код:

Тынц

<?php
/**
 * User: byabuzyak
 * Date: 6/23/14
 * Time: 9:12 PM
 */
class WebSocketApi {

    const MAX_FRAME_RECV            = 100000;
    const MAX_TIMEOUT               = 25;

    const TIMEOUT_PONG              = 5;
    const TIMEOUT_RECV              = 10;

    const OPCODE_CONTINUATION       = 0;
    const OPCODE_TEXT               = 1;
    const OPCODE_BINARY             = 2;
    const OPCODE_CLOSE              = 8;
    const OPCODE_PING               = 9;
    const OPCODE_PONG               = 10;

    const READY_STATE_CONNECTING    = 0;
    const READY_STATE_OPEN          = 1;
    const READY_STATE_CLOSING       = 2;
    const READY_STATE_CLOSED        = 3;

    const PAYLOAD_LENGTH_16         = 126;
    const PAYLOAD_LENGTH_63         = 127;

    const STATUS_PROTOCOL_ERROR     = 1002;
    const STATUS_MESSAGE_TOO_BIG    = 1004;

    const FIN                       = 128;
    const MASK                      = 128;


    /**
     * @var array
     */
    public $clients                 = array();
    /**
     * @var array
     */
    public $ws                      = array();

    /**
     * @var array
     */
    public $events                  = array();
    /**
     * @var int
     */
    public $clientsCount            = 0;

    /**
     * @var null
     */
    public static $_instance        = null;

    /**
     * @return WebSocketApi|null
     */
    public static function getInstance(){
        if(self::$_instance === null){
            self::$_instance = new self();
        }

        return self::$_instance;
    }

    /**
     * @param string $host
     * @param int $port
     * @return bool
     */
    public function startWServer($host = '127.0.0.1', $port = 8000){
        if(isset($this->ws[0]))
            return false;

        if(phpversion() >= 5.5){
            cli_set_process_title("WebSockets Server 1.0");
        }

        if (!$this->ws[0] = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) {
            return false;
        }
        if (!socket_set_option($this->ws[0], SOL_SOCKET, SO_REUSEADDR, 1)) {
            socket_close($this->ws[0]);
            return false;
        }
        if (!socket_bind($this->ws[0], $host, $port)) {
            socket_close($this->ws[0]);
            return false;
        }
        if (!socket_listen($this->ws[0], 10)) {
            socket_close($this->ws[0]);
            return false;
        }

        $write  = null;
        $except = null;

        $nextPingCheck = time() + 1;
        while(isset($this->ws[0])){
            $read   = $this->ws;
            $count  = socket_select($read, $write, $except, 1);
            if($count === false){
                socket_close($this->ws[0]);
                return false;
            }elseif($count > 0){
                foreach($read as $clientId => $socket){
                    if($clientId != 0){
                        $buffer = '';
                        $bytes  = @socket_recv($socket, $buffer, 4096, 0);

                        if($bytes === false){
                            $this->_closeClient($clientId);
                        }elseif($bytes > 0){
                            if(!$this->_checkClient($clientId, $buffer, $bytes)){
                                $this->_closeClient($clientId);
                            }
                        }else{
                            $this->_removeClient($clientId);
                        }
                    }else{
                        $client = socket_accept($this->ws[0]);
                        if($client !== false){
//                            TODO: Добавить ограничения по макс. количеству клиентов
                            $ip = '';
                            $result = socket_getpeername($client, $ip);
                            $ip     = ip2long($ip);
                            if($result !== false){
                                $this->_addClient($client, $ip);
                            }else{
                                socket_close($client);
                            }
                        }
                    }
                }
            }
            if(time() >= $nextPingCheck){
                $nextPingCheck = time() + 1;
                $this->_checkActiveClients();
            }
        }
        return true;
    }

    /**
     *
     */
    private function _checkActiveClients(){
        $time = time();
        foreach($this->clients as $clientId => $socket){
            if($socket->state != self::READY_STATE_CLOSED){
                if($socket->ready_state !== false){
                    if($time >= $socket->ready_state + self::TIMEOUT_PONG){
                        $this->_closeClient($clientId);
                        $this->_removeClient($clientId);
                    }
                }elseif($time >= $socket->time + self::TIMEOUT_RECV){
                    if($socket->state != self::READY_STATE_CONNECTING){
                        $this->clients[$clientId]->ready_state = time();
                        $this->sendClientMsg($clientId, self::OPCODE_PING, '');
                    }else{
                        $this->_removeClient($clientId);
                    }
                }
            }
        }
    }

    /**
     * @param $socket
     * @param string $clientIp
     */
    private function _addClient($socket, $clientIp = ''){
        $this->clientsCount += 1;
        $clientId = $this->_nextId();

        $this->clients[$clientId]   = (object) array(
            'socket'        => $socket,
            'msg_buffer'    => '',
            'state'         => self::READY_STATE_CONNECTING,
            'time'          => time(),
            'ready_state'   => false,
            'close_status'  => 0,
            'client_ip'     => $clientIp,
            'header_length' => false,
            'buffer_length' => 0,
            'buffer'        => '',
            'msg_opcode'    => 0,
            'msg_data_len'  => 0
        );
        $this->ws[$clientId]        = $socket;
    }

    /**
     * @return int
     */
    private function _nextId(){
        $i = 1;
        while(isset($this->ws[$i]))
            $i++;
        return $i;
    }

    /**
     * @param $clientId
     */
    private function _removeClient($clientId){
        if(array_key_exists('onClose', $this->events)){
            foreach($this->events['onClose'] as $function => $args){
                if(is_array($args)){
                    call_user_func_array($function, $args);
                }else{
                    call_user_func($function, $args);
                }
            }
        }

        socket_close($this->clients[$clientId]->socket);
        unset($this->ws[$clientId], $this->clients[$clientId]);
        $this->clientsCount -= 1;
    }

    /**
     * @param $clientId
     * @return bool
     */
    private function _closeClient($clientId){
        if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){
            return true;
        }

        $this->clients[$clientId]->close_status = self::STATUS_PROTOCOL_ERROR;
        $this->sendClientMsg($clientId, self::OPCODE_CLOSE, pack('n', self::STATUS_PROTOCOL_ERROR));
        $this->clients[$clientId]->state = self::READY_STATE_CLOSING;
    }

    /**
     * @param $clientId
     * @param $opCode
     * @param $msg
     * @return bool
     */
    public function sendClientMsg($clientId, $opCode, $msg){
        if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){
            return true;
        }

        $msgLength  = strlen($msg);
        $buffSize   = 4096;

        $frameCount = ceil($msgLength / $buffSize);
        if($frameCount == 0){
            $frameCount = 1;
        }

        $maxFrame = $frameCount - 1;
        $lastFrameBuffLength = ($msgLength % $buffSize) != 0 ? $msgLength % $buffSize : ($msgLength != 0 ? $buffSize : 0);

        for($i=0; $i<$frameCount; $i++){
            $final      = $i != $maxFrame ? 0 : self::FIN;
            $opCode     = $i != 0 ? self::OPCODE_CONTINUATION : $opCode;
            $buffLength = $i != $maxFrame ? $buffSize : $lastFrameBuffLength;

            if($buffLength <= 125){
                $payloadLength      =   $buffLength;
                $payloadLengthExt   =   '';
                $payloadLengthExtL  =   0;
            }elseif($buffLength <= 65535){
                $payloadLength      =   self::PAYLOAD_LENGTH_16;
                $payloadLengthExt   =   pack('n', $buffLength);
                $payloadLengthExtL  =   2;
            }else{
                $payloadLength      =   self::PAYLOAD_LENGTH_63;
                $payloadLengthExt   =   pack('xxxxN', $buffLength);
                $payloadLengthExtL  =   8;
            }

            $buffer = pack('n', (($final | $opCode) << 8) | $payloadLength) . $payloadLengthExt . substr($msg, $i * $buffSize, $buffLength);
            $socket = $this->clients[$clientId]->socket;
            $left   = 2 + $payloadLengthExtL + $buffLength;

            do{
                $sent = @socket_send($socket, $buffer, $left, 0);
                if($sent === false){
                    return false;
                }
                $left -= $sent;
                if($sent > 0){
                    $buffer = substr($buffer, $sent);
                }
            }
            while($left > 0);
        }

        return true;
    }

    /**
     * @param $clientId
     * @param $buffer
     * @param $bLength
     * @return bool
     */
    private function _checkClient($clientId, &$buffer, $bLength){
        if($this->clients[$clientId]->state == self::READY_STATE_OPEN){
            $result = $this->_buildClientFrame($clientId, $buffer, $bLength);
        }elseif($this->clients[$clientId]->state == self::READY_STATE_CONNECTING){
            $result = $this->_makeHandShake($clientId, $buffer);
            if($result){
                $this->clients[$clientId]->state = self::READY_STATE_OPEN;
                if(array_key_exists('onOpen', $this->events)){
                    foreach($this->events['onOpen'] as $function => $args){
                        if(is_array($args)){
                            call_user_func_array($function, $args);
                        }else{
                            call_user_func($function, $args);
                        }
                    }
                }
            }
        }else{
            $result = false;
        }

        return $result;
    }

    /**
     * @param $clientId
     * @param $buffer
     * @param $bufferLength
     * @return bool
     */
    private function _buildClientFrame($clientId, &$buffer, $bufferLength){
        $this->clients[$clientId]->buffer_length += $bufferLength;
        $this->clients[$clientId]->buffer .= $buffer;

        if($this->clients[$clientId]->header_length !== false || $this->_checkSizeClientFrame($clientId) == true){
            $headerLength   = ($this->clients[$clientId]->header_length <= 125 ? 0 : ($this->clients[$clientId]->header_length <= 65535 ? 2 : 8)) + 6;
            $frameLength    = $this->clients[$clientId]->header_length + $headerLength;
            if($this->clients[$clientId]->buffer_length >= $frameLength){
                $nextFrameLength = $this->clients[$clientId]->buffer_length - $frameLength;
                if($nextFrameLength > 0){
                    $this->clients[$clientId]->buffer_length -= $nextFrameLength;
                    $nextFrameBytes         = substr($this->clients[$clientId]->buffer, $frameLength);
                    $this->clients[$clientId]->buffer = substr($this->clients[$clientId]->buffer, 0, $frameLength);
                }

                $result = $this->_processClientFrame($clientId);

                if(isset($this->clients[$clientId])){
                    $this->clients[$clientId]->header_length = false;
                    $this->clients[$clientId]->buffer_length = 0;
                    $this->clients[$clientId]->buffer = '';
                }

                if($nextFrameLength <= 0 || !$result){
                    return $result;
                }

                return $this->_buildClientFrame($clientId, $nextFrameBytes, $nextFrameLength);
            }
        }

        return true;
    }

    /**
     * @param $clientId
     * @return bool
     */
    private function _processClientFrame($clientId){
        $this->clients[$clientId]->time = time();

        $buffer = &$this->clients[$clientId]->buffer;
        if(substr($buffer, 5, 1) === false)
            return false;

        $a1 = ord(substr($buffer, 0, 1));
        $a2 = ord(substr($buffer, 1, 1));

        $f      = $a1 & self::FIN;
        $opCode = $a1 & 15;
        $mask   = $a2 & self::MASK;

        if(!$mask)
            return false;

        $seek       = $this->clients[$clientId]->header_length <= 125 ? 2 : ($this->clients[$clientId]->header_length <= 65535 ? 4 : 10);
        $maskKey    = substr($buffer, $seek, 4);

        $array = unpack('Na', $maskKey);
        $maskKey = $array['a'];
        $maskKey = array(
            $maskKey >> 24,
            ($maskKey >> 16) & 255,
            ($maskKey >> 8) & 255,
            $maskKey & 255
        );
        $seek += 4;

        if (substr($buffer, $seek, 1) !== false) {
            $data = str_split(substr($buffer, $seek));
            foreach($data as $key => $byte){
                $data[$key] = chr(ord($byte) ^ ($maskKey[$key % 4]));
            }
            $data = implode('', $data);
        }else{
            $data = '';
        }

        if ($opCode != self::OPCODE_CONTINUATION && $this->clients[$clientId]->msg_data_len > 0) {
            $this->clients[$clientId]->msg_data_len    = 0;
            $this->clients[$clientId]->msg_buffer     = '';
        }

        if($f == self::FIN){
            if($opCode != self::OPCODE_CONTINUATION){
                return
                    $this->_processClientMsg($clientId, $opCode, $data, $this->clients[$clientId]->header_length);
            }else{
                $this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length;
                $this->clients[$clientId]->msg_buffer  .= $data;

                $result = $this->_processClientMsg($clientId, $this->clients[$clientId]->msg_opcode, $this->clients[$clientId]->msg_buffer, $this->clients[$clientId]->msg_data_len);

                if(isset($this->clients[$clientId])){
                    $this->clients[$clientId]->msg_buffer     = '';
                    $this->clients[$clientId]->msg_opcode    = 0;
                    $this->clients[$clientId]->msg_data_len    = 0;
                }

                return $result;
            }
        }else{
            if($opCode & 8)
                return false;
            $this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length;
            $this->clients[$clientId]->msg_buffer  .= $data;

            if($opCode != self::OPCODE_CONTINUATION){
                $this->clients[$clientId]->msg_opcode = $opCode;
            }
        }
        return true;
    }

    /**
     * @param $clientId
     * @param $opCode
     * @param $data
     * @param $dataLength
     * @return bool
     */
    private function _processClientMsg($clientId, $opCode, &$data, $dataLength){
        if($opCode == self::OPCODE_PING){
            $this->sendClientMsg($clientId, self::OPCODE_PONG, $data);
        }elseif($opCode == self::OPCODE_PONG){
            if($this->clients[$clientId]->ready_state !== false){
                $this->clients[$clientId]->ready_state = false;
            }
        }elseif($opCode == self::OPCODE_CLOSE){
            if($this->clients[$clientId]->state == self::READY_STATE_CLOSING){
                $this->clients[$clientId]->state = self::READY_STATE_CLOSED;
            }else{
//                TODO: добавить типы закрытия
                $this->_closeClient($clientId);
            }
            $this->_removeClient($clientId);
        }elseif($opCode == self::OPCODE_TEXT || $opCode == self::OPCODE_BINARY){
            if(array_key_exists('onMsg', $this->events)){
                foreach($this->events['onMsg'] as $function => $args){
                    if(is_array($args)){
                        call_user_func_array($function, $args);
                    }else{
                        call_user_func($function, $args);
                    }
                }
            }
        }else{
            return false;
        }

        return true;
    }

    /**
     * @param $clientId
     * @return bool
     */
    private function _checkSizeClientFrame($clientId){
        if($this->clients[$clientId]->buffer_length > 1){
            $payloadLength = ord(substr($this->clients[$clientId]->buffer, 1, 1)) & 127;
            if ($payloadLength <= 125) {
                $this->clients[$clientId]->header_length = $payloadLength;
            }elseif($payloadLength == 126){
                if (substr($this->clients[$clientId]->buffer, 3, 1) !== false) {
                    $payloadLengthExtended  = substr($this->clients[$clientId]->buffer, 2, 2);
                    $array                  = unpack('na', $payloadLengthExtended);
                    $this->clients[$clientId]->header_length = $array['a'];
                }
            }else{
                if (substr($this->clients[$clientId]->buffer, 9, 1) !== false) {
                    $payloadLengthExtended      = substr($this->clients[$clientId]->buffer, 2, 8);
                    $payloadLengthExtended32_1  = substr($payloadLengthExtended, 0, 4);
                    $array                      = unpack('Na', $payloadLengthExtended32_1);

                    if ($array['a'] != 0 || ord(substr($payloadLengthExtended, 4, 1)) & 128) {
                        $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
                        return false;
                    }

                    $payloadLengthExtended32_2  = substr($payloadLengthExtended, 4, 4);
                    $array                      = unpack('Na', $payloadLengthExtended32_2);

                    if ($array['a'] > 2147479538) {
                        $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
                        return false;
                    }

                    $this->clients[$clientId]->header_length = $array['a'];
                }
            }

            if ($this->clients[$clientId]->header_length !== false) {
                if ($this->clients[$clientId]->header_length > self::MAX_FRAME_RECV) {
                    $this->clients[$clientId]->header_length = false;
                    $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
                    return false;
                }

                $controlFrame = (ord(substr($this->clients[$clientId]->buffer, 0, 1)) & 8) == 8;
                if (!$controlFrame) {
                    $newMessagePayloadLength = $this->clients[$clientId]->msg_data_len + $this->clients[$clientId]->header_length;
                    if ($newMessagePayloadLength > self::MAX_FRAME_RECV || $newMessagePayloadLength > 2147483647) {
                        $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
                        return false;
                    }
                }

                return true;
            }
        }
        return false;
    }

    /**
     * @param $clientId
     * @param $buffer
     * @return bool
     */
    private function _makeHandShake($clientId, $buffer){
        $sep = strpos($buffer, "rnrn");
        if (!$sep) return false;

        $headers = explode("rn", substr($buffer, 0, $sep));
        $headersCount = sizeof($headers);
        if ($headersCount < 1)
            return false;


        $request = &$headers[0];
        $requestParts = explode(' ', $request);
        $requestPartsSize = sizeof($requestParts);
        if ($requestPartsSize < 3)
            return false;

        if (strtoupper($requestParts[0]) != 'GET')
            return false;

        $httpPart = &$requestParts[$requestPartsSize - 1];
        $httpParts = explode('/', $httpPart);
        if (!isset($httpParts[1]) || (float) $httpParts[1] < 1.1)
            return false;

        $headersKeyed = array();
        for ($i=1; $i<$headersCount; $i++) {
            $parts = explode(':', $headers[$i]);
            if (!isset($parts[1]))
                return false;

            $headersKeyed[trim($parts[0])] = trim($parts[1]);
        }

        if (!isset($headersKeyed['Host']))
            return false;

        if (!isset($headersKeyed['Sec-WebSocket-Key']))
            return false;

        $key = $headersKeyed['Sec-WebSocket-Key'];
        if (strlen(base64_decode($key)) != 16)
            return false;

        if (!isset($headersKeyed['Sec-WebSocket-Version']) || (int) $headersKeyed['Sec-WebSocket-Version'] < 7)
            return false;

        $hash = base64_encode(sha1($key.'258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));

        $headers = array(
            'HTTP/1.1 101 Switching Protocols',
            'Upgrade: websocket',
            'Connection: Upgrade',
            'Sec-WebSocket-Accept: '.$hash
        );
        $headers    = implode("rn", $headers)."rnrn";
        $socket     = $this->clients[$clientId]->socket;

        $left = strlen($headers);
        do{
            $sent = @socket_send($socket, $headers, $left, 0);
            if($sent === false)
                return false;

            $left -= $sent;
            if($sent > 0)
                $headers = substr($headers, $sent);
        }
        while($left > 0);

        return true;
    }
} 

Чуть позже создам репозиторий на гитхабе и выложу туда. В дальнейшем планирую поддерживать и развивать.
Хотелось бы услышать от Вас уважаемых советы, пожелания или замечания.

Спасибо за внимание!

P.S в планах реализация интересных плюшек. Просто пока что не хватает времени на них.

Автор: byabuzyak

Источник

Поделиться

* - обязательные к заполнению поля