Реализация многопоточного сервера на PHP

в 8:10, , рубрики: php, php socket server, метки:

Данная публикация не претендует на полноту решения поставленного вопроса. Сервер разрабатывается исключительно в ознакомительных целях. Многие важные вопросы, такие как, например, обработка ошибок сокетов, опущены. Для реализации многопоточного сервера мы будем использовать, конечно же, потоки. Очень часто приходится видеть фразу, что, мол, в PHP потоков нет. Так вот это неправда. Потоки есть, но реализованы в отдельном расширении pthreads.

Для начала нам понадобится сборка PHP, скомпилированная с флагом thread safety. Я использую Windows для работы, поэтому скачал готовый пакет здесь. Нужно лишь правильно выбрать разрядность ОС, нужную версию PHP и, конечно же, Thread Safe версию. На протяжении статьи будет предполагаться, что архив с PHP мы распаковали в C:php директорию. Далее нам нужно установить расширение pthreads. Идем сюда и выбираем версию, соответствующую скачанной версии PHP и разрядности системы. Из архива копируем файл php_pthreads.dll в директорию C:phpext и файл pthreadVC2.dll в директории C:php и C:WindowsSystem32. В директории C:php переименовываем файл php.ini-development в php.ini и добавляем в него такую строку:

extension=php_pthreads.dll

Также находим и раскоменчиваем директиву extension_dir и выставляем ей значение «C:phpext» (у меня в версии PHP7 относительные пути не заработали). Открываем командную строку и проверяем:

C:phpphp.exe -v

В конце первой строки вывода мы должны увидеть пометку (ZTS). Переходим непосредственно к реализации сервера. Создаем файл (в моём случае он будет располагаться по адресу C:server.php. Для начала создадим сокет, который будет слушать порт 8080 на нашей локальной машине.

$server = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($server, '127.0.0.1', 8080);
socket_listen($server);

Далее создаём пул воркеров.

$pool = new Pool(10, Worker::class);

Первый аргумент устанавливает максимальное количество действующих потоков, второй имя класса воркера. Для каких либо более определенных задач можно описать свой класс, унаследовав его от класса Worker. Мы же будем использовать оригинальный класс. Забегая вперед скажу, что в классе потока установленный воркер можно получить через $this->worker.

Далее реализуем класс, который будет выполняться в отдельном потоке. Класс должен наследоваться от Threaded.

class Task extends Threaded
{
    protected $socket;

    public function __construct($socket)
    {
        $this->socket = $socket;
    }

    public function run()
    {
        if (!empty($this->socket)) {
            $response = "HTTP/1.1 200 OKrnContent-Type: text/htmlrnContent-Length: 12rnrnHello world!";
            socket_write($this->socket, $response, strlen($response));
            // при попытке закрытия сокета я получаю ошибку zend_mm_heap currupted, поэтому эту часть в тестовом решении опускаю 
            //socket_close($this->socket);
        }
    }
}

Наш класс принимает в конструкторе сокет соединения с клиентом. Так же действия, выполняемые в потоке, должны быть описаны в методе run(). В моем случае это ответ клиенту базовых заголовков и текста «Hello world!».

Далее мы будем циклично пытаться принимать соединения от клиента, и, в случае успеха, создавать отдельный поток и передавать туда дескриптор сокета.


$servers = [$server];

while (true) {
    $read = $servers;

    if (socket_select($read, $write, $except, 0) >= 0 && in_array($server, $read)) {
        $task = new Task(socket_accept($server));
        $pool->submit($task);
    }
}

Поскольку мы используем бесконечный цикл, я зарегистрирую функцию, которая выполнится при завершении работы скрипта и остановит работу пула. Функцию следует регистрировать до начала цикла.


register_shutdown_function(function () use ($server, $pool) {
    if (!empty($server)) {
        socket_close($server);
    }

    $pool->shutdown();
});

Собственно всё. Запускаем сервер в командной строке и пробуем открыть в браузере localhost:8080.

cd C:
C:phpphp.exe server.php

Ниже привожу полный код сервера.


<?php

$server = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($server, '127.0.0.1', 8080);
socket_listen($server);

$pool = new Pool(10, Worker::class);

class Task extends Threaded
{
    protected $socket;

    public function __construct($socket)
    {
        $this->socket = $socket;
    }

    public function run()
    {
        if (!empty($this->socket)) {
            $response = "HTTP/1.1 200 OKrnContent-Type: text/htmlrnContent-Length: 12rnrnHello world!";
            socket_write($this->socket, $response, strlen($response));
        }
    }
}

register_shutdown_function(function () use ($server, $pool) {
    if (!empty($server)) {
        socket_close($server);
    }

    $pool->shutdown();
});

$servers = [$server];

while (true) {
    $read = $servers;

    if (socket_select($read, $write, $except, 0) >= 0 && in_array($server, $read)) {
        $task = new Task(socket_accept($server));
        $pool->submit($task);
    }
}

Спасибо за внимание!

Автор: beldeveloper

Источник

Поделиться новостью

* - обязательные к заполнению поля