Простой сервер задач с очередью в MySQL (без проблем с блокировками)

в 16:47, , рубрики: mysql, php, метки: ,

Простой сервер задач с очередью в MySQL (без проблем с блокировками) Почти в каждом более менее динамическом проекте бывает возникает необходимость выполнять очереди задач в фоне (отправка email, обновления кеша, реиндексация поиска и т.д.). Job сервера (Gearman и т.п.) хороши, но для большинства простых задач они избыточны. Классическая реализация очередей в MySQL (при помощи SELECT … LOCK FOR UPDATE) при росте нагрузки со временем начинает приводить к проблемам с блокировкой. Потому, как это обычно бывает, пришлось написать свой «велосипед» для работы с фоновыми задачами, который бы «точно работал» и был предельно прост.

Основа: Cron, PHP 5.3 (mysqli), MySQL > 5.1 — легко «влепить» почти на любой хостинг.
Операция получения (захвата) задачи — атомарна (один UPDATE запрос). Никаких проблем с блокировкой и RC.
Возможность распределения воркерам задач по группам и приоритетам, передача массива данных в исполняемый метод (функцию).
Три режима обработки завершенных задач: переместить запись в отдельную таблицу, удалить запись, оставить запись и отметить как успешно обработанная.
Обработка незавершенных задач или задач, обработанных с ошибкой — на совести разработчика.
На всё про всё 400 строк кода (с полными PHPDOC).
Ограничения: текущая реализация не подходит для persistent соединений, но если кому-то потребуется, несложно допилить. Даже при желании переписать на другой язык :)

Возможность неблокирующей работы с очередью реализована через использование пользовательских переменных в UPDATE запросе с их последующей выборкой. Посвящать этому приему целую статью — глупо. Гораздо приятнее конечная реализация, которую можно применить в дело (Мы же с вами практики, не так ли?). Во всём остальном исключительно классическая очередь с группами и приоритетами.

Пример использования (клиент):

$task_server = DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue');
$task_server->addTask('mywork', $data);

mywork — функция, которая должна быть доступна воркеру. В нее будет передан массив $data. Также возможно указывать вызов статических методов класса.

$task_server->addTask('MyWork::doWork', $data);

Пример воркера:

DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue') // Создаем сервер.
		->setByCLIAgruments($argv) // Устанавливаем параметры вызова из консоли.
		->setMode(DBTaskServer::MODE_MARK_AS_COMPLETED) // Выбираем режим обработки.
		->run(); // Запускам воркера.

Запуск воркера из консоли с параметрами:

/path/to/script/worker.php [max_tasks_per_lifecycle] [comma_separated_group_ids]

Как понятно из названия, первая опция говорит о том сколько максимум задач может выполнить воркер прежде чем завершит работу (если конечно таковые для него будут доступны), вторая опция — это значения group_id заданий, которые данный воркер должен обрабатывать. Если группы не указаны, то воркер обрабатывает любые группы.

Например:

/path/to/script/worker.php 100 3,5,6

Выполнить 100 заданий из групп 3, 5 и 6.
Если заданий не будет найдено, то воркер сразу завершит свою работу.

Добавляем воркера в крон:

0-59/5 * * * * /path/to/script/worker.php 5 3 >/dev/null 2>&1

Каждые 5 минут обрабатывать по 5 заданий с group_id=3.

В архиве примеры клиента, воркера, сам класс сервера (задокументирован), sql файл с таблицей задач.
Качать тут (аж целых 5kB).

Приятного вам кода.

Автор: evgenyl

  1. Евгений:

    Здравствуйте! Заинтересовала ваша разработка, не могли бы вы пожалуйста выложить файл с архивом заново. Яндекс удалил уже за истечением срока давности.
    Заранее спасибо!

* - обязательные к заполнению поля