Что умеет планировщик заданий в Postgres Pro

в 15:05, , рубрики: open source, postgresql, sql, базы данных, Блог компании Postgres Professional

Планировщик заданий (scheduler) не во все времена считался обязательным инструментом в мире баз данных. Все зависело от назначения и происхождения СУБД. Классические коммерческие СУБД (Oracle, DB2, MS SQL) представить себе без планировщика решительно невозможно. С другой стороны, трудно вообразить потенциального пользователя MongoDB, который откажется от выбора этой модной NoSQL-СУБД из-за отсутствия планировщика. (Кстати, термин «планировщик заданий» в русском контексте СУБД употребляют, чтобы отличить его от планировщика запросов — query planner, мы же для краткости будем звать его здесь планировщиком).

PostgreSQL, будучи Open Source и впитав традиции сообщества с образом жизни DIY («сделай сам»), в наше время регулярно претендует на место как минимум заместителя коммерческой СУБД. Из этого автоматически следует, что PostgreSQL просто обязана иметь планировщик, и что этот планировщик должен быть удобен для администратора базы и для пользователя. И что желательно воспроизвести полностью функциональные возможности коммерческих СУБД, хотя неплохо было бы и добавить что-то свое.

Необходимость в планировщике очевиднее всего проявляется при работе с базой в промышленной эксплуатации. Разработчику, которому выделили сервер для экспериментов с БД, планировщик, в общем-то и ни к чему: если нужно, он сам, средствами ОС (cron или at в Unix) распланирует все необходимые операции. Но к рабочей базе его в серьезной фирме не подпустят на пушечный выстрел. Есть и важный административный нюанс, то есть уже не нюанс, а серьезная, если не решающая причина: администратор базы данных и сисадмин не просто разные люди с разными задачами. Не исключено, что они принадлежат к разным подразделениям компании, и, может быть, даже сидят на разных этажах. В идеале администратор базы поддерживает ее жизнеспособность и следит за ее эволюцией, а зона ответственности сисадмина — жизнеспособность ОС и сети.

Следовательно, у администратора базы должен быть инструмент для выполнения необходимого набора возможных работ на сервере. Недаром в материалах о планировщике Oracle сказано, что «Oracle Scheduler отменяет необходимость использовать специфичные для разных платформ планировщики заданий ОС (cron, at) при построении БД-центричного приложения». То есть админ базы может всё, тем более, что трудно себе представить админа Oracle, не ориентирующегося в механизмах ОС. Ему не надо каждый раз бежать к сисадмину или писать ему письма, когда требуются рутинные операции средствами ОС.

Вот требования к планировщику, типичные для коммерческих СУБД, таких как Oracle, DB2, MS SQL:

Планировщик должен уметь

  • запускать работы по расписанию,
  • контролировать выполнение работ, уметь снимать задания, если это необходимо,
  • запускать задания в ограниченном промежутке времени (в окне),
  • выстраивать последовательности заданий (следующее начинает выполняться после завершения предыдущего),
  • уметь выполнить несколько запросов в одной транзакции,
  • задание, определенное в одной БД, запускать на нескольких,
  • использовать (основные) возможности ОС,
  • оповещать администратора, если какие-то задания из расписания не были завершены,
  • исполнять разовые задания.

Последний пункт как будто не очевиден: есть же немало других, штатных средств кроме планировщика, способных запускать разовые задания. Но речь идет о не совсем обычном режиме выполнения. Например, режиме detached job: мы говорим о задании, которое как бы отключается (на время или навсегда) от вызвавшего его процесса. Сделав работу, отключившийся процесс может вновь связываться с запустившим его процессом (послав ему сигнал об успешном или неудачном завершении), сообщить ему результат или записать результат (в файл или таблицу БД). Некоторые СУБД-планировщики умеют останавливать и запускать саму СУБД (мы такой задачи не ставили).

PostgreSQL и его агент

Решать поставленные задачи можно по-разному: «вне» и «внутри» самой СУБД. Самая серьезная попытка сделать полнофункциональный планировщик — это pgAgent, распространяемый вместе с pgAdmin III/IV. В коммерческом варианте — в дистрибутиве EnterpriseDB — он интегрирован в графический интерфейс pgAdmin и может использоваться кроссплатформенно.

