«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop»

в 8:28, , рубрики: devops, Hadoop, zookeeper, распределенные системы

Предлагаю ознакомиться с расшифровкой лекции "Hadoop. ZooKeeper" из серии "Методы распределенной обработки больших объемов данных в Hadoop"

Что такое ZooKeeper, его место в экосистеме Hadoop. Неправда о распределённых вычислениях. Схема стандартной распределённой системы. Сложность координации распределённых систем. Типичные проблемы координации. Принципы, заложенные в дизайн ZooKeeper. Модель данных ZooKeeper. Флаги znode. Сессии. Клиентский API. Примитивы (configuration, group membership, simple locks, leader election, locking без herd effect). Архитектура ZooKeeper. ZooKeeper DB. ZAB. Обработчик запросов.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 1

Сегодня поговорим про ZooKeeper. Эта штука очень полезная. У него, как у любого продукта Apache Hadoop, есть логотип. На нем изображен человек.

До этого мы говорили в основном, как там можно обрабатывать данные, как их хранить, т. е. уже ими как-то пользоваться и как-то с ними работать. А сегодня хотелось бы немножко поговорить про построение распределенных приложений. И ZooKeeper – это одна из тех вещей, которая позволяет это дело упростить. Это некий сервис, который предназначен для некой координации взаимодействия процессов в распределенных системах, в распределенных приложениях.

Потребность в таких приложениях становится все больше и больше с каждым днем, это то, о чем наш курс. С одной стороны, MapReduce и этот готовый фреймворк позволяет нивелировать эту сложность и освободить программиста от написания примитивов таких, как взаимодействие, координация процессов. Но с другой стороны, никто не гарантирует, что этого все равно не придется делать. Не всегда MapReduce или другие готовые фреймворки полностью заменяют какие-то случаи, которые нельзя реализовать на этом. В том числе и сам MapReduce и куча других проектов апачевых, они, по сути, тоже являются распределенными приложениями. И для того чтобы писать было проще, написали ZooKeeper.

Как все приложения, связанные с Hadoop, он был разработан в компании Yahoo! Сейчас это также официальное приложение Apache. Оно не настолько активно развивается как HBase. Если вы зайдете в JIRA HBase, то там каждый день появляется куча тасков на баги, куча предложений что-то оптимизировать, т. е. постоянно идет жизнь в проекте. А ZooKeeper, с одной стороны, относительно простой продукт, а с другой стороны, это обеспечивает его надежность. И его довольно легко использовать, поэтому он стал стандартом в приложениях в рамках экосистемы Hadoop. Поэтому я подумал, что было бы полезно его рассмотреть, чтобы понять, как он работает и как им пользоваться.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 2

Это картинка, из какой-то лекции, которая у нас была. Можно сказать, что он ортогонален всему тому, что мы до этого рассматривали. И все, что здесь указано, в той или иной степени работает с ZooKeeper, т. е. – это сервис, который использует все эти продукты. Ни HDFS, ни MapReduce не пишут своих похожих сервисов, которые для них специально работали бы. Соответственно, используется ZooKeeper. И это упрощает разработку и какие-то вещи, связанные с ошибками.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 3

Откуда все это появляется? Казалось бы, запустили два приложения параллельно на разных компьютерах, соединили их шнурком или в сетку, и все работает. Но проблема в том, что Сеть ненадежна, а если бы вы сниффили трафик или посмотрели, что там происходит на низком уровне, как взаимодействуют клиенты в Сети, то можно часто увидеть, что какие-то пакеты теряются, перепосылаются. Не даром придумали протоколы TCP, которые позволяют установить некую сессию, гарантировать доставку сообщений. Но в любом случае не всегда даже TCP может спасать. У всего есть таймаут. Сеть может просто на какое-то время отваливаться. Она может просто мигать. И это все приводит к тому, что нельзя полагаться на то, что Сеть надежна. Это основное отличие от написания параллельных приложений, которые работают на одном компьютере или на каком-то одном суперкомпьютере, где Сети нет, где есть более надежная шина обмена данными в памяти. И это принципиальное отличие.

Кроме всего прочего, используя Сеть, есть всегда некая latency. У диска она тоже есть, но у Сети она больше. Latency – это какое-то время задержки, которое может быть как небольшим, так и довольно существенным.

