- PVSM.RU - https://www.pvsm.ru -

Делаем вебсокеты на PHP с нуля

Некоторое время назад я выбирал библиотеку для работы с вебсокетами. На просторах интернета я натыкался на статьи по интеграции node.js с yii, а почти все статьи о вебсокетах на хабре ограничивались инструкциями к тому, как использовать phpdaemon.

Я изучал библиотеки phpdaemon [1] и ratchet [2], они достаточно монструозны (причём используя ratchet для отправки сообщения конкретному пользователю рекомендовано дополнительно использовать wamp [3]). Мне не совсем было понятно для чего использовать таких монстров, которые требуют установку других монстров. Почитав исходники этих, а также других библиотек, я разобрался как всё устроено и мне захотелось написать простой вебсокет-сервер на php самостоятельно. Это помогло мне закрепить изученный материал и наткнуться на некоторые подводные камни, о которых я не имел представления.

Так я решил написать необходимый для меня функционал с нуля.

Получившийся код и ссылка на демонстрационный чат в конце статьи.

Поставленные цели:

1) разобраться с серверными сокетами в php
2) разобраться с протоколом вебсокетов
3) написать с нуля простой сервер вебсокетов

1) Серверные сокеты в php

До этого момента я имел смутные представления о серверных сокетах. Почитав исходники нескольких библиотек для работы с вебсокетами я столкнулся с двумя схемами их реализаций:

используя расширение php «socket»:

$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);//создаём сокет
socket_bind($socket, '127.0.0.1', 8000);//привязываем его к указанным ip и порту
socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);//разрешаем использовать один порт для нескольких соединений
socket_listen($socket);//слушаем сокет

или используя расширение php «stream»:

$socket = stream_socket_server("tcp://127.0.0.1:8000", $errno, $errstr);

Я предпочёл второй вариант ввиду его краткости.

Итак, мы создали серверный сокет и теперь хотим обрабатывать новые соединения к нему, для этого опять же есть два варианта

while ($connect = stream_socket_accept($socket, -1)) {//ожидаем новое соединение (без таймаута)
    ...обрабатываем $connect
}

Пример простого http сервера, который на все запросы отвечает: Привет

#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)n");
}

while ($connect = stream_socket_accept($socket, -1)) {
    fwrite($connect, "HTTP/1.1 200 OKrnContent-Type: text/htmlrnConnection: closernrnПривет");
    fclose($connect);
}

fclose($socket);

или с использованием stream_select

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read[] = $socket;
    $write = $except = null;
    
    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        $connect = stream_socket_accept($socket, -1);//принимаем новое соединение
        $connects[] = $connect;//добавляем его в список необходимых для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        ...обрабатываем $connect
        unset($connects[ array_search($connect, $connects) ]);
    }
}

Пример простого http сервера с использованием stream_select, который на все запросы отвечает: Привет

#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)n");
}

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read []= $socket;
    $write = $except = null;

    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        $connect = stream_socket_accept($socket, -1);//принимаем новое соединение
        $connects[] = $connect;//добавляем его в список необходимых для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        $headers = '';
        while ($buffer = rtrim(fgets($connect))) {
            $headers .= $buffer;
        }
        fwrite($connect, "HTTP/1.1 200 OKrnContent-Type: text/htmlrnConnection: closernrnПривет");
        fclose($connect);
        unset($connects[ array_search($connect, $connects) ]);
    }
}

fclose($server);

Т.к. нам в дальнейшем нужно будет одновременно обрабатывать и серверный сокет на предмет новых соединений, и уже существующие подключения, на предмет новых сообщений, то остановимся на втором варианте.

2) Протокол вебсокетов

В этой статье [4] хорошо описан протокол взаимодействия.
Нас интересует два момента:

«Рукопожатие» или handshake:

Считываем значение Sec-WebSocket-Key из пришедшего заголовка от клиента, рассчитываем на его основе Sec-WebSocket-Accept и отправляем итоговый ответ:

