Производительность Apache Parquet

в 23:47, , рубрики: apache hadoop, Apache Spark, avro, big data, csv, data mining, file format, parquet, performance tests, scala, кто читает теги?, сжатие данных, хранение данных

Плохой пример хорошего теста

Примечание переводчика:
Изначально статья задумывалась как вольный перевод текста Дона Дрейка (@dondrake) для Cloudera Engineering Blog об опыте сравнения Apache Avro и Apache Parquet при использовании Apache Spark. Однако в процессе перевода я углубился в детали и нашел в тестах массу спорных моментов. Я добавил к статье подзаголовок, а текст снабдил комментариями со злорадным указанием неточностей.

В последнее время в курилках часто возникали дискуссии на тему сравнения производительности различных форматов хранения данных в Apache Hadoop — включая CSV, JSON, Apache Avro и Apache Parquet. Большинство участников сразу отметают текстовые форматы как очевидных аутсайдеров, оставляя главную интригу состязанию между Avro и Parquet.

Господствующие мнения представляли собой неподтвержденные слухи о том, что один формат выглядит "лучше" при работе со всем датасетом, а второй "лучше" справляется с запросами к подмножеству столбцов.

Как любой уважающий себя инженер, я подумал, что было бы неплохо провести полноценные performance-тесты, чтобы наконец проверить, на чьей стороне правда. Результат сравнения — под катом.

Apache Parquet Logo

Тестовый датасет

Я подумал, что для тестов будет правильным использовать реальные данные и настоящие запросы. В этом случае можно ожидать, что производительность в продакшен окружении будет вести себя аналогично тестовой. Другими словами, для теста не обойтись подсчетом строк на суррогатных данных.

Выбор "реальных данных" и "реальных запросов" для теста представляется крайне спорной идеей, т.к. у всех разные реальные данные и запросы. Для решения этой проблемы синтезируют типовые тесты производительности хранилищ, например TPC Benchmarks.

Я покопался в датасетах, с которыми недавно работал, и нашел там два отлично подходящих для теста. Первый из них, назовем его "узкий", состоит из всего трех колонок и содержит 82,3 млн строк, что в CSV занимает 3,9 ГБ.

Как будет видно ниже, из этого получится 750-1000 МБ сериализованных данных, а обрабатываться это будет в 50 воркеров. Каждому воркеру достанется 15-20 МБ данных. Скорее всего, инициализация воркера займет больше времени, чем чтение и обработка данных.

Второй, назовем его "широкий", содержит 103 колонки и 694 млн строк, что дает CSV файл размером 194 ГБ. Я думаю, такой подход позволит оценить, какой формат работает лучше с файлами большого и маленького размера.

"Широкий" датасет не только в 30 раз шире, но и в 8 раз длиннее. И в 49 раз больше по исходному размеру. Датасеты правильнее называть "маленьким" и "большим".
Кроме того, судя по отношению размеров, похоже, что в датасетах представлены колонки разных типов. В этой работе различия в типах данных вообще игнорируются. Меж тем, это ключевой аспект формата хранения.

Методология теста

Я выбрал Apache Spark 1.6 в качестве рабочей лошадки для тестов. Spark поддерживает Parquet из коробки, поддержка Avro и CSV подключается отдельно. Все операции проводились на кластере CDH 5.5.x из 100+ машин.

Мне было интересно замерить производительность форматов на различных видах процессинга — загрузки, простых запросов, нетривиальных запросов, обработки целого датасета, а также объем используемого дискового пространства.