Топология Сети меняется. Что такое топология – это размещение нашего сетевого оборудования. Есть дата-центры, есть стойки, которые там стоят, есть свечи. Все это может переподключаться, переезжать и т. д. Это все тоже нужно учитывать. Меняются айпишники, меняются роутинг, по которому у нас ходит трафик. Это тоже нужно учитывать.

Сеть также может меняться в плане оборудования. Из практики могу сказать, что наши сетевые инженеры очень любят периодически обновлять что-то на свечах. Внезапно вышла новая прошивка, и их не особенно интересует какой-то кластер Hadoop. У них своя работа. Для них главное, чтобы Сеть работала. Соответственно, они хотят там что-то перезалить, сделать перепрошивку на своем железе, при этом железки тоже периодически меняются. Все это каким-то образом нужно учитывать. Все это влияет на наше распределенное приложение.

Обычно люди, которые начинают работать с большим объемом данных, считают почему-то, что Сеть безграничная. Если там лежит файл в несколько терабайт, то можно его взять к себе на сервер или на компьютер, открыть с помощью cat и смотреть. Еще одна из ошибок – это в Vim смотреть логи. Никогда такого не делайте, потому что это плохо. Потому что Vim все старается буферизовать, все грузить в память, особенно, когда мы начинаем по этому логу перемещаться и что-то искать. Это такие вещи, о которых забывают, но их стоит учитывать.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 4

Проще написать одну программу, которая работает на одном компьютере с одним процессором.

Когда у нас наша система растет, мы хотим все это параллелить, причем параллелить не только на компьютере, но и на кластере. Возникает вопрос – как это дело координировать? Наши приложения могут даже не взаимодействовать между собой, но мы параллельно на нескольких серверах запустили несколько процессов. И как мониторить, что все у них идет хорошо? Они, например, что-нибудь отправляют по Сети. Они должны куда-то писать о своем состоянии, например, в какую-то базу данных или в лог, потом этот лог агрегировать и потом где-то его анализировать. Плюс нужно учитывать, что процесс работал-работал, внезапно в нем появилась какая-то ошибка или он упал, то как быстро мы об этом узнаем?

Понятно, что все это быстро можно замониторить. Это тоже хорошо, но мониторинг – это ограниченная вещь, которая позволяет мониторить какие-то вещи на самом высшем уровне.

Когда мы хотим, чтобы наши процессы начали взаимодействовать между собой, например, отправлять друг другу какие-то данные, то тут тоже возникает вопрос – как это будет происходить? Не будет ли какого-то race condition, не будут ли они перезаписывать друг друга, доходят ли данные правильно, не теряется ли что-то по дороге? Надо разрабатывать какой-то протокол и т. д.

Координация всех эти процессов – вещь нетривиальная. И заставляет разработчика опускаться на уровень еще ниже, и писать системы либо с нуля, либо не совсем с нуля, но это не так просто.

Если вы придумали криптографический алгоритм или даже его реализовали, то возьмите и выкиньте его сразу, потому что, скорее всего, он у вас не будет работать. В нем, скорее всего, найдется куча ошибок, которые вы забыли предусмотреть. Ни в коем случае не используйте его для чего-то серьезного, потому что, скорее всего, он неустойчивым будет. Потому что все алгоритмы, которые есть, они очень долго проверяются временем. В нем ищутся ошибки сообществом. Это отдельная тема. И то же самое и здесь. Если есть возможность не реализовывать какую-то синхронизацию процессов самостоятельно, то лучше этого не делать, потому что это довольно сложно и заводит вас на зыбкий путь постоянных поисков ошибок.

Мы сегодня говорим о ZooKeeper. С одной стороны – это фреймворк, с другой стороны, это сервис, который позволяет облегчить разработчику жизнь и максимально упростить реализацию логики и координацию наших процессов.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 5

Вспоминаем, как может выглядеть стандартная распределенная система. Это то, о чем мы говорили – это HDFS, HBase. Есть Мастер-процесс, который управляет воркерами, slave-процессами. Он занимается координацией и распределением задач, перезапуском воркеров, запуском новых, распределением нагрузки.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 6