$SecWebSocketAccept = base64_encode(pack('H*', sha1($SecWebSocketKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$response = "HTTP/1.1 101 Web Socket Protocol Handshakern" .
    "Upgrade: websocketrn" .
    "Connection: Upgradern" .
    "Sec-WebSocket-Accept:$SecWebSocketAcceptrnrn";

Пример функции, которая это делает

function handshake($connect) {
    $info = array();

    $line = fgets($connect);
    $header = explode(' ', $line);
    $info['method'] = $header[0];
    $info['uri'] = $header[1];

    //считываем заголовки из соединения
    while ($line = rtrim(fgets($connect))) {
        if (preg_match('/A(S+): (.*)z/', $line, $matches)) {
            $info[$matches[1]] = $matches[2];
        } else {
            break;
        }
    }

    $address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес клиента
    $info['ip'] = $address[0];
    $info['port'] = $address[1];

    if (empty($info['Sec-WebSocket-Key'])) {
        return false;
    }

    //отправляем заголовок согласно протоколу вебсокета
    $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
    $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshakern" .
	"Upgrade: websocketrn" .
	"Connection: Upgradern" .
	"Sec-WebSocket-Accept:$SecWebSocketAcceptrnrn";
    fwrite($connect, $upgrade);

    return $info;
}

обмен сообщениями

После получения данных из вебсокета нам нужно их раскодировать, а при отправке закодировать.
Всё в той же статье [4] хорошо описано кодирование сообщений, но нам по-сути нужны только две функции: decode и encode.

Пример реализации функций decode и encode

function decode($data)
{
$unmaskedPayload = '';
$decodedData = array();

// estimate frame type:
$firstByteBinary = sprintf('%08b', ord($data[0]));
$secondByteBinary = sprintf('%08b', ord($data[1]));
$opcode = bindec(substr($firstByteBinary, 4, 4));
$isMasked = ($secondByteBinary[0] == '1')? true: false;
$payloadLength = ord($data[1]) & 127;

// unmasked frame is received:
if (!$isMasked) {
return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
}

switch ($opcode) {
// text frame:
case 1:
$decodedData['type'] = 'text';
break;

case 2:
$decodedData['type'] = 'binary';
break;

// connection close frame:
case 8:
$decodedData['type'] = 'close';
break;

// ping frame:
case 9:
$decodedData['type'] = 'ping';
break;

// pong frame:
case 10:
$decodedData['type'] = 'pong';
break;

default:
return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
}

if ($payloadLength === 126) {
$mask = substr($data, 4, 4);
$payloadOffset = 8;
$dataLength = bindec(sprintf('%08b', ord($data[2])). sprintf('%08b', ord($data[3]))) + $payloadOffset;
} elseif ($payloadLength === 127) {
$mask = substr($data, 10, 4);
$payloadOffset = 14;
$tmp = '';
for ($i = 0; $i < 8; $i++) {
$tmp .= sprintf('%08b', ord($data[$i + 2]));
}
$dataLength = bindec($tmp) + $payloadOffset;
unset($tmp);
} else {
$mask = substr($data, 2, 4);
$payloadOffset = 6;
$dataLength = $payloadLength + $payloadOffset;
}

/**
* We have to check for large frames here. socket_recv cuts at 1024 bytes
* so if websocket-frame is > 1024 bytes we have to wait until whole
* data is transferd.
*/
if (strlen($data) < $dataLength) {
return false;
}

if ($isMasked) {
for ($i = $payloadOffset; $i < $dataLength; $i++) {
$j = $i — $payloadOffset;
if (isset($data[$i])) {
$unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
}
}
$decodedData['payload'] = $unmaskedPayload;
} else {
$payloadOffset = $payloadOffset — 4;
$decodedData['payload'] = substr($data, $payloadOffset);
}

return $decodedData;
}

function encode($payload, $type = 'text', $masked = false)
{
$frameHead = array();
$payloadLength = strlen($payload);

switch ($type) {
case 'text':
// first byte indicates FIN, Text-Frame (10000001):
$frameHead[0] = 129;
break;

case 'close':
// first byte indicates FIN, Close Frame(10001000):
$frameHead[0] = 136;
break;

case 'ping':
// first byte indicates FIN, Ping frame (10001001):
$frameHead[0] = 137;
break;

case 'pong':
// first byte indicates FIN, Pong frame (10001010):
$frameHead[0] = 138;
break;
}

// set mask and payload length (using 1, 3 or 9 bytes)
if ($payloadLength > 65535) {
$payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
$frameHead[1] = ($masked === true)? 255: 127;
for ($i = 0; $i < 8; $i++) {
$frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
}
// most significant bit MUST be 0
if ($frameHead[2] > 127) {
return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
}
} elseif ($payloadLength > 125) {
$payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
$frameHead[1] = ($masked === true)? 254: 126;
$frameHead[2] = bindec($payloadLengthBin[0]);
$frameHead[3] = bindec($payloadLengthBin[1]);
} else {
$frameHead[1] = ($masked === true)? $payloadLength + 128: $payloadLength;
}

// convert frame-head to string:
foreach (array_keys($frameHead) as $i) {
$frameHead[$i] = chr($frameHead[$i]);
}
if ($masked === true) {
// generate a random mask:
$mask = array();
for ($i = 0; $i < 4; $i++) {
$mask[$i] = chr(rand(0, 255));
}

$frameHead = array_merge($frameHead, $mask);
}
$frame = implode('', $frameHead);

// append payload to frame:
for ($i = 0; $i < $payloadLength; $i++) {
$frame .= ($masked === true)? $payload[$i] ^ $mask[$i % 4]: $payload[$i];
}

return $frame;
}

Простой сервер вебсокетов

Итак, у нас есть вся необходимая информация.
Используя код простого http сервера из первой части, а также функции handshake, decode и encode из второй мы можем собрать простой сервер вебсокетов.

Пример реализации простого сервера вебсокетов

#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)n");
}

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read []= $socket;
    $write = $except = null;

    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        //принимаем новое соединение и производим рукопожатие:
        if (($connect = stream_socket_accept($socket, -1)) && $info = handshake($connect)) {
            $connects[] = $connect;//добавляем его в список необходимых для обработки
            onOpen($connect, $info);//вызываем пользовательский сценарий
        }
        $connects[] = $connect;//добавляем его в список необходимых для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        $data = fread($connect, 100000);

        if (!$data) { //соединение было закрыто
            fclose($connect);
            unset($connects[ array_search($connect, $connects) ]);
            onClose($connect);//вызываем пользовательский сценарий
            continue;
        }

        onMessage($connect, $data);//вызываем пользовательский сценарий
    }
}

