На текущем проекте у нас начинает активно использоваться Apache NiFi в качестве основного ETL/ELT-инструмента. NiFi используется для получения данных из различных источников (Kafka, REST, HDFS) и подготовки данных для их последующей загрузки в основное хранилище на базе Greenplum. Загрузка подготовленных данных в Greenplum реализована средствами последнего (PXF), поэтому NiFi только подготавливает данные в формате Avro и записывает их в HDFS.
Немного о задаче. Пусть мы имеем информацию о подписках пользователя на уведомления для различных разделов/сервисов портала. Для каждого раздела пользователи могут указать от нуля до нескольких видов транспорта, которым эти уведомления будут доставляться, например, PUSH, EMAIL, SMS. Требуется обеспечить загрузку этих данных в наше аналитическое хранилище.
Исходные данные
Данные приходят в Apache Kafka, формат данных примерно такой:
{
"uID": 1000358546,
"events": [{
"eventTypeCode": "FEEDBACK",
"transports": ["PUSH", "SMS"]
}, {
"eventTypeCode": "MARKETING",
"transports": ["PUSH", "EMAIL"]
}, {
"eventTypeCode": "ORDER_STATUS",
"transports": ["SOC_VK"]
}
]
}
Вроде всё просто, но у PXF есть некоторые сложности с загрузкой иерархических структур (массив простых значений оно загрузит, но массив объектов - нет), поэтому нам нужно сделать наши данные максимально плоскими. Да, и цель статьи - показать, что можно сделать с JSON используя встроенные процессоры.
Для начинающих
Для экспериментов в NiFi можно создать процессорную группу, выбрав в качестве первого процессора GenerateFlowFile поместив в параметр Custom Text текст нашего Json.
FlattenJson
Одним из способов сделать из произвольного json плоский является процессор FlattenJson. Процессор предоставляет пользователю возможность взять вложенный документ JSON и представить его в простой документ содержащий пары ключ/значение. Ключи объединяются на каждом уровне с помощью определяемого пользователем разделителя, который по умолчанию имеет значение ".". Процессор поддерживает три режима преобразования (Flatten Mode): normal, keep-arrays и dot notation (применяется для запросов в MongoDB). Режим преобразования по умолчанию - "keep-arrays". В режиме keep-arrays мы получим, для нашего примера, практически исходный Json. Поэтому этот режим нам не подходит, и мы переключим процесоор на режим normal. В результате мы получим такой json:
{
"uID" : 1000358546,
"events[0].eventTypeCode" : "FEEDBACK",
"events[0].transports[0]" : "PUSH",
"events[0].transports[1]" : "SMS",
"events[1].eventTypeCode" : "MARKETING",
"events[1].transports[0]" : "PUSH",
"events[1].transports[1]" : "EMAIL",
"events[2].eventTypeCode" : "ORDER_STATUS",
"events[2].transports[0]" : "SOC_VK"
}
Что-ж, оно действительно плоское, но... Если такое попытаться завернуть в avro, то оно сломается. Если не на этапе конвертации, то на этапе загрузки в Greenplum точно. Всё по причине того, что тут буквально для каждого пользователя будет свой набор полей и схема каждого avro-файла будет различаться.
Этот процессор, сам по себе весьма полезный в иных случаях, в чистом виде нам не подходит. Смотрим, что же мы можем использовать ещё.
JoltTransformJSON
JoltTransformJSON, пожалуй, самый мощный в арсенале NiFi, процессор для трансформации Json. Он позволяет применять список Jolt-спецификаций к нашему Json. На хабре уже была статья, посвященная этому процессору. Но, позвольте мне рассказать об этом процессоре применительно к нашей задаче.
Вариантов решения нашей задачи как минимум два - это либо обработать Json, полученный с помощью FlattenJson-процессора с помощью Jolt, либо попробовать от FlattenJson избавиться и решить всё с помощью JoltTransformJson.
Но, для начала опишем, какие же возможности предоставляет нам этот процессор. Самое главное - это возможность работать с Jolt-спецификацией в расширенном редакторе, где можно не только писать спецификацию, проверить её корректность, но и выполнить трансформацию произвольного json не покидая окна редактора.
Это всё круто и очень помогает в работе, но, если честно, я предпочитаю использовать Jolt Transform Demo (jolt-demo.appspot.com) . Субъективно он более удобен и там есть примеры с комментариями для начала работы с Jolt.
Итак, как вы видите, на картинке выше, я начал с простой спецификации:
[{
"operation": "shift",
"spec": {
"*": "&"
}
}]
Эта спецификация, по сути ничего не трансформирует, поскольку говорит "возьми любое поле и выведи его как оно есть". Будем исправлять. Для начала определить нашу цель. А уже потом напишем для неё Jolt-спецификацию.
Итак, у нас на входе:
{
"uID": 1000358546,
"events": [{
"eventTypeCode": "FEEDBACK",
"transports": ["PUSH", "SMS"]
}, {
"eventTypeCode": "MARKETING",
"transports": ["PUSH", "EMAIL"]
}, {
"eventTypeCode": "ORDER_STATUS",
"transports": ["SOC_VK"]
}
]
}
А хотим мы получить такой json на выходе:
{
"uID": 1000358546,
"FEEDBACK": ["PUSH", "SMS"],
"MARKETING": ["PUSH", "EMAIL"],
"ORDER_STATUS": ["SOC_VK"]
}
Давайте напишем для него спецификацию. Нам нужно для каждого значения в events->transports взять ключ из events->eventTypeCode и в результате записать с этим ключем массив значений. Поле uID оставляем без изменений.
[{
"operation": "shift",
"spec": {
"events": {
"*": {
"transports": {
"*": {
"@": "@(3,eventTypeCode)[]"
}
}
}
},
"*": "&"
}
}]
Пояснение спецификации
"events":{"*":{"transports":{"*":{
,думаю, не вызывает особых сложностей. Здесь мы для каждого events берём каждый элемент массива (это первая "*") для которого из transports берём каждый элемент массива (вторая "*").
"@": "@(3,eventTypeCode)[]"
вот тут самое интересное. Левая, от двоеточия, часть ("@") говорит о том, что мы берём текущее значение элемента массива, а это у нас PUSH для самого первого совпадения. А вот правая часть говорит о том, с каким ключём мы запишем это значение в результирующий json. И запись @(3,eventTypeCode) означает, что для того, чтобы получить имя ключа, нам нужно поднятся на 3 уровня выше (на уровень первой "*") и взять там значение поля eventTypeCode. Если, опять же, рассматривать самое первое совпадение, то это значение будет FEEDBACK - это и будет ключём, в который будет записано значение PUSH.
Думаю, вы заметили квадратные скобки на конце правой части выражения? Они говорят о том, что ключ, который мы определяем, должен быть массивом. Если их не поставить, то тогда, например для ORDER_STATUS, мы получим не массив, а просто строку. Для других типов событий у нас определено несколько значений транспорта, поэтому они будут объединены в массив автоматически. Т.е. результат мог бы выглядеть так:
{
"uID": 1000358546,
"FEEDBACK": ["PUSH", "SMS"],
"MARKETING": ["PUSH", "EMAIL"],
"ORDER_STATUS": "SOC_VK"
}
И так, мы добились желаемого результата. Но, давайте подумаем, какие сложности с данным вариантом. А мы опять имеем проблемы с потенциально различными схемами для разных пользователей. Я бы даже сказал с гарантированными. Это можно обойти, если при конвертации в avro будем использовать определенную схему, а не выводить её из данных json. Но, это значит, что мы должны в этой схеме заранее прописать все типы событий всех наших сервисов, и менять схему при каждом изменении их состава. Было бы легче, если бы мы взяли в качестве ключа транспорт, а в качестве значения массив типов событий, для которых уведомления используют данный транспорт. Я не буду приводить Jolt-спецификацию для данной трансформации, оставлю это в качестве задания читателю. Да, видов транспорта сильно меньше, чем событий, но это не гарантирует нам постоянство их списка.
И так, подумаем, как мы можем обеспечить постоянство схемы выходных данных, если у нас на входе могут меняться как типы событий так и виды транспортов. Вариантов не так много:
-
сделать два массива, один для событий, другой для транспортов, а соответствующие значения записывать с одним и тем-же индексом
-
сделать один массив, в котором пары событие/транспорт будут строками с разделителем
Первый вариант был отклонён из-за более сложной реализации разбора на стороне Greenplum. Второй вариант выглядит, для нашего примера, так:
{
"uID" : 1000358546,
"events" : [
"FEEDBACK|PUSH", "FEEDBACK|SMS",
"MARKETING|PUSH", "MARKETING|EMAIL",
"ORDER_STATUS|SOC_VK"
]
}
Чтобы выполнить такую Jolt-трансформацию понадобится цепочка преобразований. Для начала, нужно развернуть массив транспортов и желательно вывести тип событий на один уровень с транспортом. Затем склеить строки и убрать лишние поля.
Первый этап
[{
"operation": "shift",
"spec": {
"nET": {
"*": {
"transports": {
"*": {
"*": {
"@1": "outer[&4].inner[&2].t",
"@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
}
}
}
}
},
"*": "&"
}
}]
Здесь мы для каждого транспорта создаём объект, который вложен в два массива - внешний и внутренний (относительно "transports"). Значение транспорта записываем в поле t этого объекта, потом добавляем в этот объект поле etc, которое будет иметь значение из eventTypeCode.
Результат выполнения:
{
"uID" : 1000358546,
"outer" : [ {
"inner" : [ {
"t" : "PUSH",
"etc" : "FEEDBACK"
}, {
"t" : "SMS",
"etc" : "FEEDBACK"
} ]
}, {
"inner" : [ {
"t" : "PUSH",
"etc" : "MARKETING"
}, {
"t" : "EMAIL",
"etc" : "MARKETING"
} ]
}, {
"inner" : [ {
"t" : "SOC_VK",
"etc" : "ORDER_STATUS"
} ]
} ]
}
Второй этап
Итак, у нас есть пары, но, прежде чем склеивать, давайте упростим массив:
{
"operation": "shift",
"spec": {
"outer": {
"*": {
"inner": {
"*": "events[]"
}
}
},
"*": "&"
}
}
Добавив эту спецификацию в список Jolt-спецификаций получим результат:
{
"uID" : 1000358546,
"events" : [ {
"t" : "PUSH",
"etc" : "FEEDBACK"
}, {
"t" : "SMS",
"etc" : "FEEDBACK"
}, {
"t" : "PUSH",
"etc" : "MARKETING"
}, {
"t" : "EMAIL",
"etc" : "MARKETING"
}, {
"t" : "SOC_VK",
"etc" : "ORDER_STATUS"
} ]
}
Второй этап
Теперь наш массив не выглядит так страшно, как после первого этапа. Теперь мы можем легко склеить пары значений в одну строку.
{
"operation": "modify-default-beta",
"spec": {
"events": {
"*": {
"transport": "=concat(@(1,etc),'|',@(1,t))"
}
}
}
}
Добавив этот этап к нашему списку спецификаций получим:
{
"uID" : 1000358546,
"events" : [ {
"t" : "PUSH",
"etc" : "FEEDBACK",
"transport" : "FEEDBACK|PUSH"
}, {
"t" : "SMS",
"etc" : "FEEDBACK",
"transport" : "FEEDBACK|SMS"
}, {
"t" : "PUSH",
"etc" : "MARKETING",
"transport" : "MARKETING|PUSH"
}, {
"t" : "EMAIL",
"etc" : "MARKETING",
"transport" : "MARKETING|EMAIL"
}, {
"t" : "SOC_VK",
"etc" : "ORDER_STATUS",
"transport" : "ORDER_STATUS|SOC_VK"
} ]
}
Последний этап
Теперь нам остаётся только преобразовать массив объектов в массив строк, взяв только значения поля transport.
{
"operation": "shift",
"spec": {
"events": {
"*": {
"@transport": "events[]"
}
},
"*": "&"
}
}
Добавление этой операции приведёт нас к желаемому результату:
{
"uID" : 1000358546,
"events" : [
"FEEDBACK|PUSH", "FEEDBACK|SMS",
"MARKETING|PUSH", "MARKETING|EMAIL",
"ORDER_STATUS|SOC_VK"
]
}
Итак, цепочка спецификаций выглядит так:
[
{
"operation": "shift",
"spec": {
"events": {
"*": {
"transports": {
"*": {
"*": {
"@1": "outer[&4].inner[&2].t",
"@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
}
}
}
}
},
"*": "&"
}
}, {
"operation": "shift",
"spec": {
"outer": {
"*": {
"inner": {
"*": "events[]"
}
}
},
"*": "&"
}
},
{
"operation": "modify-default-beta",
"spec": {
"events": {
"*": {
"transport": "=concat(@(1,etc),'|',@(1,t))"
}
}
}
},
{
"operation": "shift",
"spec": {
"events": {
"*": {
"@transport": "events[]"
}
},
"*": "&"
}
}
]
Результат наших преобразований будет иметь простую схему при конвертации в avro и не будет изменяться от пользователя к пользователю. Строка легко разбивается в запросе к внешней pxf-таблице в Greenplum.
Коллеги, вопросы, предложения, комментарии...
Автор: Валерий Шинкевич