Я запускал тесты через spark-shell с одной и той же конфигурацией для обоих датасетов (отличие было только в числе executor'ов). Режим шелла :paste спас мне жизнь, позволив копировать Scala-код прямо в REPL, не беспокоясь о многострочных командах, которые могут смутить интерпретатор.

#!/bin/bash -x

# Drake
export HADOOP_CONF_DIR=/etc/hive/conf
export SPARK_HOME=/home/drake/coolstuff/spark/spark-1.6.0-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH

# use Java8
export JAVA_HOME=/usr/java/latest
export PATH=$JAVA_HOME/bin:$PATH

# NARROW
NUM_EXECUTORS=50
# WIDE
NUM_EXECUTORS=500

spark-shell —master yarn-client 
    —conf spark.eventLog.enabled=true 
    —conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory 
    —conf spark.yarn.historyServer.address=http://yarnhistserver.allstate.com:18088 
    —packages com.databricks:spark-csv_2.10:1.3.0,com.databricks:spark-avro_2.10:2.0.1 
    —driver-memory 4G 
    —executor-memory 2G 
    —num-executors $NUM_EXECUTORS 
    ...

Я брал время выполнения запроса с вкладки "Job" в Spark Web UI. Каждый тест я повторил трижды, а затем вычислил среднее время. Запросы к узкому датасету выполнялись на относительно нагруженном кластере, в то время, как запросы к широкому датасету выполнялись в моменты полного простоя кластера. Так получилось не специально, скорее, это совпадение.

Использование различных окружений между тестами (в т.ч. разное число воркеров и разную загруженность кластера) делает невозможным сравнение абсолютных значений.
Использование загруженного кластера само по себе негативно влияет на воспроизводимость результатов замеров при повторном запуске — им банально нельзя верить.
Трехкратное повторение эксперимента выглядит статистически несерьезным — доверительный интервал оценки будет очень большим. Впрочем, автор о доверительных интервалах даже не упоминает.

Препроцессинг данных

При чтении узкого датасета из CSV я не выводил схемы, но мне пришлось сконвертировать колонку типа String в Timestamp. Я не включал время на это преобразование в результат, т.к. оно не относится к форматам хранения. При работе с широким датасетом я использовал вывод схемы, однако время на это я так же не учитывал.

Под выводом схемы (в оригинале — infer schema) имеется ввиду неявное преобразование из RDD в DataFrame с помощью Reflection.

В процессе тестирования я был удивлен, узнав, что нельзя сохранить Avro файл с колонкой типа Timestamp. Фактически, Avro версий 1.7.x в принципе не поддерживает ни Date, ни Timestamp.

Avro 1.8 поддерживает логические типы Date, Timestamp и их производные. По сути, они являются лишь оберткой над int или long.

Тест "узкого" датасета

Для начала, я оценил время, за которое можно записать на диск узкий датасет в формате Avro или Parquet. Считал только эффективное время на запись, уже после того, как данные прочитаны в датафрейм. Получилась разница в пределах статистической погрешности. Таким образом, производительность записи узкого датасета для обоих форматов примерно одинакова.

Время сериализации получилось неправдоподобно большим, даже с учетом возможных накладных расходов на сеть и прочее — ведь на один воркер приходится менее 20 МБ выходных данных.
Выглядит так, как будто автор неправильно отделял время на чтение и процессинг и время на запись. В этом случае, вполне может получиться, что большая часть этого времени — это чтение 4х-гигабайтового CSV файла, возможно, даже в один поток. А на все остальное уходит 5-10 секунд.

Время записи на диск узкого датасета, в секундах (чем меньше, тем лучше):
narrow save as

После этого я посмотрел, сколько времени занимает простой подсчет числа строк в узком датасете. Avro и Parquet отработали одинаково быстро[^fast-row-count]. Для сравнения и чтобы запугать читателей, я посчитал также время подсчета несжатого CSV.

Файлы Parquet содержат в метаданных число объектов в блоке. При таком соотношении объема данных на воркер каждому достается не больше одного блока Parquet'а. Таким образом, для подсчета достаточно каждому прочесть по одной чиселке, а потом сделать общий reduce для получения итоговой суммы.
Для Avro задача значительно сложнее — блоки Avro также содержат число объектов в блоке, однако сами блоки значительно меньше (64 КБ по умолчанию), а файл содержит множество блоков. Теоретически, время подсчета всех объектов в avro-файле должно быть больше. На практике, для таких маленьких файлов разницу можно и не заметить.
Для подсчета числа строк в CSV-файле необходимо этот файл полностью прочитать, как и в случае с Avro. Если правильно шардировать 4 ГБ файл, на каждый воркер придется по 80 МБ данных, что можно прочесть за несколько секунд. Однако ж, процесс чтения у автора занимает 45 секунд, что свидетельствует в пользу того, что файл недостаточно распараллелен.

Время подсчета числа строк в узком датасете, в секундах (чем меньше, тем лучше):
narrow row count

После я попробовал более сложный запрос с группировкой GROUP BY. Одна из колонок в этом датасете — таймштамп, и я подсчитал сумму по другой колонке за каждый день. Т.к. Avro не поддерживает Date и Timestamp, пришлось подправить запрос, чтобы получить аналогичный результат.

Запрос для Parquet:

val sums = sqlContext.sql("""select to_date(precise_ts) as day, sum(replacement_cost) 
  from narrow_parq
  group by to_date(precise_ts)
  """)

Запрос для Avro query:

val a_sums = sqlContext.sql("""select to_date(from_unixtime(precise_ts/1000)) as day, sum(replacement_cost) 
  from narrow_avro
  group by to_date(from_unixtime(precise_ts/1000))
  """)

Для запроса с группировкой Parquet оказался в 2,6 раз быстрее Avro:
narrow group by

Далее, я решил выполнить преобразование .map() на DataFrameе, чтобы сымитировать процессинг всего датасета. Я выбрал преобразование, которое считает число столбцов в строке и собирает все их уникальные значения.

def numCols(x: Row): Int = {
  x.length
}
val numColumns = narrow_parq.rdd.map(numCols).distinct.collect

Операция .distinct() существенно усложняет задачу. Для простоты можно считать, что она добавляет reduce-фазу к процессу, что само по себе означает, что измеряется уже не только .map() для всего датасета, но и оверхед на обмен данными между воркерами.

Это не совсем та задача, которая будет выполняться при реальной обработке данных, но тем не менее, она форсит порцессинг всего датасета. И вновь Parquet оказывается почти в 2 раза быстрее Avro:
narrow map

Последнее, что обязательно нужно сделать — это сравнить размеры датасета на диске. График показывает размер в байтах. Avro был сконфигурирован на использование кодека сжатия Snappy, а для Parquet использовались дефолтные настройки.

Датасет в Parquet оказался меньше Avro на 25%.
narrow disk usage

Использовать настройки сжатия по умолчанию и даже не заглядывать в них — очень плохая практика.
Тем не менее, Parquet по умолчанию использует gzip. Gzip сжимает заметно сильнее Snappy. Вдруг разница в размерах обусловлена исключительно разными кодеками? Для корректного сравнения нужно посчитать размеры датасетов при использовании одинакового сжатия или вообще без оного.
Также для честности стоит отметить, что обычно текстовый файл можно сжать в разы. Допускаю, что агрессивно gzip-ованный исходный CSV-файл займет не больше 1,5 ГБ. Так что преимущество бинарных форматов будет не таким драматическим.

Тест "широкого" датасета

Я выполнил аналогичные операции на большом "широком" датасете. Напомню, этот датасет содержит 103 колонки и 694 миллиона строк, что выливается в 194 ГБ несжатого CSV файла.

А я, забегая вперед, сообщу, что это выливается в 5 ГБ Parquet и 17 ГБ Avro. Что при 500 воркерах даёт нам нагрузку на воркер в 100 МБ для Parquet или 340 МБ для Avro. По компактности хранения выиграл, конечно, Parquet. Но в файлах Avro получилось больше блоков, а значит, скорость их обработки можно увеличить, нарастив число воркеров. Так что, если загрузить кластер не в потолок и динамически рассчитывать число воркеров, можно добиться лучшей производительности Avro, чем в этих тестах.

Вначале я замерил время на сохранение широкого датасета в обоих форматах. Parquet каждый раз был быстрее Avro:
wide save as

В подсчете числа строк Parquet наголову разбил Avro, выдавая результат быстрее, чем за 3 секунды:
wide row count

Parquet по умолчанию использует размер блока в 128 МБ, что больше, чем средний объем данных на воркер. Таким образом, при работе с Parquet продолжает действовать трюк из "узкого" датасета, когда для вычисления числа строк в датасете достаточно прочесть одну чиселку из метаданных.
Для Avro-файлов приходится делать полное чтение датасета, интерпретируя только метаданные каждого блока и пропуская (не десериализуя) сами данные. Это выливается в "настоящую" работу диска. Для CSV ситуация еще хуже — там приходится еще и парсить каждый байт.

Для более сложных GROUP BY запросов Parquet вновь выходит в лидеры:
wide group by

Вот здесь стоит вспомнить, что есть возможность запустить в 3,4 раза больше воркеров для Avro. Сохранит ли Parquet тогда лидерство?

И даже для .map() преобразований всего датасета Parquet вновь побеждает с убедительным отрывом:
wide map

И здесь также стоит помнить, что есть возможность запустить в 3,4 раза больше воркеров для Avro. И какую долю во времени работы операции занимает .distinct(), а какую — собственно чтение с диска?

Последний тест, тест эффективности утилизации дискового пространства, показал впечатляющие результаты для обоих участников. Parquet смог сжать исходные 194 ГБ в 4.7 ГБ, обеспечив грандиозное сжатие выше 97%. Avro также показал впечатляющий результат, сжав данные до 16.9 ГБ (91% сжатия). Поапплодируем обоим участникам:
wide disk usage

Заключение

В итоге, Parquet продемонстрировал как минимум не худшую производительность на каждом тесте. При увеличении объема данных его преимущество стало очевидным. Своими хорошими результатами Parquet частично обязан лучшей эффективности сжатия, ведь Avro приходилось читать в 3,5 раза больше, чем Parquet. И Avro не показал той высокой производительности при чтении всего датасета, что приписывала ему молва.

Когда приходится выбирать формат хранения данных в Hadoop, нужно учитывать множество факторов, таких как интеграцию со сторонними приложениями, эволюцию схемы, поддержку специфических типов данных… Но если вы ставите производительность во главу угла, то тесты выше убедительно показывают, что Parquet — ваш лучший выбор.

И от себя добавлю. Это вполне годный замер производительности форматов. Он подтверждается многочисленными разрозненными наблюдениями из промышленного опыта нашей команды. Тем не менее, методология тестирования местами искажает замеры посторонними действиями (чтение CSV, GROUP BY', '.distinct(), ...), а местами совсем игнорирует важные вопросы (сжатие, форматы данных, ...). Я осознаю, что весьма непросто сделать каноничный тест с "дистиллированными" метриками. Но от блога Cloudera я ожидал именно этого.

Автор: fediq

Источник

Поделиться новостью

* - обязательные к заполнению поля