pgAgent умеет:

  • запускать задания,
  • запускать последовательности заданий, состоящих из SQL-скриптов (в т.ч. на разных БД) и/или shell/batch-скриптов,
  • задавать нерабочие окна (например, НЕ совершать некоторое действие по выходным).

Этот планировщик работает как расширение PostgreSQL, но исполняет задания не «внутри» СУБД, а создавая собственных, «внешних» демонов.

У такого подхода есть недостатки. Среди них важные:

Все задания, запущенные pgAgent, будут исполняться с правами пользователя, запустившего агента. SQL-запросы будут исполняться с правами пользователя, соединившегося с базой. Скрипты shell будут исполняться с правами пользователя, от имени которого запущен демон (или сервис в Windows) pgAgent. Поэтому для безопасности придется контролировать пользователей, которые могут создавать и запускать задания. Кроме того, пароль нельзя включать в строку конфигурации соединения (connection string), так как в Unix он будет виден в выводе команды ps и в скрипте старта БД, а в Windows будет хранится в реестре как незашифрованный текст.
(из документации pgAdmin 4 1.6).

В этом решении pgAgent через заданные промежутки времени опрашивает сервер базы (поскольку информация о работах хранится в таблицах базы): нет ли в наличии работ. Поэтому, если по каким-то причинам агент не будет работать в момент, когда работа должна запуститься, она не запустится до тех пор, пока агент не заработает.

К тому же любое подключение к серверу расходует пул возможных соединений, максимальное количество которых определяется конфигурационным параметром max_connections. Если агент порождает много процессов, а администратор не проявил должную бдительность, это может стать проблемой.

Создание планировщика целиком интегрированного в СУБД («внутри» СУБД) избавляет от этих проблем. И особенно удобно это тем пользователям, которые привыкли к минималистским интерфейсам для обращения к базе, таким как psql.

pgpro_scheduler и его расписание

В конце 2016 года в компании Postgres Professional приступили к созданию собственного планировщика, полностью интегрированного в СУБД. Сейчас он используется заказчиками и подробно документирован. Планировщик был создан как расширение (дополнительный модуль), получил название pgpro_scheduler и поставляется в составе коммерческой версии Postgres Pro Enterprise начиная с первой же ее версии. Разработчик — Владимир Ершов.

При его установке в конфигурационных файлах СУБД надо не забыть включить в конфигурационный файл shared_preload_libraries = 'pgpro_scheduler'. Установив расширение (CREATE EXTENSION pgpro_scheduler;), надо включить его строкой в конфигурационном файле (schedule.enabled = on) и дать перечислить, какие базы подпадут под действие планировщика (например schedule.database = 'database1,database2').

С самого начала решено было создавать pgpro_scheduler в современном стиле, органичном для компании — с записью конфигурации в JSON. Это удобно, например, для создателей Web-сервисов, которые смогут интегрировать планировщик в свои приложения. Но для не желающих использовать JSON, есть функции, принимающие параметры в виде обычных переменных. Планировщик поставляется вместе с дистрибутивом СУБД и он кросс-платформенный.

pgpro_scheduler не запускает внешних демонов или сервисов, а создает дочерние по отношению к postmaster процессы background worker — фоновые процессы. Количество «рабочих» задается в конфигурации pgpro_scheduler, но ограничивается общей конфигурацией сервера. На вход планировщика фактически поступают самые обычные команды SQL, без всяких ограничений, поэтому можно запускать функции на любых доступных Postgres языках. Если в структуру JSON входит несколько SQL-запросов, то они могут (при следовании определенному синтаксису) исполняться внутри единой транзакции:

SELECT schedule.create_job('{"commands": [ "SELECT 1", "SELECT 2", "SELECT 3"], "cron": "23 23 */2 * *" }');

это эквивалентно:

SELECT schedule.create_job('{"commands": [ "SELECT 1", "SELECT 2", "SELECT 3"], "cron": "23 23 */2 * *","use_same_transaction": true}');
а если каждый запрос в своей транзакции, то:

SELECT schedule.create_job('{"commands": [ "SELECT 1", "SELECT 2", "SELECT 3" ], "cron": "23 23 */2 * *" }'); — то есть без последнего параметра, по умолчанию.

