- PVSM.RU - https://www.pvsm.ru -

Строим надёжный процессинг данных — лямбда архитектура внутри Google BigQuery

В этой статье хочу поделиться способом, который позволил нам прекратить хаос с процессингом данных. Раньше я считал этот хаос и последующий ре-процессинг неизбежным, а теперь мы забыли что это такое. Привожу пример реализации на BiqQuery, но трюк довольно универсальный.
Строим надёжный процессинг данных — лямбда архитектура внутри Google BigQuery - 1
У нас вполне стандартный процесс работы с данными. Исходные данные в максимально сыром виде регулярно подгружаются в единое хранилище, в нашем случае в BigQuery. Из одних источников (наш собственный продакшн) данные приходят каждый час, из других (обычно сторонние источники) данные идут ежедневно.

В последствии данные обрабатываются до состояния пригодного к употреблению разнообразными пользователями. Это могут быть внутренние дашборды; отчёты партнёрам; результаты, которые идут в продакшн и влияют на поведение продукта. Эти операции могут быть довольно сложными и включать несколько источников данных. Но по большей части мы с этим справляется внутри BigQuery с помощью SQL+UDF. Результаты сохраняются в отдельные таблицы там же.

Очевидным способом организации этого процессинга является создание расписания операций. Если данные подгружаются ежедневно в час ночи, то мы настроим процессинг на 01:05. Если этот источник данных подгружается в районе 5й минуты каждого часа, то настроим процессинг на 10ю минуту каждого часа. Промежутки в 5 минут для пользователей не критичны и предполагается, что всё должно работать.
Но мир жесток! Данные не всегда приходят вовремя. Или вообще не приходят, если не починить. Если твоя часовая загрузка закончилась на 11й минуте, а трансформация запускалась на 10й – то пожалуйста, жди ещё час чтобы увидеть эти данные в дэшборде. А если операция использует несколько источников, то ситуация будет ещё веселее.

Более того, подгружаемые сырые данные не всегда верны (данные вообще всегда не верны!). Периодически данные приходится чистить или перезагружать. И тогда нужно перезапустить все операции и с корректными параметрами, чтобы всё починилось.

Это всё, конечно, проблемы с сырыми данными и нужно именно их и решать. Но это та война, в которой нельзя окончательно победить. Что-то всё равно будет поломано. Если источник данных внутренний – то ваши разработчики будут заняты новыми крутыми фичами, а не надёжностью трекинга. Если это сторонние данные, тогда вообще труба. Хотелось бы, чтобы по крайней мере процессинг не мешался по дороге и как только сырые данные починены — все клиенты сразу видели корректные результаты.

Это реально большая проблема. И как же ещё решить?

Решение №1 – убрать проблемные детали

Если процессинг приводит к проблемам, то не надо его делать! Не надо делать вообще никакой процессинг и хранить промежуточные результаты. Как только пользователю нужны результаты, всё должно вычисляться на лету из сырых данных. Учитывая скорость BigQuery это вполне реалистично. Особенно если все что вы делаете с данными это GROUP BY date и count(1), и нужны только данные за последние 14 дней.

Большинство аналитики работает именно с такими запросами. Поэтому мы данное решение активно используем. Но этот подход не работает со сложными трансформациями.

Одна проблема – это сложность кода. Если сложить все операции в один SQL запрос, то его будет не прочитать. К счастью это решается за счёт таблиц типа view [1] (представления). Это логические таблицы в BigQuery, данные в них не хранятся, а генерируются из SQL-запроса на лету. Это сильно упрощает код.

Но другая проблема – это производительность. Здесь всё плохо. Не важно какие быстрые и дешёвые современные базы данных. Если запустить сложную трансформацию на одном годе исторических данных, это займёт время и будет стоить денег. Других вариантов нет. Эта проблема делает данную стратегию неприменимой в довольно большом проценте случаев.

Решение № 2 – построить сложную систему

Если нет возможности обойтись без системы управления процессингом, то нужно построить эту систему хорошо. Не просто расписание выполнения скриптов в cron, а система мониторинга загрузки данных, которая определяет когда и какие трансформации запускать. Наверное паттерн pub/sub [2] тут очень подходит.

Но есть проблема. Если построить сложную систему более менее просто, то вот поддерживать её и ловить баги – это очень сложно. Чем больше кода, тем больше проблем.

По счастью есть и третье решение.

Решение № 3 – лямбда архитектура! …ну, типа того

Лямбда архитектура [3] – это знаменитый подход к процессингу данных, который использует преимущества обработки данных по расписанию и в реальном времени:

Строим надёжный процессинг данных — лямбда архитектура внутри Google BigQuery - 2
*Как нормально перевести на русский не знаю, batch job – это что пакетное задание? Кто знает, подскажите!