fclose($server);

function handshake($connect) {
    $info = array();

    $line = fgets($connect);
    $header = explode(' ', $line);
    $info['method'] = $header[0];
    $info['uri'] = $header[1];

    //считываем заголовки из соединения
    while ($line = rtrim(fgets($connect))) {
        if (preg_match('/A(S+): (.*)z/', $line, $matches)) {
            $info[$matches[1]] = $matches[2];
        } else {
            break;
        }
    }

    $address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес клиента
    $info['ip'] = $address[0];
    $info['port'] = $address[1];

    if (empty($info['Sec-WebSocket-Key'])) {
        return false;
    }

    //отправляем заголовок согласно протоколу вебсокета
    $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
    $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshakern" .
        "Upgrade: websocketrn" .
        "Connection: Upgradern" .
        "Sec-WebSocket-Accept:$SecWebSocketAcceptrnrn";
    fwrite($connect, $upgrade);

    return $info;
}

function encode($payload, $type = 'text', $masked = false)
{
    $frameHead = array();
    $payloadLength = strlen($payload);

    switch ($type) {
        case 'text':
            // first byte indicates FIN, Text-Frame (10000001):
            $frameHead[0] = 129;
            break;

        case 'close':
            // first byte indicates FIN, Close Frame(10001000):
            $frameHead[0] = 136;
            break;

        case 'ping':
            // first byte indicates FIN, Ping frame (10001001):
            $frameHead[0] = 137;
            break;

        case 'pong':
            // first byte indicates FIN, Pong frame (10001010):
            $frameHead[0] = 138;
            break;
    }

    // set mask and payload length (using 1, 3 or 9 bytes)
    if ($payloadLength > 65535) {
        $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 255 : 127;
        for ($i = 0; $i < 8; $i++) {
            $frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
        }
        // most significant bit MUST be 0
        if ($frameHead[2] > 127) {
            return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
        }
    } elseif ($payloadLength > 125) {
        $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 254 : 126;
        $frameHead[2] = bindec($payloadLengthBin[0]);
        $frameHead[3] = bindec($payloadLengthBin[1]);
    } else {
        $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
    }

    // convert frame-head to string:
    foreach (array_keys($frameHead) as $i) {
        $frameHead[$i] = chr($frameHead[$i]);
    }
    if ($masked === true) {
        // generate a random mask:
        $mask = array();
        for ($i = 0; $i < 4; $i++) {
            $mask[$i] = chr(rand(0, 255));
        }

        $frameHead = array_merge($frameHead, $mask);
    }
    $frame = implode('', $frameHead);

    // append payload to frame:
    for ($i = 0; $i < $payloadLength; $i++) {
        $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
    }

    return $frame;
}