Допустим, во второй по списку команде произойдет ошибка (конечно, в SELECT 2 она произойдет вряд ли, но вообразим себе какой-нибудь «стремный» запрос). В случае исполнения в одной транзакции все результаты откатятся, но в логе планировщика появится сообщение о крахе второй команды. То же сообщение появится и в случае исполнения раздельных транзакций, но результат первой будет сохранен (третья транзакция не будет исполнена).

При запуске pgpro_scheduler всегда приступает к работе группа фоновых процессов-рабочих (background workers) со своей иерархией: один рабочий, в чине супервизора планировщика, контролирует рабочих в чине менеджеров баз данных — по одному на каждую базу данных, прописанную в строке конфигурации. Менеджеры, в свою очередь, контролируют рабочих, непосредственно обслуживающих задания. Супервизор и менеджеры довольно легкие процессы, поэтому если планировщик обслуживают даже десятки баз, это не сказывается на общей загрузке системы. И далее рабочие запускаются в каждой базе по потребностям в обработке запросов. В сумме они должны укладываться в ограничение СУБД max_worker_processes. Группа команд для мгновенного исполнения заданий пользуется ресурсами по-другому, но об этом позже.

Что умеет планировщик заданий в Postgres Pro - 1
Рис.1 Основной режим работы pgpro_scheduler

pgpro_scheduler это расширение (extension) Postgres. Следовательно, оно устанавливается на конкретную базу данных. При этом создается несколько системных таблиц в схеме schedule, по умолчанию они не видны пользователю. База данных знает теперь 2 новых, специальных типа данных: cron_rec и cron_job, с которыми можно будет работать через SQL-запросы. Есть таблица-лог, которая не дублирует журнал СУБД. Информация об успешной или неуспешной работе заданий планировщика доступна только через функции расширения pgpro_scheduler. Это сделано для того, чтобы один пользователь планировщика не знал о деятельности другого пользователя планировщика. Функции дают возможность избирательного просмотра лога, начиная с определенной даты, например:

SELECT * from schedule.get_user_log() WHERE started > now() - INTERVAL '1 day';

Создать задание, используя JSON, можно при помощи функции schedule.create_job(data jsonb). В единственном аргументе этой функции передаётся объект JSONB с информацией о задании. Примеры будут дальше.

Этот объект может содержать следующие ключи, некоторые из которых могут быть опущены:

  • name — имя задания;
  • node — имя узла (на случай работы в архитектуре multimaster);
  • command — набор SQL-запросов, которые будут выполнены, задаются в виде массива;
  • run_as — пользователь, от имени которого будут выполняться команды;

Возможны на выбор виды представления расписания: для тех, кто привык к cron — строка в стиле crontab, задающая график выполнения. Но можно воспользоваться rule — тогда расписание будет представлено в виде объекта JSONB (см. описание ниже). Еще один вариант: date — набор определённых дат, на которые запланировано выполнение команд. Их можно комбинировать, но хотя бы один вариант нужно задействовать. Выглядеть это будет, например, так: "cron":"55 7 * * *" — из примера, который можно увидеть ниже.
Кроме того есть еще немало полезных параметров, о которых можно прочитать в документации. Среди них:

  • start_date и end_date — начало и конец интервала, в котором возможно выполнение запланированной команды (может быть NULL);
  • max_instances — максимальное число экземпляров задания, которые могут быть запущены одновременно. 1 по умолчанию;
  • max_run_time — определяет максимальную длительность выполнения задания. Задаётся в формате типа interval. Если это поле содержит NULL или не задано, время не ограничивается. Значение по умолчанию — NULL;
  • onrollback — SQL-запрос, который будет выполняться при сбое основной транзакции. По умолчанию запрос не определён;
  • next_time_statement — SQL-запрос, который будет выполнен для вычисления следующего времени запуска задания. Он должен обязательно возвращать значение в формате timestamp with time zone;

Расписание можно задать в виде строки в стиле crontab (ключ cron) или в виде объекта JSONB (ключ rule). Они могут содержать следующие ключи:

  • minutes — минуты; массив целых чисел в диапазоне 0… 59;
  • hours — часы; массив целых чисел в диапазоне 0… 23;
  • days — дни месяца; массив целых чисел в диапазоне 1… 31;
  • months — месяцы; массив целых чисел в диапазоне 1… 12;
  • wdays — дни недели; массив целых чисел в диапазоне 0… 6 (0 — воскресенье);
  • onstart — целое значение 0 или 1; если это значение равно 1, задание будет выполняться только один раз при запуске планировщика.

