- PVSM.RU - https://www.pvsm.ru -

Flume — управляем потоками данных. Часть 3

Привет! После долгой паузы мы наконец-то возвращаемся к разбору Apache Flume [1]. В предыдущих статьях мы познакомились с Flume (Часть 1 [2]) и разобрались, как настраивать основные его компоненты (Часть 2 [3]). В этой, заключительной, части цикла мы рассмотрим следующие вопросы:

  • Как настроить мониторинг компонентов узла.
  • Как написать собственную реализацию компонента Flume.
  • Проектирование полноценной транспортной сети.

Flume — управляем потоками данных. Часть 3 - 1

Мониторинг состояния компонентов узла

Итак, мы настроили и запустили все узлы, проверили их работоспособность — данные успешно доставляются до пункта назначения. Но проходит какое-то время, мы смотрим на результат работы нашей транспортной сети (например, папку с файлами, в которые упаковываются данные) и понимаем, что возникла проблема — начиная с какого-то момента новые файлы не появляются в нашей папке. Следующий шаг кажется очевидным — открываем логи, ищем причину. Беда только в том, что узлов в нашей транспортной сети может быть много, а значит необходимо вручную просматривать логи всех узлов, что, мягко говоря, не очень удобно. Когда подобные проблемы возникают, реагировать на них хотелось бы максимально оперативно, а еще лучше — вообще не допускать таких критичных ситуаций.

Компоненты Flume в процессе работы пишут метрики, которые позволяют оценить состояние узла. По значениям этих метрик довольно легко определить, что с узлом не всё в порядке.

Для хранения счетчиков и других атрибутов своих компонентов Flume использует java.lang.management.ManagementFactory [4], регистрируя собственные bean-классы для ведения метрик. Все эти классы унаследованы от MonitoredCounterGroup [5] (для любопытных — ссылка на исходный код [6]).

Если вы не планируете разрабатывать собственные компоненты Flume, то закапываться в механизм ведения метрик совершенно необязательно, достаточно разобраться, как их достать. Сделать это можно довольно просто с помощью утилитарного класса JMXPollUtil [7]:

package ru.test.flume.monitoring;

import java.util.Map;
import org.apache.flume.instrumentation.util.JMXPollUtil;

public class FlumeMetrics {

    public static Map<String, Map<String, String>> getMetrics() {
        Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
        return metricsMap;
    }    
}

В результате вы получите метрики, сгруппированные по компонентам узла, которые выглядят примерно так:

Метрики Flume-компонентов (JSON)

{
  "SOURCE.my-source": {
    "EventReceivedCount": "567393607",
    "AppendBatchAcceptedCount": "5689696",
    "Type": "SOURCE",
    "EventAcceptedCount": "567393607",
    "AppendReceivedCount": "0",
    "StartTime": "1467797931288",
    "AppendAcceptedCount": "0",
    "OpenConnectionCount": "1",
    "AppendBatchReceivedCount": "5689696",
    "StopTime": "0"
  },
  "CHANNEL.my-channel": {
    "ChannelCapacity": "100000000",
    "ChannelFillPercentage": "5.0E-4",
    "Type": "CHANNEL",
    "ChannelSize": "500",
    "EventTakeSuccessCount": "567393374",
    "StartTime": "1467797930967",
    "EventTakeAttemptCount": "569291443",
    "EventPutSuccessCount": "567393607",
    "EventPutAttemptCount": "567393607",
    "StopTime": "0"
  },
  "SINK.my-sink": {
    "ConnectionCreatedCount": "1",
    "ConnectionClosedCount": "0",
    "Type": "SINK",
    "BatchCompleteCount": "2",
    "EventDrainAttemptCount": "567393374",
    "BatchEmptyCount": "959650",
    "StartTime": "1467797930968",
    "EventDrainSuccessCount": "567393374",
    "BatchUnderflowCount": "938419",
    "StopTime": "0",
    "ConnectionFailedCount": "0"
  }
}