function decode($data)
{
    $unmaskedPayload = '';
    $decodedData = array();

    // estimate frame type:
    $firstByteBinary = sprintf('%08b', ord($data[0]));
    $secondByteBinary = sprintf('%08b', ord($data[1]));
    $opcode = bindec(substr($firstByteBinary, 4, 4));
    $isMasked = ($secondByteBinary[0] == '1') ? true : false;
    $payloadLength = ord($data[1]) & 127;

    // unmasked frame is received:
    if (!$isMasked) {
        return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
    }

    switch ($opcode) {
        // text frame:
        case 1:
            $decodedData['type'] = 'text';
            break;

        case 2:
            $decodedData['type'] = 'binary';
            break;

        // connection close frame:
        case 8:
            $decodedData['type'] = 'close';
            break;

        // ping frame:
        case 9:
            $decodedData['type'] = 'ping';
            break;

        // pong frame:
        case 10:
            $decodedData['type'] = 'pong';
            break;

        default:
            return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
    }

    if ($payloadLength === 126) {
        $mask = substr($data, 4, 4);
        $payloadOffset = 8;
        $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
    } elseif ($payloadLength === 127) {
        $mask = substr($data, 10, 4);
        $payloadOffset = 14;
        $tmp = '';
        for ($i = 0; $i < 8; $i++) {
            $tmp .= sprintf('%08b', ord($data[$i + 2]));
        }
        $dataLength = bindec($tmp) + $payloadOffset;
        unset($tmp);
    } else {
        $mask = substr($data, 2, 4);
        $payloadOffset = 6;
        $dataLength = $payloadLength + $payloadOffset;
    }

    /**
     * We have to check for large frames here. socket_recv cuts at 1024 bytes
     * so if websocket-frame is > 1024 bytes we have to wait until whole
     * data is transferd.
     */
    if (strlen($data) < $dataLength) {
        return false;
    }

    if ($isMasked) {
        for ($i = $payloadOffset; $i < $dataLength; $i++) {
            $j = $i - $payloadOffset;
            if (isset($data[$i])) {
                $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
            }
        }
        $decodedData['payload'] = $unmaskedPayload;
    } else {
        $payloadOffset = $payloadOffset - 4;
        $decodedData['payload'] = substr($data, $payloadOffset);
    }

    return $decodedData;
}

//пользовательские сценарии:

function onOpen($connect, $info) {
    echo "openn";
    fwrite($connect, encode('Привет'));
}

function onClose($connect) {
    echo "closen";
}

function onMessage($connect, $data) {
    echo decode($data)['payload'] . "n";
}

В приведённом примере можно менять пользовательские сценарии onOpen, onClose и onMessage для реализации необходимого функционала.

Поставленные цели достигнуты.
Если этот материал вам покажется интересным, то в следующей статье я опишу как можно запускать несколько процессов для обработки соединений (один мастер и несколько воркеров), межпроцессное взаимодействие, интеграцию с вашим фреймворком на примере компонента yii.

демонстрационный чат с вышеописанными функциями [5]

Код демонстрационного чата

#!/usr/bin/env php
<?php

class WebsocketServer
{
    public function __construct($config) {
        $this->config = $config;
    }

    public function start() {
        //открываем серверный сокет
        $server = stream_socket_server("tcp://{$this->config['host']}:{$this->config['port']}", $errorNumber, $errorString);

        if (!$server) {
            die("error: stream_socket_server: $errorString ($errorNumber)rn");
        }

        list($pid, $master, $workers) = $this->spawnWorkers();//создаём дочерние процессы

        if ($pid) {//мастер
            fclose($server);//мастер не будет обрабатывать входящие соединения на основном сокете
            $WebsocketMaster = new WebsocketMaster($workers);//мастер будет пересылать сообщения между воркерами
            $WebsocketMaster->start();
        } else {//воркер
            $WebsocketHandler = new WebsocketHandler($server, $master);
            $WebsocketHandler->start();
        }
    }

    protected function spawnWorkers() {
        $master = null;
        $workers = array();
        $i = 0;
        while ($i < $this->config['workers']) {
            $i++;
            //создаём парные сокеты, через них будут связываться мастер и воркер
            $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

            $pid = pcntl_fork();//создаём форк
            if ($pid == -1) {
                die("error: pcntl_forkrn");
            } elseif ($pid) { //мастер
                fclose($pair[0]);
                $workers[$pid] = $pair[1];//один из пары будет в мастере
            } else { //воркер
                fclose($pair[1]);
                $master = $pair[0];//второй в воркере
                break;
            }
        }

        return array($pid, $master, $workers);
    }
}

class WebsocketMaster
{
    protected $workers = array();
    protected $clients = array();

    public function __construct($workers) {
        $this->clients = $this->workers = $workers;
    }

    public function start() {
        while (true) {
            //подготавливаем массив всех сокетов, которые нужно обработать
            $read = $this->clients;
            //$read[] = $service;

            stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать

            if ($read) {//пришли данные от подключенных клиентов
                foreach ($read as $client) {
                    $data = fread($client, 100000);

                    if (!$data) { //соединение было закрыто
                        unset($this->clients[intval($client)]);
                        @fclose($client);
                        continue;
                    }

                    foreach ($this->workers as $worker) {//пересылаем данные во все воркеры
                        if ($worker !== $client) {
                            fwrite($worker, $data);
                        }
                    }
                }
            }
        }
    }
}

abstract class WebsocketWorker
{
    protected $clients = array();
    protected $server;
    protected $master;
    protected $pid;

    public function __construct($server, $master) {
        $this->server = $server;
        $this->master = $master;
        $this->pid = posix_getpid();
    }

