Когда использовать параллельные stream-ы

в 9:50, , рубрики: concurrency, java, java streams, параллельное программирование

Источник
Авторы: Doug Lea совместно с Brian Goetz, Paul Sandoz, Алексей Шипилёв, Heinz Kabutz, Joe Bowbeer, ...

Фреймворк java.util.streams содержит управляемые данными (data-driven) операции над коллекциями и другими источниками данных. Большинство потоковых (stream) методов выполняют одну и ту же операцию над каждым из элементов. С помощью метода коллекций parallelStream(), при наличии нескольких ядер, data-driven можно превратить в data-parallel. Но когда это стоит делать?

Рассмотрим использование S.parallelStream().operation(F) вместо S.stream().operation(F) при условии, что операции независимы друг от друга, и, либо затратны с точки зрения вычислений, либо применяются к большому числу элементов эффективно расщепляемой (splittable) структуры данных, либо и то и другое. Точнее:

  • F: функция для работы с одним элементом, обычно лямбда, является независимой, т.е. выполнение операции над любым из элементов не зависит и не влияет на операции над другими элементами (рекомендации по использованию независимых (non-interfering) функций без состояния см. в документации пакета stream).
  • S: исходная коллекция эффективно расщепляема. Кроме коллекций существуют и другие, пригодные для распараллеливания, потоковые источники данных, например, java.util.SplittableRandom (для распараллеливанию которого можно использовать метод stream.parallel()). Но большинство источников с I/O в основе спроектированы, главным образом, для работы в последовательном режиме.
  • Общее время работы в последовательном режиме превышает минимально-допустимый предел. В наши дни для большинства платформ предел грубо равен (в границах x10) 100 микросекунд. Точные измерения, в данном случае, не обязательны. Для практических целей достаточно просто умножить N (количество элементов) на Q (время работы одной F), а Q примерно можно оценить числом операций или количеством строк кода. После надо проверить, что N * Q, по крайней мере, меньше 10000 (если трусите, то добавьте один или пару нолей). Итак, если F — это небольшая функция вроде x -> x + 1, то паралельное выполнение будет иметь смысл при N >= 10000. И наоборот, в случае если F — это увесистое вычисление, подобно нахождению следующего лучшего хода в игре в шахматы, то значение Q столь велико, что N можно пренебречь, но до тех пор пока коллекция полностью расщепляема.

Фреймворк потоковой обработки не будет (и не может) настаивать на чем-либо из перечисленного выше. Если вычисления зависимы между собой, то их параллельное выполнение не имеет смысла, либо вообще будет вредным и приведет к ошибкам. К другим, производным от приведенных выше инженерных ограничений (issues) и компромиссов (tradeoffs), критериям относятся:

  • Запуск (Start-up)
    Появление у процессоров дополнительных ядер, в большинстве случаев, сопровождалось добавлением механизма управления питанием, который может быть причиной замедления запуска ядер, иногда с дополнительными накладками от JVM, операционной системы и гипервизора. В этом случае предел, при котором параллельный режим имеет смысл, грубо соответствует времени, требуемому для начала обработки подзадач достаточным числом ядер. После этого параллельные вычисления могут быть более энергоэффективными, чем последовательные (в зависимости от деталей работы процессоров и систем. Для примера см. статью).
  • Детализация (Granularity)
    Редко имеет смысл дробить и так небольшие вычисления. Фреймворк, обычно, делит задачу таким образом, чтобы отдельные части могли работать на всех доступных ядрах системы. Если, после старта, для каждого ядра практически нет работы, то впустую будут растрачены (обычно последовательные) усилия по организации параллельных вычислений. Учитывая, что на практике количество ядер составляет от 2-х до 256-ти порог, также, препятствует нежелательному эффекту от чрезмерного деления задачи.
  • Делимость (Splittability)
    Наиболее эффективно расщепляемые коллекции включают ArrayList и {Concurrent}HashMap, а также обычные массивы (T[], которые разбиваются на части с помощью статических методов java.util.Arrays). Наименее эффективно расщепляемыми являются LinkedList, BlockingQueue и большинство источников с I/O в основе. Остальные где-то посередине (структуры данных которые поддерживают произвольный доступ и/или эффективный поиск обычно являются эффективно-расщепляемыми). Если дробление данных занимает больше времени, чем обработка, то усилия напрасны. Если Q достаточно велико, то можно получить прирост из-за распараллеливания даже для LinkedList, но это довольно редкий случай. Вдобавок, некоторые источники не могут быть расщеплены до одиночного элемента, и, таким образом, может иметь место ограничение на степень декомпозиции задачи.

