- PVSM.RU - https://www.pvsm.ru -
Привет! Мы продолжаем цикл статей, посвященный Apache Flume [1]. В предыдущей части [2] мы поверхностно рассмотрели этот инструмент, разобрались с тем, как его настраивать и запускать. В этот раз статья будет посвящена ключевым компонентам Flume, с помощью которых не страшно манипулировать уже настоящими данными.
В прошлой статье мы рассмотрели Memory Channel. Очевидно, что канал, использующий для хранения данных память, не является надежным. Перезапуск узла приведет к тому, что все данные, хранящиеся в канале, будут потеряны. Это не делает Memory Channel бесполезным, есть некоторые случаи, когда его использование очень даже оправдано в силу быстродействия. Однако для действительно надежной транспортной системы необходимо более устойчивое к неполадкам решение.
Таким решением является файловый канал — File Channel. Несложно догадаться, что этот канал хранит данные в файлах. При этом канал использует Random Access для работы с файлом, позволяя таким образом и добавлять и забирать события, сохраняя их последовательность. Для быстрой навигации канал использует систему меток (checkpoints), с помощью которых реализуется механизм WAL. Всё это, в общем-то, спрятано «под капотом» канала, а для его настройки используются следующие параметры (жирным шрифтом — обязательные параметры).
Параметр | Описание | По умолчанию |
---|---|---|
type |
Реализация канала, должно быть указано file | - |
checkpointDir |
Папка для хранения файлов с метками. Если не указана, канал будет использовать домашнюю папку Flume. |
$HOME/... |
useDualCheckpoints |
Делать ли бекап папки с метками. |
false |
backupCheckpointDir |
Папка для бекапов файлов с метками, нужно обязательно указывать, если useDualCheckpoints=true (разумеется, этот бекап лучше держать подальше от оригинала — например, на другом диске). |
- |
dataDirs |
Список папок через запятую, в которых будут размещаться файлы с данными. Лучше указывать несколько папок на различных дисках для повышения производительности. Если папки не указаны, канал также будет использовать домашнюю папку Flume. |
$HOME/... |
capacity |
Вместимость канала, указывается число событий. |
1000000 |
transactionCapacity |
Максимальное число событий в одной транзакции. Очень важный параметр, от которого может зависеть работоспособность всей транспортной системы. Подробнее об этом будет написано ниже. |
10000 |
checkpointInterval |
Интервал между созданием новых меток, в миллисекуднах. Метки играют важную роль при перезапуске, позволяя «перепрыгивать» участки файлов с данными при восстановлении состояния канала. В итоге канал не перечитывает файлы с данными целиком, что существенно ускоряет запуск при «забитом» канале. |
30000 |
checkpointOnClose |
Записывать ли метку при закрытии канала. Замыкающая метка позволит каналу восстановиться при перезапуске максимально быстро — но её создание займет некоторое время при закрытии канала (на самом деле, очень незначительное). |
true |
keep-alive |
Таймаут (в секундах) для операции добавления в канал. Т.е., если канал забит, транзакция «даст ему шанс», выждав некоторое время. И если свободного места в канале так и не появилось, то транзакция откатится. | 3 |
maxFileSize |
Максимальный размер файла канала, в байтах. Значение этого параметра не определяет, сколько места может «откусить» ваш канал — оно задает размер одного файла с данными, а этих файлов канал может создать несколько. | 2146435071 (2ГБ) |
minimumRequiredSpace |
Если на вашем диске меньше свободного места, чем указано в этом параметре, то канал не будет принимать новые события. В случае, если папки с данными расположены на нескольких дисках, Flume будет использовать | 524288000 (500МБ) |
Остальные настройки [3] относятся к шифрованию данных в файлах канала и процессу восстановления (replay). Теперь пара слов о том, что нужно учитывать при работе с файловым каналом.
Если выполнить нехитрое деление, то получим, что узел Flume с файловым каналом на SSD может переваривать до 500/0.025 = 20000 событий в секунду (для справки — размер сообщений в данном примере около 1КБ, а канал использует для хранения только один диск).
В качестве альтернативы File-Channel Flume предлагает еще несколько каналов — в частности, JDBC-channel [4], использующий в качестве буфера базу данных, и Kafka-channel [5]. Разумеется, что для использования таких каналов нужно отдельно разворачивать базу данных и Kafka.
Avro — это один из инструментов сериализации данных [6], благодаря которому источник и сток получили свои названия. Сетевое взаимодействие этих компонентов реализовано с помощью Netty. В сравнении с Netcat Source, рассмотренным в предыдущей статье, Avro Source обладает следующими преимуществами:
Итак, рассмотрим настройки, которые нам предлагает Avro Source.
Параметр | Описание | По умолчанию |
type |
Реализация источника, должно быть указано avro. | - |
channels |
Каналы, в которые источник будет отправлять события (через пробел). | - |
bind |
Хост/IP, за которым закрепляем источник. | - |
port |
Порт, на котором источник будет принимать подключения от клиентов. | - |
threads |
Число потоков, обрабатывающих входящие события (I/O workers). При выборе значения следует ориентироваться на число потенциальных клиентов, которые будут слать события этому источнику. Необходимо выставлять как минимум 2 потока, иначе ваш источник может попросту «зависнуть», даже если клиент у него всего один. Если не уверены, сколько потоков необходимо — не указывайте этот параметр в конфигурации. | не ограничено |
compression-type |
Сжатие данных, здесь вариантов немного — либо none, либо deflate. Указывать необходимо только в том случае, если клиент передает данные в сжатом виде. Сжатие поможет вам существенно сэкономить трафик, и чем больше событий за раз вы передаете — тем существеннее будет эта экономия. |
none |
Как и для любого другого источника, для Avro Source можно указать:
Также для этого источника предусмотрена настройка фильтров Netty и параметры шифрования данных [7]. Для отправки событий этому источнику можно использовать вот такой код.
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.event.SimpleEvent;
public class FlumeSender {
public static void main(String[] args) throws EventDeliveryException {
RpcClient avroClient = RpcClientFactory.getDefaultInstance("127.0.0.1", 50001);
Map<String, String> headers = new HashMap<>();
headers.put("type", "common");
Event event = EventBuilder.withBody("Тело события".getBytes(), headers);
avroClient.append(event);
avroClient.close();
}
}
Теперь рассмотрим конфигурацию Avro-стока.
Параметр | Описание | По умолчанию |
---|---|---|
type |
Реализация стока, должно быть указано avro. | - |
channel |
Канал, из которого сток будет вытягивать события. | - |
hostname |
Хост/IP, на который сток будет отправлять события. | - |
port |
Порт, на котором указанная машина (hostname) ожидает подключения клиентов. | - |
batch-size |
очень важный параметр: размер «пачки» событий, отправляемых клиенту за один запрос. В то же время, это же значение используется при опустошении канала. Т.е., это еще и число событий, считываемых из канала за одну транзакцию. |
100 |
connect-timeout |
Таймаут соединения (handshake), в миллисекундах. |
20000 |
request-timeout |
Таймаут запроса (отправки пачки событий), в миллисекундах. |
20000 |
reset-connection-interval |
Интервал «смены хоста». Подразумевается, что за указанным hostname может скрываться несколько машин, обслуживаемых балансером. Этот параметр принудительно заставляет сток переключаться между машинами через указанный интервал времени. Удобство, по замыслу создателей стока, заключается в том, что если в зону ответственности балансера добавляется новая машина, отсутствует необходимость перезапускать узел Flume — сток сам сообразит, что появился еще один «пункт назначения». По умолчанию сток не осуществляет смены хостов. |
-1 |
maxIoWorkers |
Аналог threads для Avro Source. |
2 * PROC_CORES |
compression-type |
То же самое, что и для Avro Source. Разница в том, что сток сжимает данные, а источник, напротив, распаковывает. Соответственно, если Avro Sink шлет события на Avro Source, тип сжатия на обоих должен быть одинаковый. |
none |
compression-level |
Уровень сжатия, только если compression-type=deflate (0 — не сжимать, 9 — максимальное сжатие). |
6 |
Теперь поговорим о том, что важно учитывать при настройке этих компонентов.
Как я уже говорил, это очень важный параметр, непродуманный выбор которого может значительно подпортить вам жизнь. Прежде всего, batch-size обязательно должен быть меньше или равен вместимости транзакции канала (transactionCapacity). Это явно касается Avro Sink и неявно — Avro Source. Рассмотрим на примере:
Здесь TC — это transactionCapacity, а BS — batch-size. Условие нормальной работы заключается в том, что: BS <= TC1 и BS <= TC2. То есть, необходимо учитывать не только вместимость транзакции канала, с которым работает сток, но вместимость транзакции канала (-ов), с которым работает принимающий Avro Source. В противном случае сток не сможет опустошать свой канал, а источник — добавлять события в свой. В таких случаях Flume начинает интенсивно лить в лог сообщения об ошибках.
Случай из практики. В одном из стоков мы как-то поставили batch-size = 10000, в то время как на принимающем узле для канала была выставлена TC = 5000. И всё работало замечательно. Пока объём данных был небольшим, сток попросту не вытягивал из канала позволенные 10000 событий за раз — в канале не успевало накопиться столько событий. Но спустя некоторое время объем данных увеличился и у нас начались проблемы. Принимающий узел начал отклонять большие пачки данных. Ошибку вовремя заметили, изменили параметры и скопившиеся в канале данные озорным ручейком дотекли до места назначения.
В случае, если вы запускаете Flume средствами Java, переопределить зависимость можно средствами Maven. Если же вы настраиваете Flume средствами Cloudera или в виде сервиса [8], то зависимость Netty придётся менять вручную. Найти их можно в следующих папках:
Итак, мы разобрались, как настроить транспортные узлы на основе Avro Source/Sink и файлового канала. Осталось теперь разобраться с компонентами, которые замыкают (т.е. выводят данные из зоны ответственности Flume) нашу транспортную сеть.
Первый замыкающий сток, который стоит рассмотреть, это File-Roll Sink. Я бы сказал, что это сток для ленивых. Он поддерживает минимум настроек и может делать только одну вещь — записывать события в файлы.
Параметр | Описание | По умолчанию |
---|---|---|
type |
Реализация стока, должно быть указано file_roll. | - |
channel |
Канал, из которого сток будет вытягивать события. | - |
directory |
Папка, в которой будут храниться файлы. | - |
rollInterval |
Интервал между созданием новых файлов (0 — писать всё в один файл), в секундах. |
30 |
serializer |
Сериализация событий. Можно указать: TEXT, HEADER_AND_TEXT, AVRO_EVENT или свой класс, реализующий интерфейс EventSerializer.Builder. |
TEXT |
batch-size |
Аналогично Avro Sink, размер пачки событий, забираемых за транзакцию с канала. |
100 |
Почему я считаю его стоком для ленивых? Потому что в нем абсолютно ничего нельзя настроить. Ни сжатия, ни наименоваия файлов (в качестве имени будет использован timestamp создания), ни группировки по подпапкам — ничего. Даже размер файла ограничить нельзя. Этот сток подходит, пожалуй, только для случаев, когда «нет времени объяснять — нам нужно срочно начать принимать данные!».
Примечание. Поскольку необходимость записывать данные в файлы всё-таки имеется, мы пришли к выводу, что целесообразнее реализовать свой файловый сток, чем использовать этот. Учитывая, что все исходники Flume открыты, сделать его оказалось несложно, мы уложились за день. На второй день поправили мелкие баги — и сток уже больше года исправно работает, раскладывая данные по папкам в аккуратные архивы. Этот сток я выложу на GitHub после третьей части цикла.
Этот сток уже посерьезней — он поддерживает уйму настроек. Немного удивительно, что File-Roll Sink не сделан аналогичным образом.
Параметр | Описание | По умолчанию |
---|---|---|
type |
Реализация стока, должно быть указано hdfs. | - |
channel |
Канал, из которого сток будет вытягивать события. | - |
hdfs.path |
Папка, в которую будут записываться файлы. Убедитесь, что для этой папки выставлены нужные права доступа. Если вы настраиваете сток средствами Cloudera, то данные будут писаться от имени пользователя flume. | - |
hdfs.filePrefix |
Префикс имени файла. Базовое имя файла, как и для File-Roll — timestamp его создания. Соответстенно, если вы укажете my-data, итоговое имя файла будет my-data1476318264182. |
FlumeData |
hdfs.fileSuffix |
Постфикс имени файла. Добавляется в конец имени файла. Можно использовать, чтобы указать расширение, например, .gz. | - |
hdfs.inUsePrefix |
Аналогично filePrefix, но для временного файла, в который еще ведется запись данных. | - |
hdfs.inUseSuffix |
Аналогично fileSuffix, но для временного файла. По сути, временное расширение. |
.tmp |
hdfs.rollInterval |
Период создания новых файлов, в секундах. Если файлы не нужно закрывать по такому критерию, ставим 0. |
30 |
hdfs.rollSize |
Триггер для закрытия файлов по объему, указывается в байтах. Также ставим 0, если этот критерий нам не подходит. |
1024 |
hdfs.rollCount |
Триггер для закрытия файлов по числу событий. Также можно поставить 0. |
10 |
hdfs.idleTimeout |
Триггер для закрытия файлов из-за неактивности, в секундах. То есть, если в файл некоторое время ничего не записывается — он закрывается. Этот триггер по умолчанию отключен. |
0 |
hdfs.batchSize |
То же самое, что и для других стоков. Хотя в документации к стоку написано, что это число событий, записываемых в файл, прежде чем они будут сброшены в HDFS. При выборе также ориентируемся на объем транзакции канала. |
100 |
hdfs.fileType |
Тип файла — SequenceFile (Hadoop-файл с парами ключ-значение, как правило, в качестве ключа используется timestamp из хидера «timestamp» или текущее время), DataStream (текстовые данные, по сути, построчная запись с указанной сериализацией, как в File-Roll Sink) или CompressedStream (аналог DataStream, но с сжатием). |
SequenceFile |
hdfs.writeFormat |
Формат записи — Text или Writable. Только для SequenceFile. Отличие — в качестве значения будет писаться либо текст (TextWritable) или байты (BytesWritable). |
5000 |
serializer |
Настраивается для DataStream и CompressedStream, по аналогии с File-Roll Sink. |
TEXT |
hdfs.codeC |
Этот параметр необходимо указывать, если вы используете тип файла CompressedStream. Предлагаются такие варианты сжатия: gzip, bzip2, lzo, lzop, snappy. | - |
hdfs.maxOpenFiles |
Максимально допустимое число одновременно открытых файлов. Если этот порог будет превышен, то наиболее старый файл будет закрыт. |
5000 |
hdfs.minBlockReplicas |
Важный параметр. Минимальное число реплик на блок HDFS. Если не указан, берется из конфигурации Hadoop, указанной в classpath при запуске (т.е. настроек вашего кластера). Честно говоря, я не могу объяснить причину поведения Flume, связанного с этим параметром. Суть в том, что если значение этого параметра отличается от 1, то сток начнет закрывать файлы без оглядки на другие триггеры и в рекордные сроки наплодит уйму мелких файлов. | - |
hdfs.maxOpenFiles |
Максимально допустимое число одновременно открытых файлов. Если этот порог будет превышен, то наиболее старый файл будет закрыт. |
5000 |
hdfs.callTimeout |
Таймаут обращения к HDFS (открыть/закрыть файл, сбросить данные), в миллисекундах. |
10000 |
hdfs.closeTries |
Число попыток закрыть файл (если с первого раза не получилось). 0 — пытаться до победного конца. |
0 |
hdfs.retryInterval |
Как часто пытаться закрыть файл в случае неудачи, в секундах. |
180 |
hdfs.threadsPoolSize |
Число потоков, осуществляющих IO операции с HDFS. Если у вас «солянка» из событий, которые расфасовываются по многим файлам, то лучше поставить это число побольше. |
10 |
hdfs.rollTimerPoolSize |
В отличии от предыдущего пула, этот пул потоков выполняет задачи по расписнию (закрывает файлы). Причем, он работает на основе двух параметров — rollInterval и retryInterval. Т.е. этот пул выполняет как плановое закрытие по триггеру, так и периодические повторные попытки закрыть файл. Одного потока должно быть достаточно. |
1 |
hdfs.useLocalTimeStamp |
HDFS сток предполагает использование элементов даты в назании формируемых файлов (например, hdfs.path = /logs/%Y-%m-%d позволит вам группировать файлы по дням). Использование даты предполагает, что она откуда-то должна быть получена. Этот параметр предлагает два варианта: использовать время на момент обработки события (true) или использовать время, указанное в событии — а именно, в заголовке «timestamp» (false). Если вы используете timestamp события, то убедитесь, что ваши собтия имеют этот заголовок. Иначе не будут записаны в HDFS. |
false |
hdfs.round |
Округлять timestamp до некоторого значения. |
false |
hdfs.roundValue |
Насколько округлять timestamp. |
1 |
hdfs.roundUnit |
В каких единицах округлять (second,minute или hour). |
second |
Вот такой огромный перечень настроек для HDFS-стока. Этот сток позволяет нарезать данные в файлы практически как угодно — особенно приятно то, что можно использовать элементы даты. Официальная документация по этому стоку находится на всё той же странице [9].
Возможно вы заметили интересную особенность конфигурации HDFS-стока — здесь нет параметра, указывающего адрес HDFS. Создатели стока предполагают, что данный сток должен быть использован на тех же машинах, что и HDFS.
Итак, что же необходимо учитывать при настройке этого стока.
hdfs.path = /logs/%{dir} hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{host}.%{src}
Здесь dir и src — значения заголовков событий с соотв. ключами. Результирующий файл будет иметь вид /logs/web/my-source/2016-04-15/2016-04-15-12-00-00.my-host.my-source.gz. На моем компьютере генерация этого имени для 1 млн. событий занимает почти 20 секунд! Т.е. для 10000 событий это займет примерно 200мс. Делаем вывод: если вы претендуете на скорость записи 10000 событий в секунду, будьте готовы отдать 20% времени на генерацию имени файла. Это ужасно. Вылечить это можно, взяв на себя ответственность за генерацию имени файла на стороне клиента. Да, для этого придется написать немного кода, но зато можно будет изменить настройки стока на вот такие:
hdfs.path = /logs hdfs.filePrefix = %{file-name}
Передавая сформированное имя файла в заголовке file-name вы сэкономите ресурсы и время. Формирование пути файла по таким заголовком занимает уже не 20 секунд, а 500-600 миллисекунд для 1 млн. событий. Т.е., почти в 40 раз быстрее.
Объединяя события в соотношении хотя бы 5 к 1 вы уже получите существенный прирост производительности. Естественно, здесь нужно быть осторожным — если события на клиенте генерируются по одному, то наполнение буфера для объединения событий может занять некоторое время. Всё это время события будут храниться в памяти, ожидая формирования группы для объединения. А значит повышаются шансы потерять данные. Резюме:
Примечание. На самом деле при принятии такого решения стоит задуматься — ведь каждый сток будет писать однородные данные в свой файл. В результате вы можете получить кучу мелких файлов на HDFS. Решение должно быть взвешенным — если объем данных невелик, то можно ограничиться одним узлом Flume для записи в HDFS. Это так называемая консолидация данных [10] — когда данные из множества источников в итоге попадают на один сток. Однако если данные «текут рекой», то одного узла может быть недостаточно. Подробнее о проектировании всей транспортной сети мы поговорим в следующей статье этого цикла.
Я много раз упоминал эти таинственные перехватчики, пожалуй теперь самое время рассказать о том, что это такое. Перехватчики — это обработчики событий, которые работают на этапе между получением событий на источнике и отправкой их в канал. Перехватчики могут преобразовывать события, изменять их или фильтровать.
Flume предоставляет по умолчанию множество перехватчиков [11], позволяющих:
# ============================ Avro-источник с перехватчиками ============================ #
# Обязательные параметры для Vvro-источника
my-agent.sources.avro-source.type = avro
my-agent.sources.avro-source.bind = 0.0.0.0
my-agent.sources.avro-source.port = 50001
my-agent.sources.avro-source.channels = my-agent-channel
# Добавляем к источнику перехватчики, указываем их названия (названия значения не имеют)
my-agent.sources.avro-source.interceptors = ts directory host replace group-replace filter extractor
# ------------------------------------------------------------------------------ #
# Первый перехватчик добавляет статичный заголовок ко всем событиям.
# Наименование заголовка будет "dir", а значение — "test-folder".
my-agent.sources.avro-source.interceptors.directory.type = static
my-agent.sources.avro-source.interceptors.directory.key = dir
my-agent.sources.avro-source.interceptors.directory.value = test-folder
# Если такой заголовок уже есть — сохранить имеющийся (по умолчанию — false)
my-agent.sources.avro-source.interceptors.directory.preserveExisting = true
# ------------------------------------------------------------------------------ #
# Второй перехватчик добавляет заголовок "timestamp" ко всем событиям с текущим значением времени, в миллисекундах
my-agent.sources.avro-source.interceptors.ts.type = timestamp
my-agent.sources.avro-source.interceptors.ts.preserveExisting = true
# ------------------------------------------------------------------------------ #
# Третий перехватчик добавляет заголовок с хостом/IP текущей машины
my-agent.sources.avro-source.interceptors.host.type = host
my-agent.sources.avro-source.interceptors.host.useIP = true
# Наименование заголовка (аналог directory.key)
my-agent.sources.avro-source.interceptors.host.hostHeader = host
my-agent.sources.avro-source.interceptors.host.preserveExisting = true
# ------------------------------------------------------------------------------ #
# Этот перехватчик заменяет все символы табуляции на ; в теле события
my-agent.sources.avro-source.interceptors.replace.type = search_replace
my-agent.sources.avro-source.interceptors.replace.searchPattern = t
my-agent.sources.avro-source.interceptors.replace.replaceString = ;
# Тело передается как byte[], поэтому необходимо указать кодировку (по умолчанию — UTF-8)
my-agent.sources.avro-source.interceptors.replace.charset = UTF-8
# ------------------------------------------------------------------------------ #
# Более "умный" вариант замены
my-agent.sources.avro-source.interceptors.group-replace.type = search_replace
# Предположим, наша строка начинается с даты 2014-01-20 и нам нужно поменять ее формат на 20/01/2014
# при этом сохранив всё остальное. Мы "разбиваем" строку на 4 блока () и затем выполняем подстановку,
# используя индексы этих блоков в результирующей строке
my-agent.sources.avro-source.interceptors.group-replace.searchPattern = (\d{4})-(\d{2})-(\d{2})(.*)
my-agent.sources.avro-source.interceptors.group-replace.replaceString = $3/$2/$1$4
# ------------------------------------------------------------------------------ #
# Перехватчик-фильтр, исключает события по регулярному выражению
my-agent.sources.avro-source.interceptors.filter.type = regex_filter
my-agent.sources.avro-source.interceptors.filter.regex = error$
# Если true — то фильтровать события, тело которых подходит под регулярное выражение,
# в противном случае — фильтровать то, что не подходит под регулярку
my-agent.sources.avro-source.interceptors.filter.excludeEvents = true
# ------------------------------------------------------------------------------ #
# Перехватчик, извлекающий данные из события и добавляющий их в заголовки
my-agent.sources.avro-source.interceptors.extractor.type = regex_extractor
# Например, мы передаем события вида: "2016-04-15;WARINING;КАКАЯ-ТО ИНФОРМАЦИЯ"
my-agent.sources.avro-source.interceptors.extractor.regex = (\d{4}-\d{2}-\d{2});(.*);
# здесь важно — сериализаторы должны быть перечислены в том же порядке,
# что и соотв. группы в регулярном выражении
# (\d{4}-\d{2}-\d{2}) -> $1 -> ts
# (.*) -> $2 -> loglevel
my-agent.sources.avro-source.interceptors.extractor.serializers = ts loglevel
# Первую группу будем сериализовать специальным классом, который извлекая из даты TS
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.name = timestamp
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.pattern = yyyy-MM-dd
# Вторую группу будем сериализовать as is
my-agent.sources.avro-source.interceptors.extractor.serializers.loglevel.name = level
Среди стандартных перехватчиков, к несчастью, не обнаружилось фильтра по заголовкам. Впрочем, при желании такой перехватчик можно написать самому. Теперь, чтобы полноценно сконфигурировать транспорт Flume, нам необходимо рассмотреть еще один тип компонентов Flume — селекторы.
Селектор необходим каналу для того, чтобы понимать, в какой канал какие события отправлять [12]. Всего существует 2 типа селекторов:
# ============================ Avro-источник с селектором ============================ #
my-source.sources.avro-source.type = avro
my-source.sources.avro-source.port = 50002
my-source.sources.avro-source.bind = 127.0.0.1
my-source.sources.avro-source.channels = hdfs-channel file-roll-channel null-channel
# Объявляем селектор — multiplexing, будем сортировать события
# Предположим, что мы ранее помечали события как "важные" и "обычные" и хотим,
# чтобы важные события записывались в файловую систему и HDFS, а обычные — только в файлы
my-source.sources.avro-source.selector.type = multiplexing
# указываем название заголовка, по которому будем делить события
my-source.sources.avro-source.selector.header = type
# если type = important, то отправляем события и в HDFS, и в файловый сток
my-source.sources.avro-source.selector.mapping.important = hdfs-channel file-roll-channel
# если type = common, то только в файловый сток
my-source.sources.avro-source.selector.mapping.common = file-roll-channel
# если заголовок type не найден или значение какое-то другое, отправляем событие на фильтрацию
# (как правило, для фильтрации используем небольшой memchannel и null-sink)
my-source.sources.avro-source.selector.mapping.default = hdfs-null-channel
Селекторы обрабатывают события после перехватчиков. Это значит, что вы можете выполнить над событиями некоторые манипуляции перехватчиками (например, понаизвлекать различные заголовки) и использовать результаты этих манипуляций уже в селекторе.
Статья неожиданно получилась большой, поэтому обещанный мониторинг узла я решил рассмотреть в следующей части этого цикла статей. В заключение хочу продемонстрировать одну из рабочих конфигураций Flume для HDFS. Она неплохо подходит для доставки и организации небольших объемов данных — примерно до 2000 событий в секунду на одну ноду. Этот узел требует наличия в событиях заголовков roll («15m» или «60m»), dir и srс — с помощью них получается двухуровневая иерархия папок.
flume-hdfs.sources = hdfs-source
flume-hdfs.channels = hdfs-15m-channel hdfs-60m-channel hdfs-null-channel
flume-hdfs.sinks = hdfs-15m-sink hdfs-60m-sink
# =========== Avro-источник, с селектором и добавлением заголовка host ============ #
flume-hdfs.sources.hdfs-source.type = avro
flume-hdfs.sources.hdfs-source.port = 50002
flume-hdfs.sources.hdfs-source.bind = 0.0.0.0
flume-hdfs.sources.hdfs-source.interceptors = hostname
flume-hdfs.sources.hdfs-source.interceptors.hostname.type = host
flume-hdfs.sources.hdfs-source.interceptors.hostname.hostHeader = host
flume-hdfs.sources.hdfs-source.channels = hdfs-null-channel hdfs-15m-channel
flume-hdfs.sources.hdfs-source.selector.type = multiplexing
flume-hdfs.sources.hdfs-source.selector.header = roll
flume-hdfs.sources.hdfs-source.selector.mapping.15m = hdfs-15m-channel
flume-hdfs.sources.hdfs-source.selector.mapping.60m = hdfs-60m-channel
flume-hdfs.sources.hdfs-source.selector.mapping.default = hdfs-null-channel
# ============================ Файловый канал, 15 минут ============================ #
flume-hdfs.channels.hdfs-15m-channel.type = file
flume-hdfs.channels.hdfs-15m-channel.maxFileSize = 1073741824
flume-hdfs.channels.hdfs-15m-channel.capacity = 10000000
flume-hdfs.channels.hdfs-15m-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-15m-channel.dataDirs = /flume/flume-hdfs/hdfs-60m-channel/data1,/flume/flume-hdfs/hdfs-60m-channel/data2
flume-hdfs.channels.hdfs-15m-channel.checkpointDir = /flume/flume-hdfs/hdfs-15m-channel/checkpoint
# ============================ Файловый канал, 60 минут ============================ #
flume-hdfs.channels.hdfs-60m-channel.type = file
flume-hdfs.channels.hdfs-60m-channel.maxFileSize = 1073741824
flume-hdfs.channels.hdfs-60m-channel.capacity = 10000000
flume-hdfs.channels.hdfs-60m-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-60m-channel.dataDirs =/flume/flume-hdfs/hdfs-60m-channel/data1,/flume/flume-hdfs/hdfs-60m-channel/data2
flume-hdfs.channels.hdfs-60m-channel.checkpointDir = /flume/flume-hdfs/hdfs-60m-channel/checkpoint
# =========== Сток для файлов, заворачиваемых каждые 15 минут (5 мин. неактивности) =========== #
flume-hdfs.sinks.hdfs-15m-sink.type = hdfs
flume-hdfs.sinks.hdfs-15m-sink.channel = hdfs-15m-channel
flume-hdfs.sinks.hdfs-15m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
flume-hdfs.sinks.hdfs-15m-sink.hdfs.path = /logs/%{dir}
flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileSuffix = .gz
flume-hdfs.sinks.hdfs-15m-sink.hdfs.writeFormat = Text
flume-hdfs.sinks.hdfs-15m-sink.hdfs.codeC = gzip
flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileType = CompressedStream
flume-hdfs.sinks.hdfs-15m-sink.hdfs.minBlockReplicas = 1
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollInterval = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollSize = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollCount = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.idleTimeout = 300
flume-hdfs.sinks.hdfs-15m-sink.hdfs.round = true
flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundValue = 15
flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundUnit = minute
flume-hdfs.sinks.hdfs-15m-sink.hdfs.threadsPoolSize = 8
flume-hdfs.sinks.hdfs-15m-sink.hdfs.batchSize = 10000
# =========== Сток для файлов, заворачиваемых каждые 60 минут (20 мин. неактивности) =========== #
flume-hdfs.sinks.hdfs-60m-sink.type = hdfs
flume-hdfs.sinks.hdfs-60m-sink.channel = hdfs-60m-channel
flume-hdfs.sinks.hdfs-60m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
flume-hdfs.sinks.hdfs-60m-sink.hdfs.path = /logs/%{dir}
flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileSuffix = .gz
flume-hdfs.sinks.hdfs-60m-sink.hdfs.writeFormat = Text
flume-hdfs.sinks.hdfs-60m-sink.hdfs.codeC = gzip
flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileType = CompressedStream
flume-hdfs.sinks.hdfs-60m-sink.hdfs.minBlockReplicas = 1
flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollInterval = 0
flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollSize = 0
flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollCount = 0
flume-hdfs.sinks.hdfs-60m-sink.hdfs.idleTimeout = 1200
flume-hdfs.sinks.hdfs-60m-sink.hdfs.round = true
flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundValue = 60
flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundUnit = minute
flume-hdfs.sinks.hdfs-60m-sink.hdfs.threadsPoolSize = 8
flume-hdfs.sinks.hdfs-60m-sink.hdfs.batchSize = 10000
# ================ NULL-сток + небольшой канал для него =============== #
flume-hdfs.channels.hdfs-null-channel.type = memory
flume-hdfs.channels.hdfs-null-channel.capacity = 30000
flume-hdfs.channels.hdfs-null-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-null-channel.byteCapacityBufferPercentage = 20
flume-hdfs.sinks.hdfs-null-sink.channel = hdfs-null-channel
flume-hdfs.sinks.hdfs-null-sink.type = null
В следующей, заключительной статье цикла, мы рассмотрим:
Автор: DCA (Data-Centric Alliance)
Источник [13]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/razrabotka/118718
Ссылки в тексте:
[1] Apache Flume: https://flume.apache.org/
[2] предыдущей части: https://habrahabr.ru/company/dca/blog/280386/
[3] Остальные настройки: https://flume.apache.org/FlumeUserGuide.html#file-channel
[4] JDBC-channel: https://flume.apache.org/FlumeUserGuide.html#jdbc-channel
[5] Kafka-channel: https://flume.apache.org/FlumeUserGuide.html#kafka-channel
[6] сериализации данных: https://avro.apache.org/docs/current/
[7] параметры шифрования данных: https://flume.apache.org/FlumeUserGuide.html#avro-source
[8] виде сервиса: https://dpaynedudhe.wordpress.com/2015/06/16/installing-flume-on-ubuntu/
[9] находится на всё той же странице: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink
[10] консолидация данных: https://flume.apache.org/FlumeUserGuide.html#consolidation
[11] предоставляет по умолчанию множество перехватчиков: https://flume.apache.org/FlumeUserGuide.html#flume-interceptors
[12] в какой канал какие события отправлять: https://flume.apache.org/FlumeUserGuide.html#flume-channel-selectors
[13] Источник: https://habrahabr.ru/post/281933/
Нажмите здесь для печати.