Когда старый MapReduce лучше нового Tez

в 13:53, , рубрики: big data, Hadoop, MapReduce, tez, Блог компании Mail.Ru Group, высокая производительность, метки:

Когда старый MapReduce лучше нового Tez - 1

Как всем известно, количество данных в мире растёт, собирать и обрабатывать поток информации становится всё сложнее. Для этого служит популярное решение Hadoop c идеей упрощения методов разработки и отладки многопоточных приложений, использующее парадигму MapReduce. Эта парадигма не всегда удачно справляется со своими задачами, и через некоторое время появляется «надстройка» над Hadoop: Apache Tez с парадигмой DAG. Под появление Tez подстраивается и HDFS-SQL-обработчик Hive. Но не всегда новое лучше старого. В большинстве случаев HiveOnTez значительно быстрее HiveOnMapReduce, но некоторые подводные камни могут сильно повлиять на производительность вашего решения. Здесь я хочу рассказать, с какими нюансами столкнулся. Надеюсь, это поможет вам ускорить ETL или другой Hadoop UseCase.

MapReduce, Tez и Hive

Как я сказал ранее, данных в мире всё больше и больше. И для их хранения и обработки придумывают всё более хитрые решения, среди них и Hadoop. Чтобы процесс обработки хранящихся на HDFS данных был прост даже для рядового аналитика, есть несколько SQL-надстроек над Hadoop. Самая старая и «простая» из них — Hive. Суть Hive такова: мы имеем данные в каком-либо внятном column-store формате, заносим информацию о них в метаданные, пишем стандартный SQL с рядом ограничений, и он генерирует цепочку MapReduce-job’ов, которые решают нашу задачу. Здорово, удобно, но медленно. Например, вот простой запрос:

select 
    t1.column1, 
    t2.column2 
from 
    table1 t1
    inner join table2  t2 on t1.column1 = t2.column1
union 
select 
    t3.column1, 
    t4.column2 
from 
    table3 t3
    inner join table4 t4 on t3.column1 = t4.column1
order by 
    column1;

Этот запрос порождает четыре джоба:

  • table1 inner join table2;
  • table3 inner join table4;
  • union;
  • sort.

Когда старый MapReduce лучше нового Tez - 2

Шаги выполняются последовательно, и каждый из них завершается записью данных на HDFS. Это выглядит весьма неоптимальным. Например, шаги 1 и 2 могли бы выполняться параллельно. А бывают и такие ситуации, когда у нескольких шагов разумно применить один и тот же Mapper, а потом уже на результаты этих Mapper’ов наложить несколько видов Reducer’ов. Но концепция MapReduce в рамках одного job’а не позволяет так делать. Для решения этой проблемы достаточно быстро появляется Apache Tez с концепцией DAG. Суть DAG сводится к тому, что вместо пары Mapper-Reducer (+epsilon) мы строим нецикличный направленный граф, каждая вершина которого является Mapper.Class’ом или Reduser.Class’ом, а ребра означают потоки данных / порядок выполнения. Кроме DAG, Tez предоставил еще несколько бонусов: ускоренный запуск job’ов (можно посылать DAG-job’ы через уже запущенный Tez-Engine), возможность удерживать ресурсы в памяти ноды между шагами, самостоятельно запускать распараллеливание и т. д. Естественно, вместе с Tez вышла и соответствующая надстройка над Hive. С этой надстройкой наш запрос превратится DAG-job примерно следующей структуры:

  1. Mapper считывает table1.
  2. Mapper считывает table2 и джойнит её с результатом шага 1.
  3. Mapper считывает table3 и фильтрует column1 IS NOT NULL.
  4. Mapper считывает table4 и фильтрует column1 IS NOT NULL.
  5. Reducer джойнит результаты шагов 3 и 4.
  6. Reducer, делающий union.
  7. Reducer Group By и Sort.
  8. Собирает результат.

Когда старый MapReduce лучше нового Tez - 3

