Написание многопоточных программ на PHP

в 21:23, , рубрики: fork, php, высокая производительность, параллельное программирование, метки: , ,

Параллельные программы на PHP

В PHP есть ровно один «нормальный» способ писать приложения, которые используют несколько ядер/процессоров — это fork(). О прикладном использовании системного вызова fork() в языке PHP и расширения pcntl я и расскажу. В качестве примера мы напишем достаточно быструю параллельную реализацию grep (со скоростью работы, аналогичной find . -type f -print0 | xargs -0 -P $NUM_PROCS grep $EXPR).

Реализация

Реализация этого системного вызова в PHP очень проста:

PHP_FUNCTION(pcntl_fork) {
	pid_t id;
	id = fork();
	if (id == -1) {
		PCNTL_G(last_error) = errno;
		php_error_docref(NULL TSRMLS_CC, E_WARNING, "Error %d", errno);
	}
	RETURN_LONG((long) id);
}

Что такое системный вызов fork()

Системный вызов fork() в *nix-системах представляет из себя такой системный вызов, который делает полную копию текущего процесса. Системный вызов fork() возвращает своё значение два раза: родитель получает PID потомка, а потомок получает 0. Как ни странно, во многих случаях только этого достаточно для того, чтобы писать приложения, использующие несколько CPU.

$ php -r '$pid = pcntl_fork(); echo posix_getpid() . ": Fork returned $pidn";'
9545: Fork returned 9546
9546: Fork returned 0

Подводные камни при использовании fork()

На самом деле fork() делает свою работу не задумываясь о том, что находится у пользовательского процесса в памяти — он копирует всё, например функции, которые зарегистрированы через atexit (register_shutdown_function). Пример:

$ php -r 'register_shutdown_function(function() { echo "Exited!n"; }); pcntl_fork();'
Exited!
Exited!

К сожалению, PHP в конце выполнения скрипта осуществляет вызов деструкторов (в том числе и внутренних деструкторов ресурсов соединений с базой данных). Пример для расширения mysqli:

<?php
/* test.php */
$conn = new mysqli(..., "mysql") or die("Cannot connectn");
$pid = pcntl_fork();
if ($pid > 0) {
    echo "Parent exitingn";
    exit(0);
}
echo "Sending queryn";
$res = $conn->query("SHOW TABLES") or die("Cannot get query resultn");
print_r($res->fetch_all());

/*
$ php test.php
Parent exiting
Sending query
Warning: mysqli::query(): MySQL server has gone away in test.php on line 9
Warning: mysqli::query(): Error reading result set's header in test.php on line 9
Cannot get query result
*/

Вывод программы будет не обязательно таким, как написано. Иногда потомок «успевает» до исполнения процедуры закрытия соединения в родителе и всё работает, как надо.

Боремся с отложенным исполнением функций / деструкторов

На самом деле, проблему с отложенным исполнением можно решить, если вы точно знаете, что хотите. Например, в Си есть функция _exit(), которая выходит, не запуская никаких установленных обработчиков. К сожалению, в PHP такой функции нет, но её поведение можно частично эмулировать с использованием сигналов:

function _exit() {
    posix_kill(posix_getpid(), SIGTERM);
}

Этого «хака» нам будет достаточно, чтобы соединение с базой оставалось активным для двух PHP-процессов одновременно, хотя лучше, конечно, так на практике не делать :):

<?php
/* test.php */
$conn = new mysqli(..., "mysql") or die("Cannot connectn");
function _exit() {
    posix_kill(posix_getpid(), SIGTERM);
}
function show_tables() {
    global $conn;
    echo "Sending queryn";
    $res = $conn->query("SHOW TABLES") or die("Cannot get query resultn");
    echo "Tables count: " . $res->num_rows . "n";
}
$pid = pcntl_fork();

if ($pid > 0) {
    show_tables();
    _exit();
}

sleep(1);
show_tables();
/*
$ php test.php
Sending query
Tables count: 24
Terminated: 15     <--- это вставляет командный интерпретатор
$ Sending query
Tables count: 24
*/

Пишем grep

Давайте теперь, для примера, напишем простенькую версию grep, которая будет искать по маске в текущей директории.

<?php
/* Пример использования:
$ php grep.php argv
./grep.php:$pattern = "/$argv[1]/m";
*/
exec("find . -type f", $files, $retval); // получаем список всех файлов в директории
$pattern = "/$argv[1]/m";
foreach($files as $file) {
    $fp = fopen($file, "rb");
    // файл с очень большой вероятностью является двоичным, если в нём встречаются нулевые байты
    $is_binary = strpos(fread($fp, 1024), "") !== false;
    fseek($fp, 0);
    if ($is_binary) {
        if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matchesn";
    } else {
        while (false !== ($ln = fgets($fp))) if (preg_match($pattern, $ln)) echo "$file:$ln";
    }
    fclose($fp);
}