Более продвинутая вещь – это Coordination Service, т. е. саму задачу координации вынести в отдельный процесс, плюс параллельно запускать какой-то backup или stanby Мастера, потому что Мастер может упасть. А если Мастер упадет, то система наша не будет работать. Мы запускаем backup. Какие-то состояния, которые есть у Мастера, нужно реплицировать на backup. Это также можно поручить Coordination Service. Но на этой схеме координацией воркеров занимается сам Мастер, здесь сервис занимается координацией действий по репликации данных.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 7

Более продвинутый вариант – это когда всей координацией занимается наш сервис, как обычно и делается. Он берет на себя ответственность за то, что все работает. А если что-то не работает, мы об этом узнаем и пытаемся эту ситуацию обойти. В любом случае у нас остается Мастер, который как-то взаимодействует еще со slaves, через некий сервис может отправлять данные, информацию, сообщения и т. д.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 8

Есть еще более продвинутая схема, когда у нас нет Мастера, все ноды являются мастер-slaves, разнозначными в своем поведении. Но все равно им нужно между собой взаимодействовать, поэтому все равно остается некий сервис для координации этих действий. Наверное, под эту схему подходит Cassandra, которая по такому принципу работает.

Сложно сказать, какая из этих схем лучше работает. У каждой есть свои минусы и плюсы.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 9

И не нужно бояться каких-то вещей с Мастером, потому что, как показывает практика, не так он подвержен тому, чтобы постоянно подать. Тут главное – выбирать правильное решение для размещения этого сервиса на отдельной ноде мощной, чтобы у нее было достаточно ресурсов, чтобы по возможности у пользователей не было туда доступа, чтобы они случайно не убили этот процесс. Но при этом в такой схеме намного проще из Мастер-процесса управлять воркерами, т. е. эта схема проще с точки зрения реализации.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 10

А эта схема (выще), наверное, более сложная, но более надежная.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 11

Основная проблема – это partial failures. Например, когда мы отправляем сообщение по Сети, происходит какая-то авария, и тот, кто отправил сообщение, не узнает дошло ли его сообщение и что произошло на стороне приемника, не узнает правильно ли сообщение обработалось, т. е. он не получит никакого подтверждения.

Соответственно, мы должны эту ситуацию обработать. И самое простое – это переотправить это сообщение и ждать, пока нам придет ответ. При этом нигде не учитывается изменилось ли состояние приемника. Возможно, мы отправим сообщение и два раза добавили одни и те же данные.

ZooKeeper предлагает способы борьбы с такими отказами, что тоже упрощает нам жизнь.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 12

Как чуть раньше говорилось, это похоже на написание многопоточных программ, но основное отличие в том, что в распределенных приложениях, которые мы строим на разных машинах, способом взаимодействия является только Сеть. По сути, это архитектура shared-nothing. У каждого процесса или сервиса, который работает на одной машине, есть своя память, свой диск, свой процессор, которым он ни с кем не делится.

Если мы пишем многопоточную программу на одном компьютере, то мы можем использовать shared memory для обмена данными. У нас там есть context switch, процессы могут переключаться. Это влияет на производительность. С одной стороны, в программе на кластере такого нет, но есть проблемы с Сетью.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 13

Соответственно, основные проблемы, которые возникают при написании распределенных систем, это конфигурация. Мы пишем какое-то приложение. Если оно простенькое, то мы хардкодим всякие циферки в коде, но это неудобно, потому что если мы решаем, что вместо таймаута в полсекунды мы хотим таймаут в одну секунду, то надо перекомпилировать приложение, все заново раскатать. Одно дело, когда это на одной машине, когда можно просто перезапустить, а когда у нас много машин, то надо все постоянно копировать. Надо стараться, чтобы приложение было конфигурируемо.

Тут говориться о статической конфигурации для системных процессов. Это не совсем, может быть, с точки зрения операционной системы, это может быть статическая конфигурация для наших процессов, т. е. это конфигурация, которую нельзя просто взять и обновить.

А также есть динамическая конфигурация. Это параметры, которые мы хотим изменять на лету, чтобы они там подхватывались.

В чем здесь проблема? Обновили мы конфиг, выкатили и что? Проблема может быть в том, что с одной стороны мы выкатили конфиг, а про обновку забыли, там остался стался конфиг. Во-вторых, пока мы выкатывали, где-то конфигурация обновилась, а где-то нет. И какие-то процессы нашего приложения, которые работают на одной машине перезапустились с новым конфигом, а где-то со старым. Это может приводить к тому, что наше распределенное приложение будет не в консистентном состоянии с точки зрения конфигурации. Эта проблема общая. Для динамической конфигурации она более актуальная, потому что подразумевается, что ее можно изменять на лету.