Фактически шаги 1 и 2 — это первый join, а 2, 3 и 4 — это второй join (я специально подобрал таблицы разных размеров, чтобы join’ы обрабатывались по-разному). При этом два блока друг от друга не зависят и могут выполняться параллельно. Это уже очень здорово. Tez действительно даёт значительный прирост в скорости обработки сложных запросов. Но иногда Tez может быть хуже MapReduce, и поэтому перед отправкой в production стоит попробовать запрос как с set hive.execution.engine=tez, так и с set hive.execution.engine=mr.

Так что же такое Tez?

Всё, что надо знать о Tez: он меняет логику MapReduce на логику DAG (directed acyclic graph — направленного ациклического графа), предоставляя возможность в рамках одного DataFlow параллельно выполнять несколько разных процессов, будь то Mapper или Reducer. Главное, чтобы его входные данные были готовы. Хранить данные можно локально на нодах между шагами, а иногда и просто в оперативной памяти ноды, не прибегая к дисковым операциям. Можно оптимизировать количество и местоположение Mapper’ов и Reducer’ов, чтобы минимизировать передачу данных по Сети даже с учётом многошаговых расчётов, переиспользовать контейнеры, которые уже отработали в соседних процессах в рамках одного Tez-Job’а, и подстраивать параллельное выполнение под статистику, собранную на предыдущем шаге. Кроме того, движок позволяет конечному пользователю создавать DAG-задачи с той же простотой, что и MapReduce, при этом он сам будет заниматься ресурсами, перезапусками и управлением DAG на кластере. Tez очень мобилен, добавление поддержки Tez не ломает уже работающие процессы, а тестирование новой версии возможно локально «на клиентской стороне» тогда, когда во всех задачах кластера будет работать старая версия Tez. Last but not least: отметим, что Tez может запускаться на кластере как служба и работать в «фоновом режиме», что позволяет ему отправлять задачи на выполнение значительно быстрее, чем это происходит при стандартном запуске MapReduce. Если вы ещё не пробовали Tez и у вас остались сомнения, то посмотрите на сравнение скорости, опубликованное в презентации HortonWorks:

Когда старый MapReduce лучше нового Tez - 4

И в паре с Hive:

Когда старый MapReduce лучше нового Tez - 5

Но при всей этой красоте графиков и описаний в HiveOnTez есть и проблемы.

Tez менее устойчив к неравномерному распределению данных, чем MapReduce

Первая и самая большая проблема лежит в разнице создания DAG-job и MapReduce-job. У них один принцип: количество Mapper’oв и Reducer’ов рассчитывается в момент запуска job’а. Только когда запрос выполняется цепочкой MapReduce-job’ов, Hadoop рассчитывает необходимое количество задач на основе результата предыдущих шагов и собранной аналитики по источникам, а в случае DAG-job это происходит до вычисления всех шагов, только на основе аналитики.

Поясню на примере. Где-то в середине запроса по мере выполнения вложенных запросов у нас появляются две таблицы. По оценкам статистики, в каждой по n строк и по k уникальных значений join-ключа. На выходе ожидаем примерно n*k строк. И допустим, это количество хорошо укладывается в один контейнер, и Tez выделит на следующий шаг (допустим, сортировка) один Reducer. И это число Reducer’ов уже в процессе выполнения не поменяется независимо ни от чего. Теперь допустим, что на самом деле у этих таблиц очень плохой skew: на одно значение приходится n – k + 1 строка, а все остальные — по одной строке. Таким образом, на выходе мы получим n^2 + k^2 – 2kn – k + 2n строк. То есть (n + 2 – 2k)/k + (k – 1)/n больше n/k в два раза. И уже такое количество один Reducer будет выполнять вечность. А в случае с MapReduce, получив на выходе этого шага n^2 + k^2 – 2kn – k + 2n, Hadoop объективно оценит свои силы и выдаст нужное количество Mapper’ов и Reducer’ов. В результате c MapReduce всё отработает гораздо быстрее.

Сухие вычисления могут показаться надуманными, но на самом деле такая ситуация реальна. И если её не произошло, то считайте, что вам повезло. С аналогичным эффектом Tez-DAG’а я сталкивался ещё при использовании lateral view в сложных запросах или кастомных Mapper’ах.

Особенности тюнинга Tez

