- PVSM.RU - https://www.pvsm.ru -
Привет! После долгой паузы мы наконец-то возвращаемся к разбору Apache Flume [1]. В предыдущих статьях мы познакомились с Flume (Часть 1 [2]) и разобрались, как настраивать основные его компоненты (Часть 2 [3]). В этой, заключительной, части цикла мы рассмотрим следующие вопросы:
Итак, мы настроили и запустили все узлы, проверили их работоспособность — данные успешно доставляются до пункта назначения. Но проходит какое-то время, мы смотрим на результат работы нашей транспортной сети (например, папку с файлами, в которые упаковываются данные) и понимаем, что возникла проблема — начиная с какого-то момента новые файлы не появляются в нашей папке. Следующий шаг кажется очевидным — открываем логи, ищем причину. Беда только в том, что узлов в нашей транспортной сети может быть много, а значит необходимо вручную просматривать логи всех узлов, что, мягко говоря, не очень удобно. Когда подобные проблемы возникают, реагировать на них хотелось бы максимально оперативно, а еще лучше — вообще не допускать таких критичных ситуаций.
Компоненты Flume в процессе работы пишут метрики, которые позволяют оценить состояние узла. По значениям этих метрик довольно легко определить, что с узлом не всё в порядке.
Для хранения счетчиков и других атрибутов своих компонентов Flume использует java.lang.management.ManagementFactory [4], регистрируя собственные bean-классы для ведения метрик. Все эти классы унаследованы от MonitoredCounterGroup [5] (для любопытных — ссылка на исходный код [6]).
Если вы не планируете разрабатывать собственные компоненты Flume, то закапываться в механизм ведения метрик совершенно необязательно, достаточно разобраться, как их достать. Сделать это можно довольно просто с помощью утилитарного класса JMXPollUtil [7]:
package ru.test.flume.monitoring;
import java.util.Map;
import org.apache.flume.instrumentation.util.JMXPollUtil;
public class FlumeMetrics {
public static Map<String, Map<String, String>> getMetrics() {
Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
return metricsMap;
}
}
В результате вы получите метрики, сгруппированные по компонентам узла, которые выглядят примерно так:
{
"SOURCE.my-source": {
"EventReceivedCount": "567393607",
"AppendBatchAcceptedCount": "5689696",
"Type": "SOURCE",
"EventAcceptedCount": "567393607",
"AppendReceivedCount": "0",
"StartTime": "1467797931288",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "1",
"AppendBatchReceivedCount": "5689696",
"StopTime": "0"
},
"CHANNEL.my-channel": {
"ChannelCapacity": "100000000",
"ChannelFillPercentage": "5.0E-4",
"Type": "CHANNEL",
"ChannelSize": "500",
"EventTakeSuccessCount": "567393374",
"StartTime": "1467797930967",
"EventTakeAttemptCount": "569291443",
"EventPutSuccessCount": "567393607",
"EventPutAttemptCount": "567393607",
"StopTime": "0"
},
"SINK.my-sink": {
"ConnectionCreatedCount": "1",
"ConnectionClosedCount": "0",
"Type": "SINK",
"BatchCompleteCount": "2",
"EventDrainAttemptCount": "567393374",
"BatchEmptyCount": "959650",
"StartTime": "1467797930968",
"EventDrainSuccessCount": "567393374",
"BatchUnderflowCount": "938419",
"StopTime": "0",
"ConnectionFailedCount": "0"
}
}
Метрики получили, теперь необходимо их куда-то отправить. Здесь можно пойти двумя путями.
Flume предоставляет API, позволяющей задать способ мониторинга — для этого используются реализации интерфейса MonitorService [8]. Для того, чтобы подключить мониторинг, необходимо указать класс, реализующий MonitorService
, в качестве системного свойства при запуске узла (или в коде).
java -Dflume.monitoring.type=org.apache.flume.instrumentation.http.HTTPMetricsServer ...
System.setProperty("flume.monitoring.type", "org.apache.flume.instrumentation.http.HTTPMetricsServer");
Класс HTTPMetricsServer
предлагает стандартный способ отслеживания состояния узла. Он представляет собой небольшой web-сервер, который по запросу отдает полный список метрик узла в виде JSON (как в примере выше). Чтобы указать порт, на котором этот сервер будет слушать запросы, достаточно добавить в конфигурацию Flume параметр (по умолчанию использует порт 41414):
flume.monitoring.port = 61509
Запрос к этому серверу выглядит так: localhost:61509/metrics
.
Если же такого способа следить за метриками недостаточно, то придется пойти вторым путём и написать собственную реализацию MonitorService
. Именно так мы и поступили, чтобы наблюдать за состоянием наших узлов с помощью Graphite. Ниже приведен простой пример такой реализации.
package ru.dmp.flume.monitoring;
import com.google.common.base.CaseFormat;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;
public class FlumeGraphiteMonitor implements MonitorService {
// нормализованные имена метрик, которые не нужно отправлять в Graphite
private static final Set<String> EXCLUDED_METRICS = new HashSet<String>() {{
add("start-time");
add("stop-time");
}};
private volatile long period = 60 * 1000; // интервал отправки, 1 минута
private volatile boolean switcher = true;
private Thread scheduler = new Thread(this::schedule);
@Override
public void configure(Context context) {
// Здесь можно достать какие-нибудь настройки из файла конфигурации
}
private void schedule() {
while (switcher) {
send();
synchronized (this) {
try {
wait(period);
} catch (InterruptedException ex) {}
}
}
}
@Override
public void start() {
scheduler.start();
}
@Override
public void stop() {
switcher = false;
synchronized (this) {
notifyAll();
}
try {
scheduler.join();
} catch (InterruptedException ex) {}
}
private void send() {
Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
for (Map.Entry<String, Map<String, String>> e: metricsMap.entrySet()) {
if (e.getValue() != null) {
// все метрики от узлов Flume начинаем с префикса "flume"
String group = "flume." + normalize(e.getKey().toLowerCase()) + ".";
for (Map.Entry<String, String> metric : e.getValue().entrySet()) {
try {
Double value = Double.valueOf(metric.getValue());
String metricName = normalize(metric.getKey());
if (!EXCLUDED_METRICS.contains(metricName)) {
String fullName = group + normalize(metric.getKey());
// Отправляем данные в графит или куда-то еще
// Graphite.send(metricName, value);
}
} catch (NumberFormatException ex) {
// так отсеиваем значения, не являющиеся числом
}
}
}
}
}
// приводим к виду EventReceivedCount -> event-received-count (необязательно)
private static String normalize(String str) {
return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str).replaceAll("_", "-");
}
В итоге получаем аккуратную ветку Graphite со всеми метриками узла.
Ниже приведены описания графиков и метрик, которые мы используем для одного из наших сервисов.
Если значение на этом графике падает до нуля, значит клиент по каким-то причинам не может отправить сообщения во Flume. Чтобы диагностировать, кто виноват в таких ситуациях, мы отдельно отображаем график ошибок, возникающих на стороне клиента. Соответственно, если он отличен от нуля — проблема на узле Flume, источник не может принять данные. Если же падение интенсивности не влечет роста числа ошибок — значит проблема на стороне сервиса, он перестал отправлять сообщения.
flume.channel.{CHANNEL-NAME}.channel-fill-percentage
flume.sink.{SINK-NAME}.event-drain-success-count
Падение интенсивности любого из стоков до нуля говорит о потенциальной проблеме на следующем, принимающем, узле. Как следствие, канал, опустошаемый «поломанным» стоком, начнет заполняться. Также возможна ситуация, при которой принимающие узлы работают нормально, но просто не успевают обрабатывать входные данные — в этом случае графики стоков будут ненулевыми, но каналы будут постепенно заполняться.
Несмотря на то, что набор стандартных компонентов Flume довольно обширен, довольно часто возникают ситуации, разрешить которые этим стандартными компонентами невозможно. В этом случае можно написать свой компонент Flume и задействовать его в узлах. Свою реализацию можно написать для любого из компонентов Flume — sink, source, channel, interceptor и т.п.
Первое, что бросилось в глаза при изучении стоков Flume — это отсутствие гибкого стока для файловой системы. Да, есть File-Roll Sink [9], возможности которого описывались во 2й части цикла [3]. Но этот сток полностью лишен возможности как-либо влиять на имя файла, что не очень удобно.
Мы решили разработать свой сток, позволяющий формировать файлы в локальной файловой системе. При разработке руководствовались следующими соображениями.
Исходя из этих тезисов, мы пришли к выводу, что задачу формирования имени файла лучше оставить клиенту (т.е. сервисам, генерирующим данные), иначе конфигурация стока будет слишком громоздкой, а для каждого нового «клиента» придется добавлять отдельный сток с индивидуальными настройками.
Примечание. Здесь уместно сравнение с HDFS-стоком, о котором мы говорили в предыдущей статье. Этот сток позволяет выполнить очень тонкую настройку ротации и именования файлов. Но эта гибкость настройки имеет и обратную сторону — например, для файлов, ротирующихся раз в 15 и раз в 30 минут приходится делать различные стоки, даже если во всем остальном параметры идентичны.
Итого, решение по функциональности файлового стока было принято следующее:
Схематично процесс обработки данных этим стоком выглядит так:
Что это дало в итоге:
Исходный код файлового стока и пример его конфигурации выложены на GitHub [10]. Подробно разбирать процесс разработки этого стока, я думаю, смысла нет, ограничусь несколькими тезисами:
Общие приемы управления данными мы рассмотрели в предыдущих частях цикла — события можно делить между узлами, дублировать, выбирать «направление движения», используя заголовки и т.п. Попробуем теперь использовать все эти приемы для того, чтобы построить надежную транспортную сеть. Предположим, задача выглядит следующим образом.
На условии 3 остановлюсь подробнее. Предположим, что задача состоит в подсчете уникальных пользователей сайта за последний час. В этом случае мы не можем позволить себе распараллелить поток данных с машин или вычислять это значение отдельно на каждом веб-сервисе — вести подсчет уникальных пользователей по их кукам необходимо на едином потоке данных со всех машин. В противном случае каждый инстанс будет иметь свой набор уникальных пользователей, которые нельзя «взять и сложить» для получения конечного результата.
Примечание. Разумеется, пример немного надуман — эту задачу можно решить и другими, более эффективными способами. Суть примера сводится к ситуации, когда вам необходимо пропускать некоторый поток данных централизовано через один обработчик и разделить нагрузку невозможно из-за особенностей задачи.
Итак, для начала подготовим клиентские и конечные узлы:
Для каждого из веб-сервисов ставим свой, индивидуальный узел — на той же машине, что и веб-сервис. Делается это по следующим причинам:
Остается вопрос — как доставить данные так, чтобы ничего не потерять, если что-то сломается? Ряд мер мы уже предприняли — данные для HDFS и для FS пишутся на несколько машин. При этом данные не дублируются, а делятся. Таким образом, если одна из конечных машин выходит из строя, вся нагрузка пойдет на оставшуюся в живых. Следствием такой поломки будет дизбаланс по записанному объему данных на различных машинах, но с этим вполне можно жить.
Чтобы обеспечить бОльшую стабильность, добавим несколько промежуточных узлов Flume, которые будут заниматься непосредственно распределением данных:
Получилась довольно жуткая паутина. Что здесь происходит:
Если с клиентскими узлами всё относительно просто — они просто делят события между Splitter'ами, то структуру отдельно взятого Splitter'a стоит рассмотреть более детально.
Здесь видно, что конечный пункт для данных определяется с помощью заголовка dist. При этом, события, по которым считаются уникальные пользователи, не зависят от заголовка dist — они ориентируются на заголовок uniq. Это значит, что некоторые события могут быть продублированы в несколько каналов, например HDFS и UNQ.
Ранее я специально не указал направления от Splitter'ов к узлам UNQ. Дело в том, что эти узлы не принимают распределенные данные, как HDFS или FS. Учитывая специфику задачи подсчета уникальных пользователей, весь поток данных должен проходить только через одну машину. Закономерный вопрос — зачем нам тогда 2 узла для подсчета уникальных пользователей? Ответ — потому что если один узел сломается, его будет некому заменить. Как здесь быть — делить события между узлами мы не можем, оставить один — тоже нельзя?
Здесь нам может помочь еще один инструмент Flume, позволяющий работать стокам в группе по принципу «Если сток 1 сломался, используй сток 2». Этот компонент называется Failover Sink Processor [11]. Его конфигурация выглядит следующим образом:
# Сами по себе стоки описываются как обычно
agent.sinks.sink-unq-1.type = avro
agent.sinks.sink-unq-1.batch-size = 5000
agent.sinks.sink-unq-1.channel = memchannel
agent.sinks.sink-unq-1.hostname = unq-counter-1.my-company.com
agent.sinks.sink-unq-1.port = 50001
agent.sinks.sink-unq-2.type = avro
agent.sinks.sink-unq-2.batch-size = 5000
agent.sinks.sink-unq-2.channel = memchannel
agent.sinks.sink-unq-2.hostname = unq-counter-2.my-company.com
agent.sinks.sink-unq-2.port = 50001
# Объединяем их в группу
agent.sinkgroups = failover-group
agent.sinkgroups.failover-group.sinks = sink-unq-1 sink-unq-2
# Тип процессинга указываем как failover
agent.sinkgroups.failover-group.processor.type = failover
# Приоритеты для стоков - сток с высоким значением будет задействован первым
agent.sinkgroups.failover-group.processor.priority.sink-unq-1 = 10
agent.sinkgroups.failover-group.processor.priority.sink-unq-2 = 1
# Как часто проверять - вернулся ли сток в строй (мс)
agent.sinkgroups.failover-group.processor.maxpenalty = 10000
Приведенная выше настройка группы потоков позволяет использовать только один сток, но при этом иметь «запасной» на случай аварии. Т.е. покуда сток с высоким приоритетом исправно работает, стоки с низким приоритетом будут простаивать.
Таким образом, поставленная задача выполнена — данные распределяются между HDFS и FS, счетчики уникальных пользователей работают корректно. При этом выход из строя любой машины не приведет к потере данных:
Для такой схемы задачи масштабирования сводятся к простому изменению конфигурации узлов Flume для соответствующего уровня узлов (Client, Splitter или Final):
На этом мы завершаем цикл статей про Apache Flume. Мы осветили все самые ходовые его компоненты, разобрались как управлять потоком данных и рассмотрели пример полноценной транспортной сети. Тем не менее, возможности Flume не исчерпываются всем этим — в стандартном пакете есть еще довольно много не рассмотренных нами компонентов, которые могут значительно облегчить жизнь при решении определенных задач. Надеемся, что этот цикл статей помог вам познакомиться с Flume и получить достаточно полное представление о нём.
Спасибо за внимание!
Автор: DCA (Data-Centric Alliance)
Источник [12]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/big-data/171529
Ссылки в тексте:
[1] Apache Flume: https://flume.apache.org/
[2] Часть 1: https://habrahabr.ru/company/dca/blog/280386/
[3] Часть 2: https://habrahabr.ru/company/dca/blog/281933/
[4] java.lang.management.ManagementFactory: https://docs.oracle.com/javase/7/docs/api/java/lang/management/ManagementFactory.html
[5] MonitoredCounterGroup: https://flume.apache.org/releases/content/1.4.0/apidocs/org/apache/flume/instrumentation/MonitoredCounterGroup.html
[6] ссылка на исходный код: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.flume/flume-ng-core/1.4.0/org/apache/flume/instrumentation/MonitoredCounterGroup.java
[7] JMXPollUtil: https://flume.apache.org/releases/content/1.4.0/apidocs/org/apache/flume/instrumentation/util/JMXPollUtil.html
[8] MonitorService: https://flume.apache.org/releases/content/1.4.0/apidocs/org/apache/flume/instrumentation/MonitorService.html
[9] File-Roll Sink: https://flume.apache.org/FlumeUserGuide.html#file-roll-sink
[10] выложены на GitHub: https://github.com/yakischik/flume-file-sink/tree/master
[11] Failover Sink Processor: https://flume.apache.org/FlumeUserGuide.html#failover-sink-processor
[12] Источник: https://habrahabr.ru/post/305932/?utm_source=habrahabr&utm_medium=rss&utm_campaign=best
Нажмите здесь для печати.