Еще одна проблема – это group membership. Всегда у нас есть какой-то набор воркеров, мы всегда хотим знать – кто из них жив, кто из них мертв. Если есть Мастер, то он должен понимать, на какие воркеры можно перенаправлять клиентов, чтобы они запускали вычисления или работали с данными, а на какие нельзя. Постоянно возникает проблема, что нужно знать, кто в нашем кластере работает.

Еще одна типичная проблема – это leader election, когда мы хотим знать, кто ответственный. Один из примеров – это репликация, когда у нас есть какой-то процесс, который принимает операции на запись и потом реплицирует их по остальным процессам. Он будет лидером, все остальные будут ему подчиняться, будут за ним идти. Нужно выбирать такой процесс, чтобы он был однозначным для всех, чтобы не получилось так, что выбралось двое лидеров.

Еще есть – mutually exclusive access. Тут проблема более сложная. Есть такое понятие как мьютекс, когда вы пишите многопоточные программы и хотите, чтобы доступ к какому-то ресурсу, например, к ячейке памяти, был ограничен и осуществлялся только одним потоком. Здесь ресурсом может быть что-то более абстрактное. И разные приложения с разных нод нашей Сети должны получать только эксклюзивный доступ для данного ресурса, а не так, чтобы все могли его изменять или писать туда что-то. Это, так называемые, locks.

Все эти проблемы ZooKeeper позволяет в той или иной степени решить. И я покажу на примерах, как он позволяет это сделать.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 14

Нет блокирующих примитивов. Когда мы начинаем что-то использовать, этот примитив не будет ждать наступления какого-либо события. Скорее всего, эта штука будет работать асинхронно, тем самым позволяя процессам не зависать в состоянии того что они чего-то ждут. Это очень полезная вещь.

Все клиентские запросы обрабатываются в порядке общей очереди.

И клиенты имеют возможность получать нотификацию об изменениях какого-то состояния, об изменениях данных, прежде чем клиент увидит сами измененные данные.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 15

ZooKeeper может работать в двух режимах. Первый – это standalone, на одной ноде. Это удобно для тестирования. А также он может работать в режиме кластера, на любом количестве серверов. Если у нас кластер – 100 машин, то необязательно, чтобы он работал на 100 машинах. Достаточно выделить несколько машинок, где можно запустить ZooKeeper. И исповедует принцип high availability. На каждом запущенном инстансе ZooKeeper хранит всю копию данных. Позже расскажу, как он это делает. Он не шардит данные, не партиционирует их. С одной стороны – это минус, что мы не можем хранить много, с другой стороны – это делать и не нужно. Он не для этого предназначен, это не база данных.

Данные можно кешировать на стороне клиента. Это стандартный принцип, чтобы нам не дергать сервис, не нагружать его одинаковыми запросами. Умный клиент обычно знает об этом и кеширует у себя.

Например, что-то у нас изменилось. Есть какое-то приложение. Выбрали нового лидера, который отвечает, например, за обработку операций записи. И мы хотим реплицировать данные. Одно из решений – поставить циклик. И постоянно опрашиваем наш сервис – не изменилось ли что-то? Второй вариант более оптимальный. Это watch mechanism, который позволяет уведомлять клиентов о том, что что-то изменилось. Это менее затратный способ по ресурсам и более удобный для клиентов.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 16

Client – это пользователь, который пользуется ZooKeeper.

Server – это сам процесс ZooKeeper.

Znode – это ключевая штука в ZooKeeper. Все znode хранятся в памяти у ZooKeeper и организовываются в виде иерархической схемы, в виде дерева.

Есть два типа операций. Первый – это update/write, когда какая-то операция изменяет состояние нашего дерево. Дерево общее.

И возможно, что клиент не выполняет один запрос и отрубается, а может установить сессию, с помощью которой взаимодействует с ZooKeeper.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 17

Модель данных ZooKeeper напоминает файловую систему. Есть стандартный корень и дальше мы пошли как будто по каталогам, которые идут от корня. И дальше каталог первого уровня, второго уровня. Это все является znodes.