По иронии, последняя известная мне важная особенность Tez, которая может навредить, связана с его силой — DAG. Чаще всего кластер — это не просто хранилище информации. Это еще и система, в которой ведётся постобработка данных, и важно, чтобы на эту часть кластера не влияла остальная деятельность. Так как ноды — это ресурс, то обычно количество ваших контейнеров не безгранично. А значит, когда вы запускаете job, то лучше не забивать все контейнеры, чтобы сильно не тормозить регулярные процессы. И тут DAG может подложить вам свинью. DAG требуется (в среднем по палате) меньше контейнеров за счёт их переиспользования, более плавной нагрузки и т. д. Но когда быстрых шагов много, контейнеры начинают размножаться в геометрической прогрессии. Первые Mapper’ы ещё не доработали, но данные уже распространяются по другим Mapper’ам, под всё это выделяются контейнеры, и — бум! Ваш кластер забит в потолок, никто больше не может запустить ни одного job’а. Ресурсов не хватает, и вы смотрите, как медленно меняются цифры на прогресс-баре. MapReduce из-за своей последовательности от такого эффекта избавлен, но платите вы за это, как всегда, скоростью.

Мы давно знаем, как бороться с тем, что стандартный MapReduce занимает слишком много контейнеров. Регулируем параметры:

  • mapreduce.input.fileinputformat.split.maxsize: уменьшая — увеличиваем количество Mapper’ов;
  • mapreduce.input.fileinputformat.split.minsize: увеличивая — уменьшаем количество Mapper’ов;
  • mapreduce.input.fileinputformat.split.minsize.per.node, mapreduce.input.fileinputformat.split.minsize.per.rack: более тонкая настройка для контроля локальных (в смысле node или rack) партиций;
  • hive.exec.reducers.bytes.per.reducer: увеличивая — уменьшаем количество Reducer’ов;
  • mapred.tasktracker.reduce.tasks.maximum: выставляем максимальное количество Reducer’ов;
  • mapred.reduce.tasks: задаём конкретное число Reducer’ов.

Осторожно! В DAG все reduce-шаги будут иметь столько процессов, сколько вы укажете тут! Но параметры Tez более хитрые, и не всегда параметры, которые мы задали для MapReduce, на него действуют. Во-первых, Tez очень чувствителен к hive.tez.container.size, и интернет советует брать значение между yarn.scheduler.minimum-allocation-mb и yarn.scheduler.maximum-allocation-mb. Во-вторых, взгляните на параметры удержания неиспользуемого контейнера:

  • tez.am.container.ide.release-timeout-max.millis;
  • tez.am.container.ide.release-timeout-min.millis.

Опция tez.am.container.reuse.enabled активирует или дезактивирует переиспользование контейнеров. Если она отключена, то предыдущие два параметра не работают. И в-третьих, посмотрите на параметры группировки:

  • tez.grouping.split-waves;
  • tez.grouping.max-size;
  • tez.grouping.min-size.

Дело в том, что ради распараллеливания чтения внешних данных Tez изменил процесс формирования задач: сначала Tez оценивает, сколько волн (w) можно запустить на кластере, потом это количество умножается на параметр tez.grouping.split-waves, и произведение (N) делится на количество стандартных сплитов на задачу. Если результат действий находится между tez.grouping.min-size и tez.grouping.max-size, то всё хорошо и задача запускается в N задач. Если нет, то число адаптируется к рамкам. Документация по Tez советует «только в качестве эксперимента» выставлять параметр tez.grouping.split-count, который отменяет всю вышеизложенную логику и группирует сплиты в указанное в параметре количество групп. Но я этим свойством стараюсь не пользоваться, оно не дает гибкости Tez’у и Hadoop’у в целом для оптимизации под конкретные входные данные.

Нюансы Tez

Кроме крупных проблем, Tez не избавлен от маленьких недостатков. Например, если вы пользуетесь http Hadoop ResourceManager’ом, то вы не увидите в нём, сколько какой Tez-job занимает контейнеров, а тем более — в каком состоянии его Mapper’ы и Reducer’ы. Для мониторинга состояния кластера я использую этот маленький python-скрипт:

import os
import threading

