- PVSM.RU - https://www.pvsm.ru -
Apache Spark [1] на сегодняшний день является, пожалуй, наиболее популярной платформой для анализа данных большого объема. Немалый вклад в её популярность вносит и возможность использования из-под Python. При этом все сходятся на том, что в рамках стандартного API производительность кода на Python и Scala/Java сопоставима, но касательно пользовательских функций (User Defined Function, UDF) единой точки зрения нет. Попробуем разобраться в том, насколько увеличиваются накладные расходы в этом случае, на примере задачи проверки решения SNA Hackathon 2019 [2].
В рамках конкурса участники решают задачу сортировки новостной ленты социальной сети и загружают решения в виде набора отсортированных списков. Для проверки качества полученного решения сначала для каждого из загруженных списков вычисляется ROC AUC [3], а потом выводится среднее значение. Обратите внимание, что вычислить надо не один общий ROC AUC, а персональный для каждого пользователя — готовой конструкции для решения этой задачи нет, поэтому придется писать специализированную функцию. Хороший повод сравнить два подхода на практике.
В качестве платформы для сравнения мы будем использовать облачный контейнер с четырьмя ядрами и Spark, запущенный в локальном режиме, а работать с ним будем посредством Apache Zeppelin [4]. Для сравнения функциональности будем зеркально выполнять один и тот же код [5] в PySpark и Scala Spark. [здесь] Начнем с загрузки данных.
data = sqlContext.read.csv("sna2019/modelCappedSubmit")
trueData = sqlContext.read.csv("sna2019/collabGt")
toValidate = data.withColumnRenamed("_c1", "submit")
.join(trueData.withColumnRenamed("_c1", "real"), "_c0")
.withColumnRenamed("_c0", "user")
.repartition(4).cache()
toValidate.count()
val data = sqlContext.read.csv("sna2019/modelCappedSubmit")
val trueData = sqlContext.read.csv("sna2019/collabGt")
val toValidate = data.withColumnRenamed("_c1", "submit")
.join(trueData.withColumnRenamed("_c1", "real"), "_c0")
.withColumnRenamed("_c0", "user")
.repartition(4).cache()
toValidate.count()
При использовании стандартного API обращает на себя внимание практически полная идентичность кода, с точностью до ключевого слова val
. Время работы существенно не отличается. Теперь попробуем определить нужную нам UDF.
parse = sqlContext.udf.register("parse",
lambda x: [int(s.strip()) for s in x[1:-1].split(",")], ArrayType(IntegerType()))
def auc(submit, real):
trueSet = set(real)
scores = [1.0 / (i + 1) for i,x in enumerate(submit)]
labels = [1.0 if x in trueSet else 0.0 for x in submit]
return float(roc_auc_score(labels, scores))
auc_udf = sqlContext.udf.register("auc", auc, DoubleType())
val parse = sqlContext.udf.register("parse",
(x : String) => x.slice(1,x.size - 1).split(",").map(_.trim.toInt))
case class AucAccumulator(height: Int, area: Int, negatives: Int)
val auc_udf = sqlContext.udf.register("auc", (byScore: Seq[Int], gt: Seq[Int]) => {
val byLabel = gt.toSet
val accumulator = byScore.foldLeft(AucAccumulator(0, 0, 0))((accumulated, current) => {
if (byLabel.contains(current)) {
accumulated.copy(height = accumulated.height + 1)
} else {
accumulated.copy(area = accumulated.area + accumulated.height, negatives = accumulated.negatives + 1)
}
})
(accumulator.area).toDouble / (accumulator.negatives * accumulator.height)
})
При реализации специфичной функции видно, что Python лаконичнее, в первую очередь из-за возможности использовать встроенную функцию scikit-learn [6]. Однако есть и неприятные моменты — необходимо явно указывать тип возвращаемого значения, тогда как в Scala он определяется автоматически. Выполним операцию:
toValidate.select(auc_udf(parse("submit"), parse("real"))).groupBy().avg().show()
toValidate.select(auc_udf(parse($"submit"), parse($"real"))).groupBy().avg().show()
Код выглядит практически идентично, но результаты обескураживают.
Реализация на PySpark отрабатывала полторы минуты вместо двух секунд на Scala, то есть Python оказался в 45 раз медленнее. Во время работы top показывает 4 активных процесса Python, работающих на полную, и это говорит о том, что проблемы здесь создает совсем не Global Interpreter Lock [7]. Но! Возможно, проблема именно во внутренней реализации scikit-learn — попробуем воспроизвести код на Python буквально, не обращаясь к стандартным библиотекам.
def auc(submit, real):
trueSet = set(real)
height = 0
area = 0
negatives = 0
for candidate in submit:
if candidate in trueSet:
height = height + 1
else:
area = area + height
negatives = negatives + 1
return float(area) / (negatives * height)
auc_udf_modified = sqlContext.udf.register("auc_modified", auc, DoubleType())
toValidate.select(auc_udf_modified(parse("submit"), parse("real"))).groupBy().avg().show()
Проведенный эксперимент показывает интересные результаты. С одной стороны, при таком подходе производительность выровнялась, но с другой — пропала лаконичность. Полученные результаты могут говорить о том, что при работе в Python с использованием дополнительных С++ модулей появляются существенные накладные расходы на переход между контекстами. Конечно, подобные накладные расходы есть и при использовании JNI [8] в Java/Scala, однако с примерами деградации в 45 раз при их использовании мне сталкиваться не приходилось.
Для более детального анализа проведем два дополнительных эксперимента: с использованием чистого Python без Spark, чтобы измерить вклад именно от вызова пакета, и с увеличенным размером данных в Spark, чтобы амортизировать накладные расходы и получить более точное сравнение.
def parse(x):
return [int(s.strip()) for s in x[1:-1].split(",")]
def auc(submit, real):
trueSet = set(real)
height = 0
area = 0
negatives = 0
for candidate in submit:
if candidate in trueSet:
height = height + 1
else:
area = area + height
negatives = negatives + 1
return float(area) / (negatives * height)
def sklearn_auc(submit, real):
trueSet = set(real)
scores = [1.0 / (i + 1) for i,x in enumerate(submit)]
labels = [1.0 if x in trueSet else 0.0 for x in submit]
return float(roc_auc_score(labels, scores))
Эксперимент с локальным Python и Pandas подтвердил предположение о существенных накладных расходах при использовании дополнительных пакетов — при использовании scikit-learn скорость уменьшается более чем в 20 раз. Однако, 20 это не 45 — попробуем «раздуть» данные и снова сравнить производительность Spark.
k4 = toValidate.union(toValidate)
k8 = k4.union(k4)
m1 = k8.union(k8)
m2 = m1.union(m1)
m4 = m2.union(m2).repartition(4).cache()
m4.count()
Новое сравнение показывает преимущество по скорости Scala-реализации над Python в 7-8 раз — 7 секунд против 55. Напоследок попробуем «самое быстрое, что есть в Python» — numpy [9]:
Опять существенное замедление — 5 секунд Scala против 80 секунда Python. Подводя итоги, можно сделать следующие выводы:
В итоге, несмотря на всю прелесть Python, применение его в связке со Spark не всегда выглядит оправданным. Если данных не так много, чтобы накладные расходы на Python стали значимыми, то стоит подумать, а нужен ли здесь Spark? Если данных много, но обработка происходит в рамках стандартного Spark SQL API, то нужен ли здесь Python?
Если же данных много и часто приходится сталкиваться с выходящими за пределы SQL API задачами, то для выполнения того же объема работ при использовании PySpark придется увеличивать кластер в разы. Например, для Одноклассников стоимость капитальных расходов на кластер Spark увеличилась бы на многие сотни миллионов рублей. А если попробовать воспользоваться расширенными возможностями библиотек экосистемы Python, то есть риск замедления не просто в разы, а на порядок.
Некоторое ускорение можно получить, используя относительно новую функциональность векторизованных функций. В этом случае на вход UDF подается не отдельно взятый ряд, а пакет из нескольких рядов в виде Pandas Dataframe. Однако разработка этой функциональности еще не завершена [10], и даже в этом случае разница будет значительной [11].
Альтернативой может быть поддержание обширной команды data engineer-ов, способных оперативно закрывать потребности data scientist-ов дополнительными функциями. Или всё-таки погрузиться в мир Scala, благо это не так сложно: многие необходимые инструменты уже существуют [12], появляются обучающие программы [13], выходящие за рамки PySpark.
Автор: dmitrybugaychenko
Источник [14]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/python/311419
Ссылки в тексте:
[1] Apache Spark: http://spark.apache.org
[2] SNA Hackathon 2019: https://snahackathon.org
[3] ROC AUC: https://ru.wikipedia.org/wiki/ROC-%D0%BA%D1%80%D0%B8%D0%B2%D0%B0%D1%8F
[4] Apache Zeppelin: http://zeppelin.apache.org
[5] код: https://www.zepl.com/viewer/notebooks/bm90ZTovL2RtaXRyeWJ1Z2F5Y2hlbmtvL2E1MjJkNzc4OThjNjQ1NjE4Y2M2YTZlMGFlZjU0MjVjL25vdGUuanNvbg
[6] scikit-learn: https://scikit-learn.org/stable/
[7] Global Interpreter Lock: https://ru.wikipedia.org/wiki/Global_Interpreter_Lock
[8] JNI: https://ru.wikipedia.org/wiki/Java_Native_Interface
[9] numpy: http://www.numpy.org
[10] не завершена: https://issues.apache.org/jira/browse/SPARK-22216
[11] значительной: https://mindfulmachines.io/blog/2018/6/apache-spark-scala-vs-java-v-python-vs-r-vs-sql26
[12] существуют: https://habr.com/ru/company/mailru/blog/442688
[13] обучающие программы: http://gaurl.ru/iHrSZD
[14] Источник: https://habr.com/ru/post/443324/?utm_source=habrahabr&utm_medium=rss&utm_campaign=443324
Нажмите здесь для печати.