Каждая znode может хранить в себе какие-то данные, обычно не очень большого объема, например, 10 килобайт. И у каждой znode может быть какое-то количество детей.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 18

Znode бывают нескольких типов. Их можно создавать. И при создании znode мы указываем тип, к которому она должна относиться.

Есть два типа. Первый – это ephemeral flag. Znode живет в рамках какой-то сессии. Например, клиент установил сессию. И пока эта сессия жива, она будет существовать. Это нужно для того, чтобы не плодить чего-то лишнего. Это также подходит для таких моментов, когда нам важно в рамках сессии хранить примитивы данных.

Второй тип – это sequential flag. Он увеличивает счетчик на пути к znode. Например, у нас был каталог с приложением 1_5. И когда мы создали первую ноду, она получила p_1, вторая – p_2. И когда мы каждый раз вызываем этот метод, то мы передаем полный путь, указывав только часть пусти, а этот номер автоматически инкрементится, потому что мы указываем тип ноды – sequential.

Регулярная znode. Она будет жить всегда и иметь то имя, которое мы ей скажем.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 19

Еще одна полезная вещь – это watch flag. Если мы его устанавливаем, то клиент может подписываться на какие-то события для определенной ноды. Покажу потом на примере, как это делается. Сам ZooKeeper уведомляет клиента о том, что данные на ноде изменились. При этом уведомления не гарантируют, что это какие-то новые данные приехали. Они просто говорят о том, что что-то изменилось, поэтому сравнивать данные потом придется все равно отдельными вызовами.

И как я уже говорил, порядок данных определяется килобайтами. Не нужно там хранить большие текстовые данные, потому что это не база данных, это сервер координации действий.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 20

Немножко расскажу про сессии. Если у нас несколько серверов, то мы можем прозрачно переходить от сервера к серверу, используя идентификатор сессии. Это довольно-таки удобно.

У каждой сессии есть какой-то таймаут. Сессия определяется тем, передает ли что-то клиент во время этой сессии серверу. Если он за время таймаута ничего не передал, сессия отваливается, либо клиент сам ее может закрыть.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 21

У него нет так много функций, но с помощью этого API можно делать различные штуки. Тот вызов, который мы видели, create создает znode и принимает три параметра. Это путь к znode, причем его нужно указывать полным от корня. А также это какие-то данные, которые мы хотим туда передать. И тип флага. И после создания он возвращает путь к znode.

Второе – можно удалить. Тут фишка в том, что вторым параметром, кроме пути к znode можно указать версию. Соответственно, будет удалена та znode, если ее версия, которую мы передали, будет эквивалентна той, которая на самом деле.

Если мы хотим не проверять эту версию, то мы просто передаем аргумент «-1».

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 22

Третье – осуществляет проверку существования znode. Возвращает true, если нода существует, иначе false.

И тут появляется flag watch, который позволяет установить наблюдение за этой нодой.

Можно установить этот flag даже на несуществующую ноду и получить уведомление в том случае, когда она появится. Это тоже бывает полезно.

Еще парочка вызовов – это getData. Понятно, что мы по znode можем получить данные. Также можно использовать flag watch. В данном случае он не установится, если ноды нет. Поэтому нужно понять, что она существует, а потом уже получать данные.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 23

А также есть SetData. Здесь мы передаем version. И если мы это передадим, то будут обновлены данные на znode определенной версии.

Также можно указать «-1», чтобы эту проверку исключить.

Еще один полезный метод – это getChildren. Также мы можем получить список всех znode, которые принадлежат ей. Можем за этим понаблюдать, установив flag watch.

И метод sync позволяет все изменения отправить сразу, тем самым гарантируя то, что они сохранились и все данные полностью изменились.

Если проводить аналогии с обычным программированием, то, когда вы используете методы такие, как write, которые пишут что-то на диск, и после того как он вернул вам ответ, нет гарантии, что у вас данные записались на диск. И даже когда операционная система уверена, что все записалось, в самом диске есть механизмы, где процесс проходит через уровни буферов, и только после этого данные помещаются на диск.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 24

В основном используются асинхронные вызовы. Это позволяет клиенту работать параллельно с разными запросами. Можно использовать синхронный подход, но он менее производителен.