Метрики получили, теперь необходимо их куда-то отправить. Здесь можно пойти двумя путями.

  1. Использовать возможности Flume для предоставления метрик.
  2. Написать свою реализацию обработки метрик.

Flume предоставляет API, позволяющей задать способ мониторинга — для этого используются реализации интерфейса MonitorService [8]. Для того, чтобы подключить мониторинг, необходимо указать класс, реализующий MonitorService, в качестве системного свойства при запуске узла (или в коде).

java -Dflume.monitoring.type=org.apache.flume.instrumentation.http.HTTPMetricsServer ...

System.setProperty("flume.monitoring.type", "org.apache.flume.instrumentation.http.HTTPMetricsServer");

Класс HTTPMetricsServer предлагает стандартный способ отслеживания состояния узла. Он представляет собой небольшой web-сервер, который по запросу отдает полный список метрик узла в виде JSON (как в примере выше). Чтобы указать порт, на котором этот сервер будет слушать запросы, достаточно добавить в конфигурацию Flume параметр (по умолчанию использует порт 41414):

flume.monitoring.port = 61509

Запрос к этому серверу выглядит так: localhost:61509/metrics.

Если же такого способа следить за метриками недостаточно, то придется пойти вторым путём и написать собственную реализацию MonitorService. Именно так мы и поступили, чтобы наблюдать за состоянием наших узлов с помощью Graphite. Ниже приведен простой пример такой реализации.

FlumeGraphiteMonitor


package ru.dmp.flume.monitoring;

import com.google.common.base.CaseFormat;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;

public class FlumeGraphiteMonitor implements MonitorService {
   
    // нормализованные имена метрик, которые не нужно отправлять в Graphite
    private static final Set<String> EXCLUDED_METRICS = new HashSet<String>() {{
       add("start-time"); 
       add("stop-time");
    }};
    
    private volatile long period = 60 * 1000;   // интервал отправки, 1 минута
    
    private volatile boolean switcher = true;
    private Thread scheduler = new Thread(this::schedule);

    @Override
    public void configure(Context context) {
         // Здесь можно достать какие-нибудь настройки из файла конфигурации
    }   

    private void schedule() {
        while (switcher) {
            send();
            synchronized (this) {
                try {
                    wait(period);
                } catch (InterruptedException ex) {}
            }
        }
    }
                
    @Override
    public void start() {      
        scheduler.start();
    }

    @Override
    public void stop() {
        switcher = false;
        synchronized (this) {
            notifyAll();
        }
        try {
            scheduler.join();
        } catch (InterruptedException ex) {}
    }
    
    private void send() {
        Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
        for (Map.Entry<String, Map<String, String>> e: metricsMap.entrySet()) {
            if (e.getValue() != null) {
                // все метрики от узлов Flume начинаем с префикса "flume"
                String group = "flume." + normalize(e.getKey().toLowerCase()) + ".";
                for (Map.Entry<String, String> metric : e.getValue().entrySet()) {
                    try {
                        Double value = Double.valueOf(metric.getValue());
                        String metricName = normalize(metric.getKey());
                        if (!EXCLUDED_METRICS.contains(metricName)) {
                            String fullName = group + normalize(metric.getKey());
                            // Отправляем данные в графит или куда-то еще
                            // Graphite.send(metricName, value);
                        }
                    } catch (NumberFormatException ex) {
                        // так отсеиваем значения, не являющиеся числом
                    }
                }
            }
        }
    }
    
    // приводим к виду EventReceivedCount -> event-received-count (необязательно)
    private static String normalize(String str) {
        return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str).replaceAll("_", "-");
    } 

В итоге получаем аккуратную ветку Graphite со всеми метриками узла.

Flume — управляем потоками данных. Часть 3 - 2