result = []
e = threading.Lock()

def getContainers(appel):
    attemptfile = os.popen("yarn applicationattempt -list " + appel[0])
    attemptlines = attemptfile.readlines()
    attemptfile.close()

    del attemptlines[0]
    del attemptlines[0]

    for attempt in attemptlines:
        splt = attempt.split('t');
        if ( splt[1].strip()  == "RUNNING" ):
            containerfile = os.popen("yarn container -list " + splt[0] )
            containerlines = containerfile.readlines()
            containerfile.close()
            appel[2] += int( containerlines[0].split("Total number of containers :")[1].strip()  )
    e.acquire()
    result.append(appel)
    e.release()

appfile = os.popen("yarn application -list -appStates RUNNING")
applines = appfile.read()
appfile.close()

apps = applines.split('application_')
del apps[0]

appsparams = []

for app in apps:
    splt = app.split('t')
    appsparams.append(['application_' + splt[0],splt[3], 0])

cnt = 0
threads = []

for app in appsparams:
    threads.append(threading.Thread(target=getContainers, args=(app,)))
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

result.sort( key=lambda x:x[2] )

total = 0
for app in result:
    print(app[0].strip() + 't' + app[1].strip() + 't' + str(app[2]) )
    total += app[2]

print("Total:",total)

Несмотря на заверения HortonWorks, наша практика показывает, что когда в Hive вы делаете простой SELECT smth FROM table WHERE smth, то чаще всего MapReduce отработает быстрее, правда, ненамного. К тому же в начале статьи я вас немного обманул: распараллеливание в HiveOnMapReduce возможно, но не такое интеллектуальное. Достаточно только сделать set hive.exec.parallel=true и настроить set hive.exec.parallel.thread.number=… — и независимые шаги (пары Mapper + Reducer) будут выполняться параллельно. Да, в нём нет возможности, что на выходе одного Mapper’а будет запускаться несколько Reducer’ов или следующих Mapper’ов. Да, распараллеливание куда более примитивно, но тоже ускоряет работу.

Ещё одна интересная особенность Tez: он запускает свой движок на кластере и держит его включённым некоторое время. С одной стороны, это действительно ускоряет работу, так как задача запускается на нодах значительно быстрее. Но с другой стороны — неожиданный минус: важные процессы в таком режиме запускать нельзя, потому что TEZ-engine со временем порождает слишком много классов и падает с GC-overflow. И бывает так: вы запустили на ночь nohup hive -f ....hql > hive.log &, пришли утром, а оно где-то посередине упало, хайв завершился, temporary tables удалились, и всё надо считать заново. Неприятно.

Добавляет в копилку мелких проблем то, что старый добрый MapReduce уже вошёл в стабильный релиз, а TEZ, несмотря на популярность и прогрессивность, до сих пор находится в версии 0.8.4, и баги в нём могут встретиться на любом шагу. Самый страшный баг для меня — это удаление информации, но такого я не встречал. А вот с некорректным расчётом на Tez мы сталкивались, причём MapReduce считает корректно. Например, мой коллега использовал две таблицы — table1 и table2, имеющие уникальное поле EntityId. Сделал через Tez запрос:

select 
    table1.EntityId, count(1)
from 
    table1
    left join table2 on table1.EntityId = table2.EntityId
group by 
    EntityId 
having 
    count(1) > 1

И получил на выходе какие-то строки! Хотя MapReduce ожидаемо вернул пустой результат. Про похожую проблему есть bugreport.

Заключение

Tez — безусловное благо, которое в большинстве случаев делает жизнь проще, позволяет писать в Hive более сложные запросы и ожидать на них быстрого ответа. Но, как и любое благо, оно требует к себе осторожного подхода, осмотрительности и знания каких-то нюансов. И как следствие, иногда использование старого, проверенного, надёжного MapReduce лучше, чем использование Tez. Я очень удивился, что не смог найти ни одной статьи (ни в рунете, ни в инглишнете) о минусах HiveOnTez, и решил восполнить этот пробел. Надеюсь, что информация окажется кому-то полезной. Спасибо! Всем удачи и пока!

Автор: Mail.Ru Group

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js