Две операции, о которых мы говорили, это update/write, которые изменяют данные. Это create, setData, sync, delete. И read – это exists, getData, getChildren.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 25

Теперь несколько примеров, как можно сделать примитивы работы в распределенной системе. Например, связанный с конфигурацией чего-то. Появился новый воркер. Мы добавили машину, запустили процесс. И есть следующие три вопроса. Как он запрашивает конфигурацию у ZooKeeper? И если мы хотим изменить конфигурацию, то как мы ее изменяем? И после того, как мы ее изменили, как те воркеры, которые у нас были, получают ее?

В ZooKeeper это можно сделать относительно просто. Например, есть наше дерево znode. Здесь есть нода для нашего приложения, мы создаем в ней дополнительную ноду, где содержатся данные из конфигурации. Это могут быть как отдельные параметры, так и нет. Так как размер небольшой, то обычно размер конфигурации тоже небольшой, поэтому здесь вполне можно ее хранить.

Вы используете метод getData для получения конфигурации для воркера из ноды. Устанавливаем true. Если этой ноды по какой-то причине нет, нас об этом проинформируют, когда она появится, либо, когда она изменится. Если мы хотим знать, что что-то изменилось, то ставим true. И в случае изменении данных в этой ноде, мы об этом узнаем.

SetData. Задаем данные, устанавливаем «-1», т. е. не проверяем версию, считаем, что конфигурация у нас всегда одна, нам не нужно хранить много конфигураций. Если нужно много хранить, то нужно будет добавить еще один уровень. Здесь считаем, что она одна, поэтому обновляем только последнюю, поэтому версию не проверяем. В этот момент все клиенты, которые до этого подписались, они получают уведомление о том, что что-то изменилось в этой ноде. И после того, как они его получили, они также должны запросить данные еще раз. Уведомление заключается в том, что они получают не сами данные, а только уведомление об изменениях. После этого они должны попросить новые данные.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 26

Второй вариант применения примитива – это group membership. У нас распределенное приложение, есть куча воркеров и мы хотим понимать, что все они на месте. Поэтому они должны себя зарегистрировать, что они работают в нашем приложении. И также мы хотим либо из Мастер-процесса, либо еще откуда-то узнать обо всех активных воркерах, которые у нас сейчас есть.

Как мы это делаем? Для приложения создаем ноду workers и добавляем туда подуровень с помощью метода create. У меня ошибка на слайде. Тут нужно sequential указывать, тогда все воркеры будут создаваться по одному. И приложение запрашивая все данные о children этой ноды, получает все активные воркеры, которые есть.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 27

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 28

Вот такая страшная имплементация того, как это может быть сделано в Java-коде. Начнем с конца, с метода main. Вот это наш класс, создаем его метод. В качестве первого аргумента используем host, куда подсоединяемся, т. е. задаем его аргументом. А в качестве второго аргумента – имя группы.

Как происходит соединение? Это простой пример API, который используется. Тут все относительно просто. Есть стандартный класс ZooKeeper. Мы передаем ему hosts. И задаем таймаут, например, в 5 секунд. И у нас есть такой член, как connectedSignal. По сути, мы создаем группу по передаваемому пути. Данные туда не пишем, хотя что-нибудь можно было написать. И нода тут имеет тип persistent. По сути, это обычная регулярная нода, которая будет существовать все время. Здесь создается сессия. Это имплементация самого клиента. Наш клиент будет осуществлять периодически сообщения о том, что сессия жива. А когда мы сессию завершаем, то мы вызываем close и все, сессия отваливается. Это на тот случай, если у нас что-то отвалится, чтобы ZooKeeper об этом узнал и сессию отрубил.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 29

Как залочить какой-то ресурс? Здесь все несколько сложнее. У нас есть набор воркеров, есть какой-то ресурс, который мы хотим залочить. Для этого мы создаем отдельную ноду, например, под названием lock1. Если мы смогли ее создать, значит, мы получили lock сюда. А если мы не смогли ее создать, то воркер пытается получить getData отсюда, а т. к. нода уже создана, то мы ставим сюда watcher и в тот момент, когда состояние этой ноды изменится, мы об этом узнаем. И мы можем постараться успеть ее пересоздать. Если мы забрали эту ноду, забрали этот lock, то после того, как lock нам больше не нужен, мы от него откажемся, т. к. нода существует только в рамках сессии. Соответственно, она исчезнет. И другой клиент в рамках другой сессии сможет забрать lock на эту ноду, точнее он получит уведомление о том, что что-то изменилось и он может попытаться успеть это успеть это сделать.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 30