Ниже приведены описания графиков и метрик, которые мы используем для одного из наших сервисов.

  1. Интенсивность отправки сервисом сообщений на узел Flume. График строится не по метрикам узла — эти значения в Graphite отправляют сервисы, которые генерируют данные и являются отправной точкой нашей транспортной системы. Если ваши источники данных не позволяют отслеживать отправку данных во Flume, то похожие графики можно снять с источника(-ов) узла.

    Если значение на этом графике падает до нуля, значит клиент по каким-то причинам не может отправить сообщения во Flume. Чтобы диагностировать, кто виноват в таких ситуациях, мы отдельно отображаем график ошибок, возникающих на стороне клиента. Соответственно, если он отличен от нуля — проблема на узле Flume, источник не может принять данные. Если же падение интенсивности не влечет роста числа ошибок — значит проблема на стороне сервиса, он перестал отправлять сообщения.

    Flume — управляем потоками данных. Часть 3 - 3

  2. Заполненность каналов узла. С этим графиком всё просто — он всегда должен быть очень близок к нулевому значению. Если канал не успевает опустошаться, значит где-то в нашей транспортной сети возникло узкое место и необходимо искать узлы, которые не успевают справляться с нагрузкой. Метрика на графике:
    flume.channel.{CHANNEL-NAME}.channel-fill-percentage

    Flume — управляем потоками данных. Часть 3 - 4

  3. Интенсивность работы стоков узла. Ожидаемые показатели стоков на этом узле — «сколько получили, столько и отправили», поскольку события от сервисов не дублируются в каналы. Таким образом, интенсивность опустошения стоков должна быть такой же, как и интенсивность отправки данных клиентами. Метрика на графике:
    flume.sink.{SINK-NAME}.event-drain-success-count

    Падение интенсивности любого из стоков до нуля говорит о потенциальной проблеме на следующем, принимающем, узле. Как следствие, канал, опустошаемый «поломанным» стоком, начнет заполняться. Также возможна ситуация, при которой принимающие узлы работают нормально, но просто не успевают обрабатывать входные данные — в этом случае графики стоков будут ненулевыми, но каналы будут постепенно заполняться.

    Flume — управляем потоками данных. Часть 3 - 5

Создание собственных компонентов Flume

Несмотря на то, что набор стандартных компонентов Flume довольно обширен, довольно часто возникают ситуации, разрешить которые этим стандартными компонентами невозможно. В этом случае можно написать свой компонент Flume и задействовать его в узлах. Свою реализацию можно написать для любого из компонентов Flume — sink, source, channel, interceptor и т.п.

Первое, что бросилось в глаза при изучении стоков Flume — это отсутствие гибкого стока для файловой системы. Да, есть File-Roll Sink [9], возможности которого описывались во 2й части цикла [3]. Но этот сток полностью лишен возможности как-либо влиять на имя файла, что не очень удобно.

Мы решили разработать свой сток, позволяющий формировать файлы в локальной файловой системе. При разработке руководствовались следующими соображениями.

  • У нас довольно много сервисов со сравнительно небольшой нагрузкой. Это значит, что в итоге у нас будет довольно много разнородных файлов — не хотелось бы под каждый из них настраивать отдельный сток.
  • Файлы должны ротироваться по времени. Причем, для различных данных период ротации может отличаться (под ротацией имеется ввиду «нарезка данных» на файлы по времени — 15 минут, час и т.п.).
  • Данные от каждого сервиса должны складироваться в отдельную папку. Причем, один сервис может генерировать данные для нескольких подпапок.

Исходя из этих тезисов, мы пришли к выводу, что задачу формирования имени файла лучше оставить клиенту (т.е. сервисам, генерирующим данные), иначе конфигурация стока будет слишком громоздкой, а для каждого нового «клиента» придется добавлять отдельный сток с индивидуальными настройками.

Примечание. Здесь уместно сравнение с HDFS-стоком, о котором мы говорили в предыдущей статье. Этот сток позволяет выполнить очень тонкую настройку ротации и именования файлов. Но эта гибкость настройки имеет и обратную сторону — например, для файлов, ротирующихся раз в 15 и раз в 30 минут приходится делать различные стоки, даже если во всем остальном параметры идентичны.