Получение точных характеристик этих эффектов может быть затруднительным (хотя, если постараться, и выполнимым, используя такие инструменты как JMH). Но совокупный эффект заметить довольно легко. Чтобы ощутить его самим — проведите эксперимент. Например, на 32-х ядерной машине для тестов при запуске небольших функций, вроде max() или sum(), над ArrayList точка безубыточности (break-even) примерно равна 10,000. Для большего количества элементов отмечается ускорение до 20-ти раз. Время работы для коллекций с менее чем 10,000 элементов не намного меньше чем для 10,000, и поэтому медленнее, чем при последовательной обработке. Наихудший результат возникает при менее чем 100 элементах — в этом случае задействованные потоки останавливаются не сделав ничего полезного, т.к. вычисления завершаются еще до их старта. С другой стороны, когда операции над элементами затратны по времени, в случае использования эффективно и полностью расщепляемых коллекций, таких как ArrayList, польза видна сразу.

Если перефразировать все вышесказанное, то использование parallel() в случае неоправданно малого объема вычислений может стоить около 100 микросекунд, а использование в противном случае должно сохранить, по крайней мере, само это время (или возможно часы для очень больших задач). Конкретная стоимость и польза будет различаться со временем и для различных платформ, и, также, в зависимости от контекста. Например, запуск небольших вычислений параллельно внутри последовательного цикла усиливает эффект подъемов и спадов (микротесты производительности, в которых проявляется подобное, могут не отражать реальную ситуацию).

Вопросы и ответы

  • Почему JVM сама не может понять когда выполнять операции в параллельном режиме?

Она могла бы попытаться, но слишком часто решение будет неверным. Поиски полностью автоматического многоядерного параллелизма не привели к универсальному решению за последние тридцать лет, и, поэтому, фреймворк использует более надежный подход, требующий от пользователя лишь выбор между да или нет. Данный выбор основывается на, постоянно встречающихся и в последовательном программировании, инженерных проблемах, которые вряд ли полностью исчезнут когда-либо. Например, вы можете столкнуться со стократным замедлением при поиске максимального значения в коллекции содержащей единственный элемент в сравнение с использованием этого значения напрямую (без коллекции). Иногда JVM может оптимизировать подобные случаи за вас. Но это редко происходит в последовательных случаях, и никогда в случае параллельного режима. С другой стороны, можно ожидать, что, по мере развития, инструменты будут помогать пользователям принимать более верные решения.

  • Что если для принятия хорошего решения у меня нет достаточного знания о параметрах (F, N, Q, S)?

Это, также, схоже с часто встречающимися в последовательном программировании проблемами. Например, метод S.contains(x) класса Collection, обычно, выполняется быстро если S это HashSet, медленно если LinkedList, и средне в других случаях. Обычно, для автора компоненты использующей коллекцию, лучшим выходом из данной ситуации является ее инкапсуляция и публикация только конкретной операции над ней. Тогда пользователи будут изолированы от необходимости выбора. То же самое применимо и к параллельным операциям. Например, компонент с внутренней коллекцией цен может определить метод проверяющий её размер на достижение предела, что будет иметь смысл до тех пор пока поэлементные вычисления не станут слишком дорогими. Пример:

public long getMaxPrice() { return priceStream().max(); }

private Stream priceStream() {
 return (prices.size() < MIN_PAR) ? 
   prices.stream() : prices.parallelStream();
}

Эту идею можно распространить и на другие соображения о том когда и как использовать параллелизм.

  • Что если моя функция, возможно, будет делать I/O или синхронизированные операции?