Другой пример, как можно выбирать основного лидера. Тут немножко посложнее, но тоже относительно просто. Что у нас здесь происходит? Есть основная нода, которая агрегирует в себе всех воркеров. Мы пытаемся получить данные про лидера. Если это успешно произошло, т. е. мы получили какие-то данные, то наш воркер начинает следовать за этим лидером. Он считает, что лидер уже есть.

Если лидер по какой-то причине умер, например, отвалился, то мы пытаемся создать нового лидера. И если у нас это получилось, то наш воркер становится лидером. А если кто-то в этот момент успел создать нового лидера, то мы пытаемся понять, кто это и потом за ним последовать.

Тут возникает, так называемый, herd effect, т. е. эффект стада, потому что, когда лидер умирает, то тот, кто первый успеет, тот и станет лидером.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 31

При захвате ресурса можно попробовать использовать несколько другой подход, который заключается в следующем. Например, мы хотим получить lock, но без hert effect. Заключаться он будет в том, что наше приложение запрашивает списки всех id нод для уже существующей ноды с lock. И если перед этим ту ноду, lock для которой мы создали, она будет минимальной из того набора, который мы получили, то это означает, что мы захватили lock. Мы проверяем, что мы получили lock. В качестве проверки будет условие, что тот id, который мы получили при создании нового lock, он является минимальным. И если мы получили, то мы работаем дальше.

Если существует некий id, который меньше нашего lock, то мы ставим на это событие watcher и ждем нотификации, пока что-то не изменится. Т. е. мы получили этот lock. И пока тот не отвалится, мы не станем минимальным id и не получим минимальный lock, и тем самым сможем залочиться. А если это условие не выполняется, тогда мы сразу идем сюда и пытаемся еще раз получить этот lock, потому что за это время могло что-то измениться.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 32

Из чего состоит ZooKeeper? Есть 4 основные вещи. Это обработка процессов – Request. А также ZooKeeper Atomic Broadcast. Есть Commit Log, куда заносятся все операции. И сама In-memory Replicated DB, т. е. сама база данных, где хранится все это дерево.

Стоит отметить, что все операции на запись проходят через Request Processor. А операции на чтение идут сразу в In-memory базу.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 33

Сама база полностью реплицирована. На всех instances ZooKeeper хранится полная копия данных.

Для того чтобы восстанавливить бд после падения, существует Commit log. Стандартная практика, когда данные прежде чем попадают в память, они пишутся туда, чтобы в случае падения этот log можно было проиграть и восстановить состояние системы. И также применяются периодические снапшоты базы.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 34

ZooKeeper Atomic Broadcast – это такая штука, которая используется для поддержания реплицированных данных.

ZAB внутри себя выбирает лидера с точки зрения ноды ZooKeeper. Другие ноды становятся ее фоловерами, ждут от нее каких-то действий. Если на них приходят записи, то они все их перенаправляют лидеру. Тот предварительно выполняет операцию записи и затем рассылает сообщение о том, что изменилось своим фоловерам. Это, по сути, должно выполняться атомарно, т. е. операция записи и broadcasting всего этого дела должны выполняться атомарно, тем самым гарантируется консистентность данных.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 35 Он обрабатывает только write request. Основная его задача в том, что он трансформирует операцию в transactional update. Это специально сформированный запрос.

И тут стоит отметить, что гарантируется идемпотентность апдейтов для одной и той же операции. Что это такое? Эта вещь, если которую выполнить два раза, она будет иметь одно и то же состояние, т. е. сам запрос от этого не изменится. И сделать это нужно для того, чтобы в случае падения можно было заново запустить операцию, тем самым накатить изменения, которые отвалились на данный момент. При этом состояние системы станет таким же, т. е. не должно быть такого, что череда одних и тех же, например, процессов обновлений, приводила к различным итоговым состояниям системы.

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 36

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 37

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 38

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 39

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 40

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 41

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 42

«Hadoop. ZooKeeper» из серии Технострима Mail.Ru Group «Методы распределенной обработки больших объемов данных в Hadoop» - 43

Автор: Пацев Антон

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js