    public function start() {
        while (true) {
            //подготавливаем массив всех сокетов, которые нужно обработать
            $read = $this->clients;
            $read[] = $this->server;
            $read[] = $this->master;

            stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать

            if (in_array($this->server, $read)) { //на серверный сокет пришёл запрос от нового клиента
                //подключаемся к нему и делаем рукопожатие, согласно протоколу вебсокета
                if (($client = stream_socket_accept($this->server, -1)) && $info = $this->handshake($client)) {
                    $this->clients[intval($client)] = $client;
                    $this->onOpen($client, $info);//вызываем пользовательский сценарий
                }

                //удаляем сервеный сокет из массива, чтобы не обработать его в этом цикле ещё раз
                unset($read[array_search($this->server, $read)]);
            }

            if (in_array($this->master, $read)) { //пришли данные от мастера
                $data = fread($this->master, 100000);

                $this->onSend($data);//вызываем пользовательский сценарий

                //удаляем мастера из массива, чтобы не обработать его в этом цикле ещё раз
                unset($read[array_search($this->master, $read)]);
            }

            if ($read) {//пришли данные от подключенных клиентов
                foreach ($read as $client) {
                    $data = fread($client, 100000);

                    if (!$data) { //соединение было закрыто
                        unset($this->clients[intval($client)]);
                        @fclose($client);
                        $this->onClose($client);//вызываем пользовательский сценарий
                        continue;
                    }

                    $this->onMessage($client, $data);//вызываем пользовательский сценарий
                }
            }

        }
    }

    protected function handshake($client) {
        $info = array();

        $line = fgets($client);

        if (!$line) {
            return false;
        }

        $header = explode(' ', $line);
        $info['method'] = $header[0];
        $info['uri'] = $header[1];

        //считываем загаловки из соединения
        while ($line = rtrim(fgets($client))) {
            if (preg_match('/A(S+): (.*)z/', $line, $matches)) {
                $info[$matches[1]] = $matches[2];
            } else {
                break;
            }
        }

        $address = explode(':', stream_socket_get_name($client, true)); //получаем адрес клиента
        $info['ip'] = $address[0];
        $info['port'] = $address[1];

        if (empty($info['Sec-WebSocket-Key'])) {
            return false;
        }

        //отправляем заголовок согласно протоколу вебсокета
        $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
        $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshakern" .
            "Upgrade: websocketrn" .
            "Connection: Upgradern" .
            "Sec-WebSocket-Accept:$SecWebSocketAcceptrnrn";
        fwrite($client, $upgrade);

        return $info;
    }

    protected function encode($payload, $type = 'text', $masked = false)
    {
        $frameHead = array();
        $payloadLength = strlen($payload);

        switch ($type) {
            case 'text':
                // first byte indicates FIN, Text-Frame (10000001):
                $frameHead[0] = 129;
                break;

            case 'close':
                // first byte indicates FIN, Close Frame(10001000):
                $frameHead[0] = 136;
                break;

            case 'ping':
                // first byte indicates FIN, Ping frame (10001001):
                $frameHead[0] = 137;
                break;

            case 'pong':
                // first byte indicates FIN, Pong frame (10001010):
                $frameHead[0] = 138;
                break;
        }

        // set mask and payload length (using 1, 3 or 9 bytes)
        if ($payloadLength > 65535) {
            $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 255 : 127;
            for ($i = 0; $i < 8; $i++) {
                $frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
            }
            // most significant bit MUST be 0
            if ($frameHead[2] > 127) {
                return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
            }
        } elseif ($payloadLength > 125) {
            $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 254 : 126;
            $frameHead[2] = bindec($payloadLengthBin[0]);
            $frameHead[3] = bindec($payloadLengthBin[1]);
        } else {
            $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
        }

        // convert frame-head to string:
        foreach (array_keys($frameHead) as $i) {
            $frameHead[$i] = chr($frameHead[$i]);
        }
        if ($masked === true) {
            // generate a random mask:
            $mask = array();
            for ($i = 0; $i < 4; $i++) {
                $mask[$i] = chr(rand(0, 255));
            }

            $frameHead = array_merge($frameHead, $mask);
        }
        $frame = implode('', $frameHead);

        // append payload to frame:
        for ($i = 0; $i < $payloadLength; $i++) {
            $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
        }

        return $frame;
    }