Итого, решение по функциональности файлового стока было принято следующее:

  • Имя файла, в который должны быть записаны данные, определяет клиент и передает его в заголовке вместе с событием.
  • В имени файла могут быть указаны подпапки.
  • Файлы, в которые ведется запись стоком, закрываются по некоторому таймауту, когда для них перестают приходить события.

Схематично процесс обработки данных этим стоком выглядит так:

Flume — управляем потоками данных. Часть 3 - 6

Что это дало в итоге:

  • Не требуется добавлять сток для каждого нового сервиса или типа данных.
  • Сток лишен издержек на формирование имени файла (в предыдущей части мы рассматривали эти издержки на примере HDFS-стока)
  • Поскольку имя файла однозначно идентифицируется одним заголовком, мы можем пользоваться группировкой событий на стороне клиента (этот прием также описан во второй части цикла).

Исходный код файлового стока и пример его конфигурации выложены на GitHub [10]. Подробно разбирать процесс разработки этого стока, я думаю, смысла нет, ограничусь несколькими тезисами:

  • За основу компонента берется либо абстрактный класс, либо интерфейс компонента (в зависимости от того, что вы разрабатываете — сток, перехватчик или что-то еще).
  • Делаем собственную реализацию — проще всего взять что-нибудь из готовых Flume-компонентов в качестве примера, благо все исходники доступны.
  • При конфигурации указываем не зарезервированный псевдоним компонента (типа 'avro' или 'logger'), а имя класса целиком.

Проектируем транспортную сеть

Общие приемы управления данными мы рассмотрели в предыдущих частях цикла — события можно делить между узлами, дублировать, выбирать «направление движения», используя заголовки и т.п. Попробуем теперь использовать все эти приемы для того, чтобы построить надежную транспортную сеть. Предположим, задача выглядит следующим образом.

  1. Поставщик данных — некий сервис, работающий на нескольких машинах (имеет несколько одинаковых инстансов).
  2. Данные, генерируемые сервисом, разнородны — часть из них нужно доставить в HDFS, часть — в файловую систему на некий лог-сервер.
  3. Необходимо в режиме реального времени вести некоторые неатомарные вычисления, связанные с данными.

На условии 3 остановлюсь подробнее. Предположим, что задача состоит в подсчете уникальных пользователей сайта за последний час. В этом случае мы не можем позволить себе распараллелить поток данных с машин или вычислять это значение отдельно на каждом веб-сервисе — вести подсчет уникальных пользователей по их кукам необходимо на едином потоке данных со всех машин. В противном случае каждый инстанс будет иметь свой набор уникальных пользователей, которые нельзя «взять и сложить» для получения конечного результата.

Примечание. Разумеется, пример немного надуман — эту задачу можно решить и другими, более эффективными способами. Суть примера сводится к ситуации, когда вам необходимо пропускать некоторый поток данных централизовано через один обработчик и разделить нагрузку невозможно из-за особенностей задачи.

Итак, для начала подготовим клиентские и конечные узлы:

Flume — управляем потоками данных. Часть 3 - 7

Для каждого из веб-сервисов ставим свой, индивидуальный узел — на той же машине, что и веб-сервис. Делается это по следующим причинам:

  • Эти узлы играют роль буфера — если по какой-то причине доставка событий на другие машины станет невозможна, эти узлы позволят некоторое время «продержаться» без потерь данных за счет толстого файлового канала.
  • Уменьшается время отклика. Разумеется, отправка данных во Flume должна выполняться асинхронно — но при пиковых нагрузках или забитой сети может возникнуть ситуация, когда фоновый поток не будет успевать отправлять новые события. В этом случае очередь на отправку может сильно вырасти, безжалостно поглощая память сервиса, что не очень хорошо. В случае, когда узел расположен на той же машине, что и сервис, эти издержки значительно снижаются.
  • Если дальнейшая логика обработки данных изменится и вы решите перестроить транспортную сеть, то изменения нужно будет внести только в конфигурацию клиентского узла, а не веб-сервиса. Для него все останется по-прежнему — «я отправляю данные на свой узел, дальше пусть сам решает, как быть».