Обычно это все строится с использованием нескольких решений. Но мы используем по сути тот же трюк просто внутри BigQuery.

И вот как это работает:

Процессинг по расписанию (Batch layer). Мы ежедневно выполняем SQL-запросы, которые трансформируют данные имеющиеся на текущий момент, и сохраняем результаты в таблицы. У всех запросов следующая структура:

Строим надёжный процессинг данных — лямбда архитектура внутри Google BigQuery - 3

Результаты этого запроса будут сохранены в table_static (перезапишут её). Да, BigQuery позволяет сохранять результаты запроса в таблице, которая использовалась в этом запросе. В итоге мы берём старые, уже посчитанные данные (чтобы их не пересчитывать) и соединяем с новыми данными. X дней – это выбранный период, за который мы хотим пересчитать данные, чтобы учесть все возможные корректировки сырых данных. Предполагается что за X дней (сколько – это индивидуально для источника) все корректировки уже будут внесены, всё что сломалось починится и данные уже больше не будут меняться.

Доступ в реальном времени (Speed layer + Serving layer). Эти обе задачи объединены в один SQL-запрос:

Строим надёжный процессинг данных — лямбда архитектура внутри Google BigQuery - 4

Да, это тот же самый запрос! Его мы сохраняем как представление (view) с именем table_live и все пользователи (дэшборды, другие запросы, и т.п.) тянут результаты из этого представления. Так как представления в BigQuery хранятся на логическом уровне (только запрос, не данные), каждый раз при обращении он будет пересчитывать последние X дней на лету и все изменения в изначальных данных будут отражены в результатах.

Так как запрос в обоих случаях одинаковый, то в реальности, чтобы избежать дупликации кода, ежедневный запрос (из batch layer) выглядит так:

SELECT * FROM table_live

(и сохраняем результаты в table_static)

Этот подход имеет ряд важных преимуществ:

  • Каждый пользователь получает актуальные результаты, нет никакой ресинхронизации между сырыми данными и агрегированными (это при условии, что все косяки с сырыми данными разрешаются за X дней)
  • Пользователи получают результаты быстро. Никто не пересчитывает 2 года данных каждый раз, когда к ним обращаются. Пересчёт последних X дней в BigQuery происходит приемлемо быстро. X так и выбирается, чтобы не создавать проблем с производительностью. Можно и на часы вместо дней перейти конечно, но не приходилось этого делать с нашими данными.
  • Не нужно вообще думать о расписании ежедневных операций. Просто нужно сделать это раз в день, но от времени загрузки сырых данных это не зависит. Кроме того, проблем не возникает, если в один день трансформация упадёт или если её запустить дважды. Чтобы пользователь заметил проблему, нужно чтобы процессинг «лежал» больше X дней.
  • Для построения такой системы нужно минимальное количество кода (читай — проблем). Сам SQL-запрос на 10 строчек длиннее оригинального и теперь хранится внутри BigQuery как представление (view). Плюс нужно запускать ежедневный процессинг, но это элементарная задача.
  • Издержки данной системы могут быть даже меньше, чем при создании сложной системы расписаний. Кажется, что должно быть дороже, так как мы постоянно пересчитываем X дней. Но наш опыт показывает, что наоборот. Если вы подгружаете данные каждый час, то вам придётся делать пересчёт каждый час, включая выходные. И ещё придётся пересчитывать, на X дней назад, чтобы отразить корректировки. Итого за неделю наберётся 24*7=168 раз. Но в реальности пользователь может открывать этот дэшборд только три раза в неделю. При нашем подходе придётся пересчитать 7 раз по ежедневному расписанию и плюс 3 раза на лету. Существенно меньше.

PS Если любите использовать таблицы разбитые по датам в BigQuery (мы очень любим), то есть решение и для этого. Но это тема для другого поста. Подсказка – функции [4] для работы с этими таблицами не ругаются, если часть таблиц – это только представления.

PPS Если бы представления в BigQuery поддерживали кешинг (как это работает с обычными запросами), это было бы реально круто. Это по сути сделало бы их материализованными (materialized views). И эффективность нашего подхода стала бы ещё выше. Если вы согласны – здесь [5] можно поставить звёздочку, чтобы эту фичу быстрее реализовали.

Автор: NNikolay

Источник [6]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/data-mining/115324

Ссылки в тексте:

[1] view: https://cloud.google.com/bigquery/querying-data#views

[2] паттерн pub/sub: https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

[3] Лямбда архитектура: https://en.wikipedia.org/wiki/Lambda_architecture

[4] функции: https://cloud.google.com/bigquery/query-reference#tablewildcardfunctions

[5] здесь: https://code.google.com/p/google-bigquery/issues/detail?id=184

[6] Источник: https://habrahabr.ru/post/279491/