Задание может также быть запланировано на конкретную дату или на набор дат. То есть в принципе задание может быть одноразовым, хотя для одноразовых заданий можно использовать и специальный режим one-time job с другими вызовами функций.

Что умеет планировщик заданий в Postgres Pro - 2
Схема 1. Иерархия процессов планировщика

Поле next_time_statement может содержать SQL-запрос, который будет выполняться после основной транзакции для вычисления времени следующего запуска. Если этот ключ определён, время первого запуска задания будет рассчитано по методам, описанным выше, но следующий запуск будет запланирован на то время, которое вернёт этот запрос. Данный запрос должен вернуть запись, содержащую в первом поле значение типа timestamp with time zone. Если возвращаемое значение имеет другой тип или при выполнении запроса происходит ошибка, задание помечается как давшее сбой, и дальнейшее его выполнение отменяется.

Этот запрос будет выполняться при любом состоянии завершения основной транзакции. Получить состояние завершения транзакции в нём можно из переменной Postgres Pro Enterprise schedule.transaction_state:

  • success — транзакция завершилась успешно
  • failure — транзакция завершилась с ошибкой

Как видно даже из сокращенного описания, набор возможностей богат. Всего же функций, работающих с приложением pgpro_scheduler около 40. Можно формировать задания, отменять их, смотреть их статус, фильтровать информацию о заданиях по пользователям и по другим критериям.

Один раз, зато без очереди

Как говорилось, есть еще важный класс задач для планировщика: формирование отдельных, непериодических заданий, использующих механизм one-time job. Если не задан параметр run_after, то в этом режиме планировщик умеет приступать к выполнению задания сразу в момент поступления — с точностью до временного интервала опроса таблицы, в которую записывается задание. В текущей реализации интервал фиксирован и равен 1 секунде. background worker-ы запускаются заранее и ждут «под парАми» появления задания, а не запускаются по мере необходимости, как в режиме расписания. Их количество определено параметром schedule.max_parallel_workers. Соответствующее число запросов может обрабатываться параллельно.

Что умеет планировщик заданий в Postgres Pro - 3
Рис.2 Режим one-time job.

Основная функция, формирующая задание, выглядит так:

schedule.submit_job(query text [options...])

У этой функции есть, в соответствии с ее спецификой, о которой было в начале, тонкие настройки. Параметр max_duration задает максимальное время исполнения. Если за отведенное время работа не сделана, задание снимается (по умолчанию время исполнения неограниченно). max_wait_interval относится не к времени работы, а к времени ожидания начала работы. Если СУБД не находит за этот промежуток времени «рабочих», готовых взяться за исполнение, задание снимается. Интересный параметр depends_on задает массив работ (в режиме one-time), после завершения которых надо запустить данную работу.

Полезный параметр — resubmit_limit — устанавливает максимальное количество попыток перезапуска. Скажем, задание запускает процедуру, которая начинает высылать сообщение на почту. Почтовый сервер, однако, не торопится его принимать, и по таймауту или вообще из-за отсутствия связи процесс завершается, чтобы возобновиться вновь сразу или через заданное время. Без ограничения в resubmit_limit попытки будут продолжаться до победного конца.

Приправы и десерты

В начале были упомянуты отсоединенные задания — detached jobs. В текущей версии процесс, запустивший одноразовое задание, влачит свое существование в ожидании результата. Накладные расходы на работу background worker невелики, останавливать его нет смысла. Важно, что исполнение или неисполнение задания не пройдет бесследно, о его судьбе мы сможем узнать из запроса к логу планировщика, доступного нам, а не только администратору базы. Это не единственный способ выследить транзакцию даже в случае ее отката: в Postgres Pro Enterprise работает механизм автономных транзакций, который можно использовать для тех же целей. Но в этом случае результат запишут в лог СУБД, а не в «личный» лог пользователя, запустившего планировщик.

Если пользователю планировщика понадобится запланировать или просто запустить некоторые команды ОС с теми правами, которые доступны ему внутри ОС, он может легко сделать это через планировщик, воспользовавшись доступными ему языками программирования. Допустим, он решил воспользоваться untrusted Perl:

CREATE LANGUAGE plperlu;

После этого в можно записать как обычный запрос такую, например, функцию:

DO LANGUAGE 'plperlu' $$
system('cat /etc/postgresql/9.6/main/pg_hba.conf > $HOME/conf_tmp');
$$;

Пример из жизни: 1. складирование неактуальных логов

Для начала упрощенный пример управления секциями (партициями) из планировщика. Допустим, мы разбили логи посещения сайта на секции по месяцам. Мы не хотим хранить на дорогих быстрых дисках секции двухлетней свежести и моложе, а остальные сбрасываем в другое табличное пространство, соответствующее другим, более дешевым носителям, сохраняя, однако, полноценные возможности поиска и других операций по всем логам (с не поделенной на секции таблицей такое невозможно). Используем удобные функции управления секциями в расширении pg_pathman. В файле postgresql.conf должна быть строка shared_preload_libraries = 'pg_pathman, pgpro_scheduler'.

CREATE EXTENSION pg_pathman; CREATE EXTENSION pgpro_scheduler;

Конфигурируем:

ALTER SYSTEM SET schedule.enabled = on;
ALTER SYSTEM SET schedule.database = 'test_db';

Баз может быть несколько. В этом случае они перечисляются через запятую внутри кавычек.
SELECT pg_reload_conf(); — перечитать изменения в конфигурации, не перезапуская Postgres.

CREATE TABLE partitioned_log(id int NOT NULL, visit timestamp NOT NULL);

Только что мы создали родительскую таблицу, которую будем разбивать на секции. Это дань традиционному синтаксису PostgreSQL, основанному на наследовании таблиц. Сейчас, в Postgres Pro Enterprise можно создавать секции не в 2 этапа (сначала пустую родительскую таблицу, потом задавать секции), а сразу определять секции. В данном случае мы воспользуемся удобной функцией pg_pathman, позволяющей сначала задавать приблизительное количество секций. По мере заполнения нужные секции будут создаваться автоматически:

SELECT create_range_partitions('partitioned_log','visit', '2015-01-01'::date, '1 month'::interval, 10);

Мы задали 10 начальных секций по одной на месяц, начиная с 1 янв. 2015. Заполним их каким-то количеством данных.

INSERT INTO partitioned_log SELECT i, '2015-01-01'::date + 60*60*i*random()::int*'1 second'::interval visit FROM generate_series(1,24*365) AS g(i);

Следить за количеством секций можно так:

SELECT count(*) FROM pathman_partition_list WHERE parent='partitioned_log'::regclass;

Запуская INSERT, «подкручивая» начальную дату и/или множители перед random, сделайте число секций немногим больше 24 (2 года).

Создаем каталог в ОС и соответствующее табличное пространство, куда будут складироваться устаревшие логи:

CREATE TABLESPACE archive LOCATION '/tmp/archive';

И, наконец, функцию, которую ежедневно будет запускать планировщик:

CREATE OR REPLACE FUNCTION move_oldest_to_archive(parent_name text, suffix text, tblsp_name text, months_hot int) RETURNS int AS
$$
DECLARE
       i int;
       part_rename_sql text;
       part_chtblsp_sql text;
       part_name text;
BEGIN
       i=0;
       FOR part_name IN SELECT partition FROM pathman_partition_list WHERE parent=parent_name::regclass and partition::text NOT LIKE '%'||suffix ORDER BY range_max OFFSET months_hot LOOP
              i:=i+1;
              part_rename_sql:=format('ALTER TABLE %I RENAME to %I', part_name, part_name||'_'||suffix);
              part_chtblsp_sql:=format('ALTER TABLE %I SET TABLESPACE %I', part_name, tblsp_name);
              EXECUTE part_chtblsp_sql;
              EXECUTE part_rename_sql;
              RAISE NOTICE 'executed %, %',part_rename_sql,part_chtblsp_sql;
       END LOOP;
       RETURN i;
END;
$$ LANGUAGE plpgsql;

Она принимает как параметры: название секционированной таблицы (partitioned_log), суффикс, который прибавится к названию перемещенной секции (archived), табличное пространство (archive) и количество месяцев — граница логов 1-й свежести (24).

Для разминки поставим одноразовое задание:

