- PVSM.RU - https://www.pvsm.ru -

Очередь задач в PostgreSQL

Очередь слонов - pixabay.com

Для организации обработки потока задач используются очереди. Они нужны для накопления и распределения задач по исполнителям. Также очереди могут обеспечивать дополнительные требования к обработке задач: гарантия доставки, гарантия однократного исполнения, приоритезация и т. д.

Как правило, используются готовые системы очередей сообщений (MQ — message queue), но иногда нужно организовать ad hoc очередь или какую-нибудь специализированную (например, очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач). О создании таких очередей и пойдёт речь ниже.

Ограничения применимости

Предлагаемые решения предназначены для обработки потока однотипных задач. Они не подходят для организации pub/sub или обмена сообщениями между слабо связанными системами и компонентами.

Очередь поверх реляционной БД хорошо работает при малых и средних нагрузках (сотни тысяч задач в сутки, десятки-сотни исполнителей), но для больших потоков лучше использовать специализированное решение.

Суть метода в пяти словах

select ... for update skip locked

Базовая очередь

Для простоты здесь и далее в таблице будут храниться только уникальные идентификаторы задач. Добавление какой-нибудь полезной нагрузки не должно составить труда.

Таблица для простейшей очереди содержит саму задачу и её статус:

create table task
(
    id          bigint not null primary key,
    status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена
);

create index task__status__idx on task (status);

Добавление задачи:

insert into task (id) values ($1) on conflict (id) do nothing;

Получение следующей задачи:

with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи:

update task
set
    status = 2
where id = $1;

Очередь с приоритетами

В простом случае id задачи является её приоритетом. Меняется только запрос на получение следующей задачи — добавляется условие сортировки order by id с требуемым порядком обработки задач. Также нужно создать составной индекс по (status, id).

Либо для приоритета добавляется отдельный столбец:

create table task
(
    id          bigint not null primary key,
    priority    integer not null,
    status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена
);

create index task__status__priority__idx on task (status, priority);

Добавление задачи:

insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;

Получение следующей задачи:

with next_task as (
    select id from task
    where status = 0
    order by priority
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Выделенный столбец позволяет менять приоритет задачи "на лету".

Очередь с повтором "упавших" задач

В процессе выполнения задачи может произойти ошибка или исключительная ситуация. В таких случаях задачу нужно поставить в очередь повторно. Иногда ещё требуется отложить и время её повторного исполнения на некоторое время, например, если исключение связано с временной недоступностью стороннего сервиса.

create table task
(
    id          bigint not null primary key,
    status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    attempt     integer not null default 0,
    delayed_to  timestamp null,
    error_text  text null
);

create index task__status__delayed_to__idx on task (status, delayed_to);

Как видно, расширился список статусов и добавились новые столбцы:

  • attempt — номер попытки; нужен для принятия решения о необходимости повтора (ограничение количества попыток) и для выбора задержки перед повтором (например, каждая следующая попытка откладывается на 10 * attempt минут);
  • delayed_to — время следующей попытки выполнения задачи;
  • error_text — текст ошибки.

Текст ошибки нужен для группировки по типам ошибки.

Пример. Система мониторинга сообщает, что в очереди скопились тысячи задач со статусом "ошибка". Выполняем запрос:

select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;

За подробностями идём в логи исполнителей. Исправляем ситуацию, вызвавшую ошибки (если это возможно). При необходимости ускоряем перезапуск задач установкой статуса в 0 или сдвигом времени следующей попытки.

Получение следующей новой задачи:

with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    delayed_to = null,
    error_text = null
from next_task
where task.id = next_task.id
returning task.id;

Получение следующей отложенной из-за ошибки задачи:

with next_task as (
    select id from task
    where status = 3
      and delayed_to < localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    delayed_to = null,
    error_text = null
from next_task
where task.id = next_task.id
returning task.id;

Успешное завершение задачи:

update task
set
    status = 2,
    delayed_to = null,
    error_text = null
where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:

update task
set
    status = 3,
    delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
    error_text = $2
where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:

update task
set
    status = 4,
    delayed_to = null,
    error_text = $2
where id = $1;

Запрос получения следующей задачи разделён на два, чтобы СУБД могла построить эффективный план запроса для очереди с приоритетом. Условие отбора с or может очень плохо сочетаться с сортировкой order by.

Сбор метрик

Добавляем такие атрибуты:

  • время создания задачи;
  • время изменения задачи;
  • время начала и завершения выполнения задачи.

create table task
(
    id          bigint not null primary key,
    status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    attempt     integer not null default 0,
    begin_time  timestamp null,
    end_time    timestamp null,
    delayed_to  timestamp null,
    error_text  text null,
    created     timestamp not null default localtimestamp,
    updated     timestamp not null default localtimestamp
);

create index task__status__delayed_to__idx on task (status, delayed_to);
create index task__updated__idx on task (updated);

Учитываем добавленные столбцы во всех запросах.

Получение следующей новой задачи:

with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    begin_time = localtimestamp,
    end_time = null,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;

Получение следующей отложенной из-за ошибки задачи:

with next_task as (
    select id from task
    where status = 3
      and delayed_to < localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    begin_time = localtimestamp,
    end_time = null,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;

Успешное завершение задачи:

update task
set
    status = 2,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:

update task
set
    status = 3,
    end_time = localtimestamp,
    delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
    error_text = $2,
    updated = localtimestamp
where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:

update task
set
    status = 4,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = $2,
    updated = localtimestamp
where id = $1;

Примеры, для чего это может быть нужно

Поиск и перезапуск повисших задач:

update task
set
    status = 3,
    end_time = localtimestamp,
    delayed_to = localtimestamp,
    error_text = 'hanged',
    updated = localtimestamp
where status = 1
  and updated < localtimestamp - interval '1 hour';

Удаление старых задач:

delete from task
where updated < localtimestamp - interval '30 days';

Статистика по выполнению задач:

select
    date_trunc('hour', end_time),
    count(*),
    sum(end_time - begin_time),
    avg(end_time - begin_time)
from task
where status = 2
  and end_time >= '2019-12-16'
group by 1
order by 1;

Повторный запуск ранее выполненных задач

Например, обновился документ, нужно его переиндексировать для полнотекстового поиска.

create table task
(
    id              bigint not null primary key,
    task_updated_at timestamp not null default localtimstamp,
    status          integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    begin_time      timestamp null,
    end_time        timestamp null,
    delayed_to      timestamp null,
    error_text      text null,
    created         timestamp not null default localtimestamp,
    updated         timestamp not null default localtimestamp
);

Здесь для времени обновления задачи добавлен столбец task_updated_at, но можно было бы использовать поле created.

Добавление или обновление (перезапуск) задачи:

insert into task (id, task_updated_at) values ($1, $2)
on conflict (id) do update
set
    task_updated_at = excluded.task_updated_at,
    status = case when status = 1 then 1 else 0 end,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where task_updated_at < excluded.task_updated_at;

Что здесь происходит. Задача становится "новой", если она сейчас не исполняется.

В запросе завершения задачи также будет проверка, была ли она изменена во время исполнения.

Запросы на получение следующей задачи такие же, как в очереди со сбором метрик.

Успешное завершение задачи:

update task
set
    status = case when begin_time >= updated then 2 else 0 end,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where id = $1;

Завершение задачи с ошибкой: в зависимости от задачи. Можно сделать безусловное откладывание перезапуска, можно при обновлении ставить статус "новая".

Pipeline

Задача проходит несколько стадий. Можно для каждой стадии сделать отдельную очередь. А можно в таблицу добавить соответствующий столбец.

Пример на основе базовой очереди, чтобы не загромождать код. Все ранее описанные модификации без проблем применимы и к этой очереди.

create table task
(
    id      bigint not null primary key,
    stage   integer not null default 0,
    status  integer not null default 0
);

create index task__stage__status__idx on task (stage, status);

Получение следующей задачи на заданной стадии:

with next_task as (
    select id from task
    where stage = $1
      and status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи с переходом на указанную стадию:

update task
set
    stage = $2,
    status = 2
where id = $1;

Или переход на следующую по порядку стадию:

update task
set
    stage = stage + 1,
    status = 2
where id = $1;

Задачи по расписанию

Это вариация очереди с повтором.

У каждой задачи может быть своё расписание (в простейшем варианте — периодичность запуска).

create table task
(
    id              bigint not null primary key,
    period          integer not null,               -- периодичность запуска в секундах
    status          integer not null default 0,     -- 0 - новая, 1 - в работе
    next_run_time   timestamp not null default localtimestamp
);

create index task__status__next_run_time__idx on task (status, next_run_time);

Добавление задачи:

insert into task (id, period, next_run_time) values ($1, $2, $3);

Получение следующей задачи:

with next_task as (
    select id from task
    where status = 0
      and next_run_time <= localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи и планирование следующего запуска:

update task
set
    status = 0,
    next_run_time = next_run_time + make_interval(secs => period)
where id = $1

Вместо заключения

В создании специализированной очереди задач средствами РСУБД нет ничего сложного.

"Самопальная" очередь будет отвечать даже самым диким практически любым требованиям бизнеса/предметной области.

Ну и не следует забывать, что как и любая другая БД, очередь требует вдумчивого тюнинга сервера, запросов и индексов.

Автор: yakov-bakhmatov

Источник [1]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/programmirovanie/340914

Ссылки в тексте:

[1] Источник: https://habr.com/ru/post/481556/?utm_source=habrahabr&utm_medium=rss&utm_campaign=481556