- PVSM.RU - https://www.pvsm.ru -
[1]В предыдущей части серии [2] мы (в 100500й раз) попытались рассказать про основные приемы и стадии подхода Google MapReduce, должен признаться, что первая часть была намерено "капитанской", чтобы дать знать о MapReduce целевой аудитории последующих статей. Мы не успели показать ни строчки того, как всё это мы собираемся реализовывать в Caché ObjectScript. И про это наша рассказ сегодня (и в последующие дни).
Напомним первоначальный посыл нашего мини-проекта: вы всё еще планируем реализовать MapReduce алгоритм используя те подручные средства, что есть в Caché ObjectScript. При создании интерфейсов, мы попытаемся придерживаться того API, что мы описали в предыдущей статье про оригинальную реализацию Google MapReduce, любые девиации будут озвучены соответствующе.
Начнем с реализации абстрактных интерфейсов Mapper и Reducer.
Class MR.Base.Mapper
{
Method Map(MapInput As MR.Base.Iterator, MapOutput As MR.Base.Emitter) [ Abstract ] { }
}
Class MR.Base.Reducer
{
Method Reduce(ReduceInput As MR.Base.Iterator, ReduceOutput As MR.Base.Emitter) [ Abstract ] { }
}
Изначально, как и в канонической реализации, мы сделали 2 отдельных интерфейса MapInput и ReduceInput. Но сразу стало очевидным, что они служат одной и той же цели, и предоставляют одни и те же методы – их цель пройтись по потоку данных до конца, т.ч. они оба являются итераторами. Потому, в итоге, редуцируем их в общий интерфейс MR.Base.Iterator:
Class MR.Base.Iterator
{
Method GetNext() As %String [Abstract ] { }
Method IsAtEnd() As %Boolean [Abstract ] { }
}
Оригинальная реализация Google MapReduce использовала файловую систему Google GFS как транспорт между узлами и стадиями алгоритма. В Caché есть свой механизм распространения (когерентных) данных между узлами (если не пользоваться голым TCP/UDP) – это протокол ECP (Enterprise Caсhe Protocol [3]). Обычно он используется серверами приложений для получения данных от удаленных серверов баз данных. Но ничего не останавливает нас от построения на базе таких peer-to-peer соединений ECP некоей виртуальной управляющей шины, куда мы будем складывать данные в виде пар <ключ, значение> или похожие данные. Эти данные будут будут пересылаться между акторами, участвующими в конвейерах алгоритма (т.е. emit, посылаемый объектом Mapper, будет писаться в шину ECP и читаться объектом Reducer). Если акторы будут работать в рамках одного узла, то они, например могут использовать быстрые глобалы, отображенные в CACHETEMP, или обычные глобалы, если реализуемый алгоритм многостадийный и нужна надежность и журналирование.
В любом случае, будь то локальные (для конфигурации на одном узле) глобалы, или глобалы удаленного узла, подключенного через ECP, глобалы являются удобным и хорошо зарекомендовавшим себя транспортом для передачи данных между узлами кластера Caché, в данном случае, между вовлеченными в MapReduce функциями и классами.
Посему, естественным решением, позволяющим упростить нашу систему будет использование в среде Caché для передачи данных между узлами кластера протокола ECP вместо файловых систем GFS или HDFS. Функциональные характеристики ECP позволят сделать и другие упрощения (но об этом несколько позже).
Как мы уже рассказывали в предыдущей серии [2], с момента когда данные уходят от объекта Mapper, и до момента как они поступают на вход Reducer, в классической реализации на мастере проходит тяжелая операция перемешения и сортировки.
В окружении, использующем глобалы к качестве транспорта, в MUMPS/Caché ObjectScript среде, мы можем полностью избежать дополнительных расходов на такую сортировку, т.к. агрегация и сортировка будут сделаны нижележащим btree* хранилищем.
Имея такие требования к дизайну, создадим базовый интерфейс эмиттера:
Class MR.Base.Emitter Extends MR.Base.Iterator
{
/// emit $listbuild(key,value(s))
Method Emit(EmitList... As %String) [Abstract ] { }
}
Эмиттер должен быть похож на интерфейс входного итератора, показанного выше (потому мы и пронаследовались от MR.Base.Iterator), но в дополнение к интерфейсу прохода по данным, эмиттер должен уметь еще и посылать данные в своё промежуточное хранилище (т.е. добавляем функцию Emit).
Первоначально, наша функоция Emit была очень похожа на классическую релизацию и принимала только 2 аргумента как пару <ключ, значение>, но потом мы натолкнулись на (редкую) необходимость передавать что-то более многомерное, длиннее чем пара значений (например, кортеж любой арности), потому, в настоящий момент, Emit стал функцией принимающей переменное число аргументов.
Заметим, что в большинстве случаев, на практике, сюда будет поступать только пара аргументов <ключ, значение> как мы и видели в классической реализации.
Это всё ещё абстрактный интерфейс, больше мяса будет добавлено очень скоро.
Если бы нам, при обработке, надо было сохранять порядок поступивших элементов, то мы бы использовали реализацию ниже:
/// Emitter which maintains the order of (key,value(s))
Class MR.Emitter.Ordered Extends (%RegisteredObject, MR.Base.Emitter)
{
/// global name serving as data channel
Property GlobalName As %String;
Method %OnNew(initval As %String) As %Status
{
$$$ThrowOnError($length(initval)>0)
set ..GlobalName = initval
quit $$$OK
}
Parameter AUTOCLEANUP = 1;
Method %OnClose() As %Status
{
if ..#AUTOCLEANUP {
if $data(@i%GlobalName) {
kill @i%GlobalName
}
}
Quit $$$OK
}
...
}
Заметим на полях, что в Caché глобалы – в общем-то, глобальны :), и не будут очищены автоматически по завершении процессов их создавших. В отличие, например, от PPG (process-private globals) [4]. Но иногда все же хочется, чтобы наши промежуточные каналы, созданные для взаимодействия между стадиями конвейера MapReduce удалялись по завершении подпрограммы их создавшей. Поэтому и был добавлен режим "автоочистки" (параметр класса #AUTOCLEANUP) при котором глобал, имя которого хранится в свойстве GlobalName, будет удален при закрытии объекта (в момент вызова %OnClose).
Обратите внимание, что мы форсируем один обязательный параметр в метода %New (в %OnNew генерируем $$$ThrowOnError если имя в Initval не определено). Конструктор класса ожидает получить название глобала с которым он будет работать в качестве транспорта данных.
Class MR.Emitter.Ordered Extends MR.Base.Emitter
{
/// ...
Method IsAtEnd() As %Boolean
{
quit ($data(@i%GlobalName)10)=0
}
/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
#dim list As %String = ""
for i=1:1:$get(EmitList) {
set $li(list,i) = $get(EmitList(i))
}
#dim name As %String = ..GlobalName
set @name@($seq(@name)) = list
}
/// returns emitted $lb(key,value)
Method GetNext() As %String
{
#dim value As %String
#dim index As %String = $order(@i%GlobalName@(""), 1, value)
if index '= "" {
kill @i%GlobalName@(index)
quit value
} else {
kill @i%GlobalName
quit ""
}
}
Method Dump()
{
zwrite @i%GlobalName
}
}
Надеемся, вы еще помните, что наш Emitter является наследником итератора Iterator? Посему ему нужно реализовать пару функций итератора – IsAtEnd и GetNext.
IsAtEnd – простой: если наш служебный глобал не содержит данных (т.е. $data(..GlobalName) [5] не возвращает 10 или 11, что означает что там в поддереве есть еще узлы с данными), то мы достигли конца потока данных;
Как известно, и как хорошо написал Саша Коблов, $SEQUENCE [7] может быть использована почти во всех местах, где использовался $INCREMENT [8], обеспечивая при этом лучшие скорости при работе в многопроцессорном или многосерверном режиме (через ECP). В силу меньшего количества коллизий при обращении к одном узлу глобала. Потому в коде выше мы используем $sequence для выделения индекса следующего элемента упорядоченного списка.
Обращаем внимание, что данный вариант удаления элемента из списка/глобала не очень совместим с параллельным режимом [10], и нужно было бы добавить блокировки или сменить структуру данных. Но т.к. на ближайшие серии у нас будет только один Reducer, на всё множество Mapper ов, то мы отложим решение данной проблемы на будущее, когда приступим к много-серверной реализации.
Заметим, что структура данных, реализованная MR.Emitter.Ordered по сути реализуют классическую коллекцию FIFO ("FirstIn – FirstOut"). Мы помещаем новый элемент в конец списка и вытаскиваем из головы списка.
Если вы посмотрите на те данные, что мы посылаем в между стадиями конвейера в примере word-count (ок, не сейчас, а когда мы вам покажем такую реализацию) то вы быстро осознаете, что:
На самом деле нам не интересен порядок, в котором мы "эмиттим" пары <ключ, значение>. Более того, нижележащее хранилище btree* всегда держит список ключей отсортированным для быстрого поиска, избавляя нас от необходимости сортировки на мастере, как произошло бы в классической реализации;
Так зачем посылать такой большой трафик ненужных данных, если мы можем их агрегировать еще в момент посылки?
Именно так и работает MR.Emitter.Sorted, который является наследником MR.Emitter.Ordered (показанного выше):
/// Emitter which sorts by keys all emitted pairs or tuples (key, value(s))
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered
{
Property AutoIncrement As %Boolean [ InitialExpression = 1 ];
/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
#dim name As %String = ..GlobalName
#dim key As %String
#dim value As %String
if $get(EmitList)=1 {
// special case - only key name given, no value
set key = $get(EmitList(1))
quit:key=""
if ..AutoIncrement {
#dim dummyX As %Integer = $increment(@name@(key)) ; $seq is non-deterministic
} else {
set @name@(key) = 1
}
} else {
set value = $get(EmitList(EmitList))
set EmitList = EmitList - 1
for i=1:1:$get(EmitList) {
#dim index As %String = $get(EmitList(i))
quit:index=""
set name = $name(@name@(index))
}
if ..AutoIncrement {
#dim dummyY As %Integer = $increment(@name,value)
} else {
set @name = value
}
}
}
/// ...
}
Для самого простого случая, выдачи пары <key,1> или, когда значение опущено, и имеет один ключ <key> мы реализовали локальную оптимизацию, когда в режиме автоинкремента (AutoIncrement = 1) мы при вызове сразу инкрементируем соответствующий счетчик для ключа. Если же не включен автоинкремент, то мы просто (пере)определяем узел ключа в 1, фиксируя факт передачи ключа.
Для более общего случая, с двумя элементами, пары ключ-значение <key,value> или даже с большим количеством элементов <key,key2,key3,…keyn,value> (кортеж любой арности) у нас опять же реализовано 2 режима работы:
при автоинкременте мы сразу суммируем значение соответствующего узла, адресуемого ключом(ключами) с переданным значением;
Обращаем внимание, что кортеж мы передаем посредством массива, аккумулирующего переменное количество аргументов. Все элементы этого массива кроме последнего, пойдут как адреса подындексов [11]. Последний элемент кортежа будет считаться значением.
Такое необычное расширение пары «ключ-значение» в кортежи любой мощности, по нашим сведениям, является нетипичным или может быть уникальным. Нам не надо работать со строгим key-value хранилищем или bigtable хранилищем, и мы с легкостью можем работать с многомерными ключами в передаваемых элементах ("потому что можем"), что может сильно облегчить некоторые реализации алгоритмов, требующих дополнительной размерности данных, что сильно улучшает читабельность кода и упрощает понимание. В теории...
Заметим, что мы не переопределили IsAtEnd и он пронаследовал реализацию из MR.Emitter.Ordered, таким образом он по-прежнему будет возвращать ненулевое значение по окончании данных в подузлах промежуточного хранилища.
Но GetNext нам надо переопределить, т.к. мы больше не пытаемся запомнить порядок посланных данных и формат его внутреннего хранилища поменялся:
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered
{
/// ...
/// returns emitted $lb(key,value)
Method GetNext() As %String
{
#dim name As %String = ..GlobalName
#dim value As %String
#dim ref As %String = $query(@name,1,value)
if ref'="" {
zkill @ref
#dim i As %Integer
#dim refLen As %Integer = $qlength(ref)
#dim baseLen As %Integer = $qlength(name)
#dim listbuild = ""
for i=baseLen+1:1:refLen {
set $li(listbuild,i-baseLen)=$qs(ref,i)
}
set $li(listbuild,*+1)=value
quit listbuild
}
quit ""
}
}
На выходе из GetNext() мы ожидаем $LISTBUILD<> [12] список, но внутри хранилища данные пар/кортежей разбросаны по узлам иерархического хранилища. Функция $QUERY [13] позволяет обойти узлы с данными (значениями пар/кортежей) в массиве для последующей их перепаковки в $LISTBUILD формат, индексы из массива последовательно добавляются следующим элементом списка (посредством присваивания элементу через функцию $LIST [14]. Само же значение узла хранилища (значение в паре «ключ-значение» или последний элемент кортежа) будет добавлено в конец сформированного списка через ту же функцию $LIST(listbuild,*+1). В данном случае *+1 как раз и обозначат номер элемента списка, следующий за текущим концом.
На этом неожиданном месте мы прервем наш рассказ про MapReduce в Caché. Во второй части данного повествования мы показали базовые интерфейсы инфраструктуры, которые будут использованы в дальнейшем при реализации конкретных примеров. Уже в следующей серии мы соберем это всё воедино и реализуем классический пример WordCount, но уже на ObjectScript. Не уходите далеко!
Автор: InterSystems
Источник [15]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/diy/195654
Ссылки в тексте:
[1] Image: http://fineartamerica.com/featured/take-it-like-a-man-joan-pollak.html
[2] В предыдущей части серии: https://habrahabr.ru/company/intersystems/blog/310180/
[3] Enterprise Caсhe Protocol: http://docs.intersystems.com/documentation/cache/latest/pdfs/GDDM.pdf
[4] PPG (process-private globals): http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=GCOS_variables#GCOS_variables_procprivglbls
[5] $data(..GlobalName): http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=RVBS_fdata
[6] $(listbuild(...)): http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=RSQL_d_listbuild
[7] Саша Коблов, $SEQUENCE: https://habrahabr.ru/company/intersystems/blog/263793/
[8] $INCREMENT: http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=RCOS_fincrement
[9] $ORDER(@i%GlobalName("")): http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=RCOS_forder
[10] не очень совместим с параллельным режимом: https://community.intersystems.com/post/cach%C3%A9-mapreduce-basic-interfaces-mapreduce-implementation-part-ii#comments
[11] подындексов: http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=GGBL_structure#GGBL_structure_nodes_and_subscripts
[12] $LISTBUILD<>: http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=RCOS_flistbuild
[13] $QUERY: http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=RCOS_fquery
[14] $LIST: http://docs.intersystems.com/latest/csp/docbook/DocBook.UI.Page.cls?KEY=RCOS_flist
[15] Источник: https://habrahabr.ru/post/310196/?utm_source=habrahabr&utm_medium=rss&utm_campaign=best
Нажмите здесь для печати.