SELECT schedule.submit_job(query := $$select move_oldest_to_archive('partitioned_log','archived', 'archive', 24);$$);

Исполнив, планировщик выведет id задания. Статус его можно посмотреть в представлениях schedule.job_status и schedule.all_job_status. В лог планировщика задания, назначенные функцией submit_job(), не попадают.

Чтобы удобней было играть с планировщиком и секциями, можно создать функцию unarchive(parent_name text, suffix text), откатывающую обратно изменения (не приводим для экономии места).

Ее можно запустить тоже из планировщика, но используя параметр run_after, который задает время задержки в секундах — чтобы у нас осталось время подумать, правильно ли мы поступили:

SELECT schedule.submit_job(query := $$'select unarchive('partitioned_log','archived');',run_after='10'$$);

а если неправильно, то можно отменить ее функцией schedule.cancel_job(id);
Убедившись, что всё работает так, как задумано, можно поместить задание (теперь в синтаксисе JSON) уже в расписание:

SELECT schedule.create_job($${"commands":"SELECT move_oldest_to_archive('partitioned_log','archived', 'archive', 24);","cron":"55 7 * * *"}$$);

То есть кажд​ое утро в без пяти минут восемь планировщик будет проверять, не пора ли переместить устаревшие партиции в «холодный» архив и перемещать, если пора. Статус на этот раз можно проверять по логу планировщика: schedule.get_log();

Пример из жизни: 2. раскладываем баннеры по серверам

Покажем, как решается одна из типичных задач, в которых требуется выполнение работ по расписанию и используются одноразовые задания.

У нас есть сеть доставки контента (CDN). Мы собираемся разложить по нескольким входящим в нее сайтам баннеры, которые пользователи из рекламных агентств автоматически сгрузили в отведенный для них каталог.

DROP SCHEMA IF EXISTS banners CASCADE;
CREATE SCHEMA banners;

SET search_path TO 'banners';

CREATE TYPE banner_status_t AS enum ('submitted', 'distributing', 'ready', 'error');
CREATE TYPE cdn_dist_status_t AS enum ('submitted', 'processing', 'ready', 'error');

CREATE TABLE banners (
      id SERIAL PRIMARY KEY,
      title text,
      file text,
      status banner_status_t DEFAULT 'submitted'
);

CREATE TABLE cdn_servers(
      id SERIAL PRIMARY KEY,
      title text,
      address text,
      active boolean
);

CREATE TABLE banner_on_cdn(
      banner_id int,
      server_id int,
      created timestamp with time zone DEFAULT now(),
      started timestamp with time zone,
      finished timestamp with time zone,
      url text,
      error text,
      status cdn_dist_status_t DEFAULT 'submitted'
);

CREATE INDEX banner_on_cdn_banner_server_idx ON banner_on_cdn (banner_id, server_id);
CREATE INDEX banner_on_cdn_url_idx ON banner_on_cdn (url);

Создадим функцию, которая инициализирует загрузку баннера на сервера. Для каждого сервера она создает задачу загрузки, а также задачу, которая ожидает все созданные загрузки и проставляет правильный статус баннеру, когда загрузки завершатся:

CREATE FUNCTION start_banner_upload(bid int) RETURNS bigint AS
$BODY$
DECLARE
      job_id bigint;
      r record;
      dep bigint[];
      sql text;
      len int;
BEGIN
      UPDATE banners SET status = 'distributing' WHERE id = bid;
      dep := '{}'::bigint[];
      FOR r IN SELECT * FROM cdn_servers WHERE active is TRUE LOOP
             -- для каждого сервера создаем задачу для загрузки
             INSERT INTO banner_on_cdn (banner_id, server_id) VALUES (bid, r.id);
             sql := format('select banners.send_banner_to_server(%s, %s)', bid, r.id);
             job_id := schedule.submit_job(
                    sql,
                    name := format('send banner id = %s to server %s', bid, r.title)
             );
             -- собираем идетификаторы созданных задач в массив
             dep := array_append(dep, job_id);
      END LOOP;
      len := array_length(dep, 1);
      IF len = 0 THEN
             UPDATE banners SET status = error WHERE id = bid;
             RETURN NULL;
      END IF;
      -- создаем задачу, которая будет выполненна сразу после завершения задач,
      -- идентификаторы которых мы собрали в массив dep
      job_id = schedule.submit_job(
             format('SELECT banners.finalize_banner(%s)', bid),
             depends_on := dep,
             name := format('finalization of banner %s', bid)
      );
      RETURN job_id;