Пишем параллельную версию grep

Теперь подумаем, как же можно ускорить данную программу, распараллелив её. Можно легко заметить, что мы можем разделить массив $files (список файлов) на несколько частей и обработать эти части независимо. Причём мы можем так делать во всех случаях, когда у нас есть какой-то большой список задач: просто берем каждый N-ный в соответствующем процессе и обрабатываем его. Поэтому, напишем более-менее общую функцию для этого:

define('PROCESSES_NUM', 2); // задаем количество потоков для обработки
function parallelForeach($arr, $func)
{
    for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) {
        $pid = pcntl_fork();
        if ($pid == 0) break;
    }

    if ($pid) {
        for ($i = 0; $i < PROCESSES_NUM; $i++) pcntl_wait($status);
        return;
    }

    // обходим каждый PROCESSES_NUM элемент массива и обрабатываем его
    $l = count($arr);
    for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]);
    exit(0);
}

Осталось заменить foreach() на использование нашей функции parallelForeach и добавить обработку ошибок:

Полный исходный текст

<?php
/* parallel-grep.php */
define('PROCESSES_NUM', 2);

if ($argc != 2) {
    fwrite(STDERR, "Usage: $argv[0] <pattern>n");
    exit(1);
}

grep($argv[1]);

function grep($pattern)
{
    exec("find . -type f", $files, $retval);
    if ($retval) exit($retval);

    $pattern = "/$pattern/m";
    if (false === preg_match($pattern, '123')) {
        fwrite(STDERR, "Incorrect regular expressionn");
        exit(1);
    }

    parallelForeach($files, function($f) use ($pattern) { grepFile($pattern, $f); });
    exit(0);
}

function grepFile($pattern, $file)
{
    $fp = fopen($file, "rb");
    if (!$fp) {
        fwrite(STDERR, "Cannot read $filen");
        return;
    }

    $binary = strpos(fread($fp, 1024), "") !== false;
    fseek($fp, 0);
    if ($binary) {
        if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matchesn";
    } else {
        while (false !== ($ln = fgets($fp))) {
            if (preg_match($pattern, $ln)) echo "$file:$ln";
        }
    }

    fclose($fp);
}

function parallelForeach($arr, $func)
{
    for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) {
        $pid = pcntl_fork();
        if ($pid < 0) {
            fwrite(STDERR, "Cannot forkn");
            exit(1);
        }
        if ($pid == 0) break;
    }

    if ($pid) {
        for ($i = 0; $i < PROCESSES_NUM; $i++) {
            pcntl_wait($status);
            $exitcode = pcntl_wexitstatus($status);
            if ($exitcode) exit(1);
        }
        return;
    }

    $l = count($arr);
    for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]);
    exit(0);
}

Проверим работу нашего грепа на исходном коде PHP 5.3.10:

$ php ~/parallel-grep.php '^PHP_FUNCTION' | head
./ext/calendar/calendar.c:PHP_FUNCTION(cal_info)
./ext/calendar/calendar.c:PHP_FUNCTION(cal_days_in_month)
./ext/calendar/calendar.c:PHP_FUNCTION(cal_to_jd)
./ext/calendar/calendar.c:PHP_FUNCTION(cal_from_jd)
./ext/calendar/calendar.c:PHP_FUNCTION(jdtogregorian)
./ext/calendar/calendar.c:PHP_FUNCTION(gregoriantojd)
./ext/calendar/calendar.c:PHP_FUNCTION(jdtojulian)
./ext/calendar/calendar.c:PHP_FUNCTION(juliantojd)
./ext/calendar/calendar.c:PHP_FUNCTION(jdtojewish)
./ext/calendar/calendar.c:PHP_FUNCTION(jewishtojd)

$ time php ~/parallel-grep.php '^PHP_FUNCTION' | wc -l
    4056

real	0m2.073s
user	0m3.265s
sys	0m0.550s

$ time grep -R '^PHP_FUNCTION' . | wc -l
    4056

real	0m3.646s
user	0m3.415s
sys	0m0.209s

$ time find . -type f -print0 | xargs -0 -P 2 grep '^PHP_FUNCTION' | wc -l
    4056

real	0m1.895s
user	0m3.247s
sys	0m0.249s

Работает! Я описал один из часто используемых паттернов при параллельном программировании на PHP — параллельная обработка очереди из задач. Надеюсь, моя статья кому-нибудь поможет перестать бояться писать многопоточные приложения на PHP, если задача допускает такую декомпозицию, как в примере с грепом. Спасибо.

Автор: youROCK

* - обязательные к заполнению поля