Синхронизация процессов при распараллеливании задачи средствами Caché Event API

в 6:01, , рубрики: cache, dbms cache, intersystems cache, nosql, Блог компании InterSystems, параллельное программирование, субд Caché, метки: , , ,

Сегодня наличие многоядерных, многопроцессорных и многоузловых систем является уже нормой при обработке большого объёма данных.
Как же можно задействовать все эти вычислительные мощности? Ответ очевиден — распараллелив задачу.
Но тут же встаёт другой вопрос: а как синхронизировать сами подзадачи?

Сразу стоит отметить, что команда JOB в версии СУБД Caché для Windows порождает не поток, а процесс. Поэтому правильнее было бы говорить не о многопоточном, а о многопроцессном приложении.
Отсюда же следует, что для Caché более важно в процессоре количество ядер, чем наличие технологии Hyper-Threading, что следует учитывать при выборе железа.

Этапы распараллеливания: Map и Reduce

Вначале кратко рассмотрим этапы распараллеливания задач на примере биометрической идентификации.

Допустим, есть база данных с биометрической информацией, например фотографиями.
И вы, имея фотографию какого-то человека, хотите его по этой базе идентифицировать (поиск «один-ко-многим»).

Для начала нужно определиться с тем что, куда и как мы будем «параллелить».
Это может зависеть от множества факторов: количества ядер, процессоров на одном узле, количества самих узлов в grid-системе (ECP), распределения самих данных по узлам и т.д.
Другими словами на данном этапе (Map) мы должны определиться со стратегией, по которой наша задача будет параллелиться. Ведь одна задача может быть распределена на множество более мелких задач, которые в свою очередь тоже могут быть распараллелены и так далее.

На следующем этапе (Reduce) мы должны собрать данные от наших подзадач, агрегировать их и выдать окончательный результат.

Применительно к нашему примеру стратегия Map может существенно варьироваться.
Например, от количества людей на исходной фотографии.

Одно лицо

Если на фотографии только один человек, то каждому процессу можно поручить идентифицировать его в рамках своей части данных, которые могут быть, как размазаны по узлам, так и быть общими для всех узлов.

Несколько лиц

Если же на фотографии запечатлено сразу несколько людей, то каждому процессу можно поручить идентифицировать какого-то одного человека сразу по всем данным.

На этапе Reduce, получив список похожих лиц и коэффициент «похожести», нам остаётся лишь отсортировать его и выдать топ наиболее похожих.

Caché Event API

На этапе Reduce попутно с получением результатов от каждой из подзадач мы должны мочь определить какие из них уже выполнены, а какие нет, в чём нам и поможет класс %SYSTEM.Event.
В документации достаточно подробно описан механизм обработки очереди событий, поэтому останавливаться подробно не имеет смысла.

Отмечу лишь два основных метода:

  1. Wait/WaitMsg — ожидание пробуждения ресурса с/без получения сообщения
  2. Signal — отсылка сигнала на пробуждение ресурса с передачей сообщения

Пример приложения

  • создадим три дочерних процесса, передав в каждый свои данные
  • в каждом из процессов сымитируем бурную деятельность и вернём некий результат родительскому
  • выведем полученные результаты на экран

Итак, создадим следующую программу:

main() {

  ; удаляем временные данные с предыдущего раза
  
kill ^tmp
  
  
; запускаем три подзадачи, они же процессы
  
job job(1, "яблоко", 5)
  
job job(2, "груша", 6)
  
job job(3, "слива", 7)
  
  
; выводим результат на экран
  
zwrite ^tmp

}

job(a,b,c)

  hang ; имитируем бурную деятельность задержкой в 1 сек.

  set ^tmp(a)=b_"-"_(c*2)  // формируем результат

Запустим из терминала нашу программу:

TEST>do ^main

TEST>

В итоге мы не видим никакого результата, потому что запущенные процессы живут своей жизнью (выполняются асинхронно) и мы из основного процесса не дождались их завершения.

Давайте попробуем это исправить, вставив задержку, как показано ниже:

main() {

  ; …
  
job job(3, "слива", 7)
  
  
hang 1
  
  
; выводим результат на экран
  ; …

}

Ещё раз запустим:

TEST>do ^main
^tmp(1)="яблоко-10"
^tmp(2)="груша-12"
^tmp(3)="слива-14"

TEST>

Теперь результат получен.
Но реализовано это крайне неэффективно и негибко, поскольку мы заранее не знаем сколько времени будут выполняться подзадачи.
Можно воспользоваться проверкой наличия данных или блокировками с таймаутом. Но это тоже всё неоптимально.

В этой ситуации нас спасает встроенный механизм "Event Queueing".

Перепишем наше приложение, дополнительно назначив каждому процессу свой приоритет.

main() {

  ; создаём три процесса со своим приоритетом
  
job job(3, -7, "яблоко", 5)
  
job job(2, 0, "груша", 6)
  
job job(1, 8, "слива", 7)
  
  
; ожидаем сигнала пробуждения и
  ; выводим результат на экран
  
write $list($system.Event.WaitMsg(), 2),!
  
write $list($system.Event.WaitMsg(), 2),!
  
write $list($system.Event.WaitMsg(), 2),!
  
}

job(x,delta,a,b)
  
  
; меняем приоритет текущему процессу на delta
  
do $system.Util.SetPrio(delta)
  
  
hang ; имитируем бурную деятельность задержкой в x сек.

  // посылаем сигнал пробуждения родительскому процессу
  // одновременно с результатом
  
do $system.Event.Signal($zparent,a_"-"_(b*2))

Вывод результата:

TEST>do ^main
слива-14
груша-12
яблоко-10

TEST>

Тот же самый код, но уже в виде класса

Class test.task
{

ClassMethod Test()
{

  ; запускаем асинхронно три процесса со своим приоритетом
  
job ..SubTask(3, -7, "яблоко", 5)
  
job ..SubTask(2, 0, "груша", 6)
  
job ..SubTask(1, 8, "слива", 7)

  ; ожидаем сигнала пробуждения и
  ; выводим результат на экран
  
write $list($system.Event.WaitMsg(), 2),!
  
write $list($system.Event.WaitMsg(), 2),!
  
write $list($system.Event.WaitMsg(), 2),!
}

ClassMethod SubTask(
  
x,
  
delta,
  
a,
  
b)
{
  
; меняем приоритет текущему процессу на delta
  
do $system.Util.SetPrio(delta)

  hang ; имитируем бурную деятельность задержкой в x сек.

  // посылаем сигнал пробуждения родительскому процессу
  // одновременно с результатом
  
do $system.Event.Signal($zparent,a_"-"_(b*2))
}

}

Для запуска метода класса в терминале вызовите
do ##class(test.task).Test()


Некоторые полезные ссылки можно найти в справочнике классов:

  1. класс %SYSTEM.CPU — предоставляет информацию о процессорах
  2. класс %SYSTEM.Util — содержит разные полезные методы.
    Например: NumberOfCPUs, SetBatch, SetPrio
  3. параметр JobServers — управляет размером пула процессов

Автор: servitRM

Источник

Поделиться

* - обязательные к заполнению поля