Остается вопрос — как доставить данные так, чтобы ничего не потерять, если что-то сломается? Ряд мер мы уже предприняли — данные для HDFS и для FS пишутся на несколько машин. При этом данные не дублируются, а делятся. Таким образом, если одна из конечных машин выходит из строя, вся нагрузка пойдет на оставшуюся в живых. Следствием такой поломки будет дизбаланс по записанному объему данных на различных машинах, но с этим вполне можно жить.

Чтобы обеспечить бОльшую стабильность, добавим несколько промежуточных узлов Flume, которые будут заниматься непосредственно распределением данных:

Flume — управляем потоками данных. Часть 3 - 8

Получилась довольно жуткая паутина. Что здесь происходит:

  1. Веб-сервис отправляет события на клиентский узел.
  2. Каждое событие имеет заголовок, который указывает «пункт назначения» (например dist=FS или dist=HDFS), а также заголовок uniq с возможными значениями 1 или 0.
  3. Каждый клиентский узел имеет по 3 стока, которые равноправно опустошают канал и равномерно распределяют события между тремя промежуточными узлами — Splitter'ами (пока что без оглядки на заголовок dist).
  4. Каждый Splitter имеет несколько каналов — для HDFS, FS и счетчика уникальных пользователей. Необходимый канал выбирается по заголовкам dist и uniq.
  5. Каждый из этих каналов на Splitter'e имеет несколько стоков, которые равномерно распределяют события между конечными машинами (FS, HDFS или UNQ).

Если с клиентскими узлами всё относительно просто — они просто делят события между Splitter'ами, то структуру отдельно взятого Splitter'a стоит рассмотреть более детально.

Flume — управляем потоками данных. Часть 3 - 9

Здесь видно, что конечный пункт для данных определяется с помощью заголовка dist. При этом, события, по которым считаются уникальные пользователи, не зависят от заголовка dist — они ориентируются на заголовок uniq. Это значит, что некоторые события могут быть продублированы в несколько каналов, например HDFS и UNQ.

Ранее я специально не указал направления от Splitter'ов к узлам UNQ. Дело в том, что эти узлы не принимают распределенные данные, как HDFS или FS. Учитывая специфику задачи подсчета уникальных пользователей, весь поток данных должен проходить только через одну машину. Закономерный вопрос — зачем нам тогда 2 узла для подсчета уникальных пользователей? Ответ — потому что если один узел сломается, его будет некому заменить. Как здесь быть — делить события между узлами мы не можем, оставить один — тоже нельзя?

Здесь нам может помочь еще один инструмент Flume, позволяющий работать стокам в группе по принципу «Если сток 1 сломался, используй сток 2». Этот компонент называется Failover Sink Processor [11]. Его конфигурация выглядит следующим образом:

# Сами по себе стоки описываются как обычно
agent.sinks.sink-unq-1.type = avro
agent.sinks.sink-unq-1.batch-size = 5000
agent.sinks.sink-unq-1.channel = memchannel
agent.sinks.sink-unq-1.hostname = unq-counter-1.my-company.com
agent.sinks.sink-unq-1.port = 50001

agent.sinks.sink-unq-2.type = avro
agent.sinks.sink-unq-2.batch-size = 5000
agent.sinks.sink-unq-2.channel = memchannel
agent.sinks.sink-unq-2.hostname = unq-counter-2.my-company.com
agent.sinks.sink-unq-2.port = 50001