END
$BODY$
LANGUAGE plpgsql SET search_path FROM CURRENT;

А эта функция имитирует посылку баннера на сервер (на самом деле просто какое-то время спит):

CREATE FUNCTION send_banner_to_server(bid int, sid int)
RETURNS boolean AS
$BODY$
DECLARE
      banner record;
      server record;
BEGIN
      SELECT * from banners WHERE id = bid LIMIT 1 INTO banner;
      SELECT * from cdn_servers WHERE id = sid LIMIT 1 INTO server;

      UPDATE banner_on_cdn SET
             status = 'processing',
             started = now()
      WHERE
             banner_id = bid AND server_id = sid;
             
      PERFORM pg_sleep((random()*10)::int);
      UPDATE banner_on_cdn SET
             url = 'http://' || server.address || '/' || banner.file,
             status = 'ready',
             finished = now()
      WHERE
             banner_id = bid AND server_id = sid;

      RETURN TRUE;
END;
$BODY$
LANGUAGE plpgsql set search_path FROM CURRENT;

Эта функция на основе статусов загрузок баннера на сервер будет определять какой статус поставить баннеру:

CREATE FUNCTION finalize_banner(bid int)
RETURNS boolean AS
$BODY$
DECLARE
      N int;
BEGIN
      SELECT count(*) FROM banner_on_cdn WHERE banner_id = bid AND status IN ('submitted', 'processing') INTO N;
      IF N > 0 THEN -- не все загрузки еще завершились
             RETURN FALSE;
      END IF;
      SELECT count(*) FROM banner_on_cdn WHERE banner_id = bid AND status IN ('error') INTO N;
      IF N > 0 THEN -- загрузки прошли с ошибками
             UPDATE banners SET status = 'error' WHERE id = bid;
             RETURN FALSE;
      END IF;
      -- все хорошо
      UPDATE banners SET status = 'ready' WHERE id = bid;
      RETURN TRUE;
END;
$BODY$
LANGUAGE plpgsql set search_path FROM CURRENT;

Эта функция будет по расписанию проверять, есть ли необработанные баннеры. И, если нужно, запускать обработку баннера:

CREATE FUNCTION check_banners() RETURNS int AS
$BODY$
DECLARE
      r record;
      N int;
BEGIN
      N := 0;
      FOR r IN SELECT * from banners WHERE status = 'submitted' FOR UPDATE LOOP
             PERFORM start_banner_upload(r.id);
             N := N + 1;
      END LOOP;

      RETURN N;
END;
$BODY$
LANGUAGE plpgsql SET search_path FROM CURRENT;

Теперь займемся данными. Создадим список серверов:

INSERT INTO cdn_servers (title, address, active)
      VALUES ('server #1', 'cdn1.local', true);
INSERT INTO cdn_servers (title, address, active)
      VALUES ('server #2', 'cdn2.local', true);
INSERT INTO cdn_servers (title, address, active)
      VALUES ('server #3', 'cdn3.local', true);
INSERT INTO cdn_servers (title, address, active)
      VALUES ('server #4', 'cdn4.local', true);

Создадим пару баннеров:

INSERT INTO banners (title, file) VALUES ('banner #1', 'bbb1.jpg');
INSERT INTO banners (title, file) VALUES ('banner #2', 'bbb2.jpg');

И, наконец, поставим в расписание задачу по проверке вновь поступивших баннеров, которые надо разложить на сервера. Задача будет выполняться каждую минуту:

SELECT schedule.create_job('* * * * *', 'select banners.check_banners()');

RESET search_path;

Вот и всё, картинки будут разложены по сайтам, можно отдохнуть.

Послесловие

В качестве Post Scriptum сообщаем, что планировщик pgpro_scheduler работает не только на отдельном сервере, но и в конфигурации кластера multimaster. Но это тема отдельного разговора.

А в качестве Post Post Scriptum — что в дальнейших планах встраивание планировщика в создаваемую сейчас графическую оболочку администрирования.

Автор: Igor_Le

Источник

Поделиться

* - обязательные к заполнению поля