    protected function decode($data)
    {
        $unmaskedPayload = '';
        $decodedData = array();

        // estimate frame type:
        $firstByteBinary = sprintf('%08b', ord($data[0]));
        $secondByteBinary = sprintf('%08b', ord($data[1]));
        $opcode = bindec(substr($firstByteBinary, 4, 4));
        $isMasked = ($secondByteBinary[0] == '1') ? true : false;
        $payloadLength = ord($data[1]) & 127;

        // unmasked frame is received:
        if (!$isMasked) {
            return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
        }

        switch ($opcode) {
            // text frame:
            case 1:
                $decodedData['type'] = 'text';
                break;

            case 2:
                $decodedData['type'] = 'binary';
                break;

            // connection close frame:
            case 8:
                $decodedData['type'] = 'close';
                break;

            // ping frame:
            case 9:
                $decodedData['type'] = 'ping';
                break;

            // pong frame:
            case 10:
                $decodedData['type'] = 'pong';
                break;

            default:
                return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
        }

        if ($payloadLength === 126) {
            $mask = substr($data, 4, 4);
            $payloadOffset = 8;
            $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
        } elseif ($payloadLength === 127) {
            $mask = substr($data, 10, 4);
            $payloadOffset = 14;
            $tmp = '';
            for ($i = 0; $i < 8; $i++) {
                $tmp .= sprintf('%08b', ord($data[$i + 2]));
            }
            $dataLength = bindec($tmp) + $payloadOffset;
            unset($tmp);
        } else {
            $mask = substr($data, 2, 4);
            $payloadOffset = 6;
            $dataLength = $payloadLength + $payloadOffset;
        }

        /**
         * We have to check for large frames here. socket_recv cuts at 1024 bytes
         * so if websocket-frame is > 1024 bytes we have to wait until whole
         * data is transferd.
         */
        if (strlen($data) < $dataLength) {
            return false;
        }

        if ($isMasked) {
            for ($i = $payloadOffset; $i < $dataLength; $i++) {
                $j = $i - $payloadOffset;
                if (isset($data[$i])) {
                    $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
                }
            }
            $decodedData['payload'] = $unmaskedPayload;
        } else {
            $payloadOffset = $payloadOffset - 4;
            $decodedData['payload'] = substr($data, $payloadOffset);
        }

        return $decodedData;
    }

    abstract protected function onOpen($client, $info);

    abstract protected function onClose($client);

    abstract protected function onMessage($client, $data);

    abstract protected function onSend($data);

    abstract protected function send($data);
}

//пример реализации чата
class WebsocketHandler extends WebsocketWorker
{
    protected function onOpen($client, $info) {//вызывается при соединении с новым клиентом

    }

    protected function onClose($client) {//вызывается при закрытии соединения клиентом

    }

    protected function onMessage($client, $data) {//вызывается при получении сообщения от клиента
        $data = $this->decode($data);
        //var_export($data);
        //шлем всем сообщение, о том, что пишет один из клиентов
        $message = 'пользователь #' . intval($client) . ' (' . $this->pid . '): ' . $data['payload'];
        $this->send($message);

        $this->sendHelper($message);
    }

    protected function onSend($data) {//вызывается при получении сообщения от мастера
        $this->sendHelper($data);
    }

    protected function send($message) {//отправляем сообщение на мастер, чтобы он разослал его на все воркеры
        @fwrite($this->master, $message);
    }

    private function sendHelper($data) {
        $data = $this->encode($data);
        foreach ($this->clients as $client) {
            @fwrite($client, $data);
        }
    }
}

$config = array(
    'host' => '0.0.0.0',
    'port' => 8000,
    'workers' => 16,
);

$WebsocketServer = new WebsocketServer($config);
$WebsocketServer->start();

Автор: morozovsk

Источник [6]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/php-2/53299

Ссылки в тексте:

[1] phpdaemon: http://daemon.io/

[2] ratchet: http://socketo.me/

[3] wamp: http://wamp.ws/

[4] этой статье: http://habrahabr.ru/post/179585/

[5] демонстрационный чат с вышеописанными функциями: http://sharoid.ru/chat.html

[6] Источник: http://habrahabr.ru/post/209864/