Одной крайностью являются функции, которые не соответствуют критериям независимости, включая последовательные по своей природе I/O-операции, доступ к блокирующим синхронизированным ресурсам, и случаи когда ошибка в одной параллельной подзадаче, выполняющей I/O, влияет на другие. Их распараллеливание не имеет большого смысла. С другой стороны находятся вычисления изредка выполняющие I/O или редко блокируемую синхронизацию (например, большинство случаев логирования, и использование таких конкурентных коллекций как ConcurrentHashMap). Они безвредны. То, что находится между ними требует большего исследования. Если каждая подзадача может блокироваться на значительное время ожидая I/O или доступа, ресурсы ЦПУ будут простаивать без возможности их использования программой или JVM. От такого плохо всем. В этих случаях параллельная потоковая обработка не всегда является верным выбором. Но есть хорошие альтернативы — например, асинхронное I/O и подход CompletableFuture.

  • Что если мой источник основан на I/O?

На данный момент, использующие I/O генераторы Streamов JDK (например, BufferedReader.lines()), приспособлены, главным образом, для использования в последовательном режиме, обрабатывая элементы один за другим по мере поступления. Поддержка высокоэффективной массовой (bulk) обработки буферизированного I/O возможна, но, на данный момент, это требует разработки специальных генераторов Streamов, Spliteratorов и Collectorов. Поддержка некоторых общих случаев может быть добавлена в будущих релизах JDK.

  • Что если моя программа работает на загруженном компьютере и все ядра заняты?

Машины, обычно, располагают фиксированным числом ядер, и не могут магическим образом создавать новые при выполнении параллельных операций. Однако, до тех пор пока критерии выбора параллельного режима явно говорят за, сомневаться не в чем. Ваши параллельные задачи будут конкурировать за ЦПУ с другими и вы заметите меньшее ускорение. В большинстве случаев это все равно более эффективно, чем другие альтернативы. Лежащий в основе механизм спроектирован так, что если доступные ядра отсутствуют, то вы заметите лишь небольшое замедление в сравнение с последовательным вариантом, за исключением случаев когда система настолько перегружена, что тратит все свое время на переключение контекста вместо выполнения какой-то реальной работы, или настроена в расчете на то, что вся обработка выполняется последовательно. Если у вас такая система, то, возможно, администратор уже отключил использование много- поточности/ядерности в настройках JVM. А если администратором системы являетесь вы сами, то есть смысл это сделать.

  • Все ли операции распараллеливаются при использовании параллельного режима?

Да. По крайней мере в какой-то степени. Но стоит принимать во внимание, что stream-фреймворк учитывает ограничения источников и методов при выборе того как это делать. В общем, чем меньше ограничений, тем больший потенциал параллелизма. С другой стороны нет гарантий, что фреймворк выявит и применит все имеющиеся возможности для параллелизма. В некоторых случаях, если у вас есть время и компетенции, собственное решение может намного лучше использовать возможности параллелизма.

  • Какое ускорение я получу от параллелизма?

Если вы придерживаетесь данных советов, то, обычно, достаточное чтобы иметь смысл. Предсказуемость не является сильной стороной современного аппаратного обеспечения и систем, и поэтому универсального ответа нет. Локальность кеша, характеристики GC, JIT-компиляция, конфликты обращения к памяти, расположение данных, политики диспетчеризации ОС, и наличие гипервизора являются одними из факторов имеющих значительное влияние. Производительность последовательного режима, также, подвержена их влиянию, которое, при использовании параллелизма, часто усиливается: проблема вызывающая 10-ти процентную разницу в случае последовательного выполнения может привести к 10-ти кратной разнице при параллельной обработке.

Stream-фреймворк включает некоторые возможности, которые помогают увеличить шансы на ускорение. Например, использование специализации для примитивов, такой как IntStream, обычно имеет больший эффект для параллельного режима, чем для последовательного. Причиной является то, что в этом случае не только снижается потребление ресурсов (и памяти), но также улучшается локальность кеша. Использование ConcurrentHashMap вместо HashMap, в случае параллельной работы операции collect, уменьшает внутренние издержки. Новые советы и рекомендации будут появляться по мере накопления опыта работы с фреймворком.

  • Все это слишком страшно! Не можем ли мы просто придумать правила использования свойств JVM для выключения параллелизма?

Мы не хотим говорить вам что делать. Появление для программистов новых способов делать что-то неправильно может пугать. Ошибки в коде, архитектуре, и оценках конечно же будут случаться. Десятилетия назад некоторые люди предсказывали, что наличие параллелизма на уровне приложения приведет к большим бедам. Но это так и не сбылось.

Автор: semyong

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js