# Объединяем их в группу
agent.sinkgroups = failover-group
agent.sinkgroups.failover-group.sinks = sink-unq-1 sink-unq-2
# Тип процессинга указываем как failover
agent.sinkgroups.failover-group.processor.type = failover
# Приоритеты для стоков - сток с высоким значением будет задействован первым
agent.sinkgroups.failover-group.processor.priority.sink-unq-1 = 10
agent.sinkgroups.failover-group.processor.priority.sink-unq-2 = 1
# Как часто проверять - вернулся ли сток в строй (мс)
agent.sinkgroups.failover-group.processor.maxpenalty = 10000

Приведенная выше настройка группы потоков позволяет использовать только один сток, но при этом иметь «запасной» на случай аварии. Т.е. покуда сток с высоким приоритетом исправно работает, стоки с низким приоритетом будут простаивать.

Таким образом, поставленная задача выполнена — данные распределяются между HDFS и FS, счетчики уникальных пользователей работают корректно. При этом выход из строя любой машины не приведет к потере данных:

  1. Если ломается машина c Web-сервисом, то эта проблема решается балансером.
  2. Если из строя вышел один из Splitter'ов, нагрузка будет распределена между остальными.
  3. Конечные узлы также задублированы, поломка одного из них не приведет к застою или потере данных.
  4. Узел для подсчета уникальных пользователей имеет «дублёра» и в случае поломки будет заменен им без нарушения логики обработки данных.

Для такой схемы задачи масштабирования сводятся к простому изменению конфигурации узлов Flume для соответствующего уровня узлов (Client, Splitter или Final):

  1. Новый инстанс Web-сервиса — не требуется изменения конфигурации, он просто устанавливается вместе с клиентским узлом Flume.
  2. Новый Splitter — требуется изменить только конфигурацию клиентских узлов, добавив новый сток.
  3. Новый конечный узел — требуется изменить только конфигурацию Splitter'a, добавив новый сток.

Заключение

На этом мы завершаем цикл статей про Apache Flume. Мы осветили все самые ходовые его компоненты, разобрались как управлять потоком данных и рассмотрели пример полноценной транспортной сети. Тем не менее, возможности Flume не исчерпываются всем этим — в стандартном пакете есть еще довольно много не рассмотренных нами компонентов, которые могут значительно облегчить жизнь при решении определенных задач. Надеемся, что этот цикл статей помог вам познакомиться с Flume и получить достаточно полное представление о нём.

Спасибо за внимание!

Автор: DCA (Data-Centric Alliance)

Источник [12]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/big-data/171529

Ссылки в тексте:

[1] Apache Flume: https://flume.apache.org/

[2] Часть 1: https://habrahabr.ru/company/dca/blog/280386/

[3] Часть 2: https://habrahabr.ru/company/dca/blog/281933/

[4] java.lang.management.ManagementFactory: https://docs.oracle.com/javase/7/docs/api/java/lang/management/ManagementFactory.html

[5] MonitoredCounterGroup: https://flume.apache.org/releases/content/1.4.0/apidocs/org/apache/flume/instrumentation/MonitoredCounterGroup.html

[6] ссылка на исходный код: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.flume/flume-ng-core/1.4.0/org/apache/flume/instrumentation/MonitoredCounterGroup.java

[7] JMXPollUtil: https://flume.apache.org/releases/content/1.4.0/apidocs/org/apache/flume/instrumentation/util/JMXPollUtil.html

[8] MonitorService: https://flume.apache.org/releases/content/1.4.0/apidocs/org/apache/flume/instrumentation/MonitorService.html

[9] File-Roll Sink: https://flume.apache.org/FlumeUserGuide.html#file-roll-sink

[10] выложены на GitHub: https://github.com/yakischik/flume-file-sink/tree/master

[11] Failover Sink Processor: https://flume.apache.org/FlumeUserGuide.html#failover-sink-processor

[12] Источник: https://habrahabr.ru/post/305932/?utm_source=habrahabr&utm_medium=rss&utm_campaign=best