Rayon: параллелизм данных в Rust

в 16:48, , рубрики: Rust, параллельное программирование, полиморфизм, Программирование, типобезорасность

Последние пару недель я работал над обновление Rayon — моей экспериментальной библиотеки для параллелизма данных в Rust.
Я вполне доволен тем, как идёт разработка, так что я решил объяснить к чему я пришёл в блог посте.
Цель Rayon — сделать добавление параллелизма в последовательный код простым, так, чтобы любой цикл for или итератор можно было бы заставить работать в несколько потоков. Например если у вас есть такая цепочка итераторов:

let total_price = stores.iter()
                        .map(|store| store.compute_price(&list))
                        .sum()

то вы можете сделать её работу параллельной просто поменяв обычный «последовательный итератор» на «параллельный итератор» из Rayon:

let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum()


Конечно мало сделать параллелизм простым, его надо сделать ещё и безопасным. Rayon гарантирует, что использование его API никогда не приведёт к гонке данных.
Этот пост объясняет принципы работы Rayon. Сначала рассказывается про основной примитив Rayon (join), а затем — как он реализован.
Я отдельно хочу обратить внимание на то, как сочетание множества фич Rust позволяют реализовать join с очень низкими накладными расходами во время выполнения программы, и при этом давая строгие гарантии безопасности. Затем я кратко расскажу как на основе join строится абстракция параллельного итератора.
Однако я хочу особо подчеркнуть, что Rayon в большей степени находится в процессе разработки. Я ожидаю, что дизайн параллельного итератора пройдёт ещё множество, скажем так, итераций (каламбур не намеренный), поскольку текущая реализация не настолько гибкая, как я бы того хотел. Кроме того есть несколько частных случаев, которые обрабатываются некорректно, в частности что касается распространения паники и очистки ресурсов. И всё же Rayon определённо может быть полезен для определённых задач уже сейчас. Чему я очень рад, как, надеюсь, будете рады и вы!

Основной примитив Rayon: join

В начала поста я показал пример использования параллельного итератора для операции map-reduce:

let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum()

Однако фактически параллельные итераторы — лишь маленькая библиотечка, построенная на основе более фундаментального примитива: join. Использование join очень простое. Вы вызываете два замыкания, как показано ниже, и join потенциально запустит их параллельно. Как только они оба завершатся, он вернёт результат:

// `do_something` и `do_something_else` *могут* выполниться параллельно
join(|| do_something(), || do_something_else())

Основной момент тут в том, что два замыкания потенциально могут выполниться параллельно: решение, использовать или нет параллельные потоки, принимается динамически, в зависимости от того, есть ли свободные ядра, или нет. Идея в том, что вы можете с помощью join помечать в своей программе те места, где параллелизм может быть полезен, а затем позволить библиотеке во время выполнения решать, использовать ли его, или нет.
Подход потенциального параллелизма — основная идея, которая отличает Rayon от ограниченных потоков crossbeam. Если в crossbeam вы распределяете работу по двум ограниченным потокам, то она всегда будет выполняться параллельно в разных потоках. В то же время вызов join в Rayon не обязательно приводит к параллельному выполнению кода. В результате у нас не только более простой API, но более эффективное использование ресурсов. Всё оттого, очень трудно заранее предсказать, когда распараллеливание окажется выгодным, это всегда требует знание некоторого глобального контекста, например: есть ли у компьютера свободные ядра и какие ещё параллельные операции сейчас выполняются? Фактически, одна из основных целей этого поста — пропагандировать потенциальный параллелизм как основу для библиотек для параллелизма данных в Rust, в отличие от ганатированного параллелизма, который мы видели ранее.
Это не говоря о том, что отдельной роли для гарантированного параллелизма, которую предлагает crossbeam, нет. Семантика потенциального параллелизма так же накладывает некоторые ограничения на то, что могут делать ваши распараллеленные замыкания. Например, если вы я попытаюсь использовать канал для общения между двумя замыканиями в join, то это скорее всего приведёт к взаимной блокировке (deadlock). О join стоит думать, как о подсказке использовать параллелизм в обычно последовательном алгоритме. Иногда это не то, что вы хотите — некоторые алгоритмы изначально параллельные. (Заметьте, однако, что совершенно нормально использовать типы вроде Mutex, AtomicU32 и т.д. изнутри join — вы просто не хотите, чтобы одно замыкание блокировалось в ожидании другого.)

Пример использования join: параллельный quicksort

Примитив join идеально подходит для алгоритмов из серии разделяй и властвуй. Эти алгоритмы разделяют работу на примерно равные части и затем рекурсивно её выполняют. Например мы можем реализовать параллельную версию quicksort:

fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo),
                    || quick_sort(hi));
    }
}
fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
    // см. https://en.wikipedia.org/wiki/Quicksort#Lomuto_partition_scheme
}

Фактически единственная разница между этой версией quicksort и последовательной в том, что мы в конце вызываем rayon::join!

Как реализован join: перехват работы (work-stealing)

Внутри join реализован с использованием техники, известной как перехват работы. Насколько мне известно, перехват работы был впервые представлен как часть проекта Cilk, и с тех пор стал довольно стандартной техникой (фактически название Rayon (англ. «вискоза» — прим. перев.) — дань уважения Cilk).
Главная идея в том, что при каждом вызове join(a, b) мы определяем две задачи a и b, которые могут быть безопасно выполнены параллельно, но мы пока что не знаем, есть ли для этого свободные потоки. Всё, что делает текущий поток, это добавляет b в очередь «планируемой работы», а затем берёт, и немедленно выполняет a. В то же время существует пул других активных потоков (обычно по одному потоку на ядро ЦП, или что-то типа того). Как только какой-то из потоков освобождается, он идёт и копается в очередях «планируемой работы» других потоков: если там находится задача, свободный поток захватывает её и выполняет её сам. Так что в таком случае, пока первый поток занят выполнением a, другой поток может начать выполнение b.
Как только первый поток заканчивает a, он проверяет: начал ли кто-то другой выполнять b? Если нет, то он выполняет её сам. Если да, то ему нужно подождать, пока другой поток её закончит. Но пока первый поток ждёт, он может пойти, и стащить работу у другого процесса, тем самым способствуя завершению всего процесса работы в целом.
В виде Rust-подобного псевдокода join выглядит как-то так (настоящий код немного другой, например, он позволяет каждой операции иметь результат):

fn join<A,B>(oper_a: A, oper_b: B)
    where A: FnOnce() + Send,
        B: FnOnce() + Send,
{
    // Показать `oper_b` другим потокам, так чтобы они могли её перехватить:
    let job = push_onto_local_queue(oper_b);

    // Выполняем `oper_a` сами:
    oper_a();

    // Check whether anybody stole `oper_b`:
    if pop_from_local_queue(oper_b) {
        // Не перехвачена, выполяем сами.
        oper_b();
    } else {
        // Перехвачена, ждём завершения её обработки.
        // В то же время попытаемся перехватить работу у других:
        while not_yet_complete(job) {
            steal_from_others();
        }
        result_b = job.result();
    }
}

Что делает перехват работы таким элегантным, так это его естественная адаптация к нагрузке ЦП. То есть если все рабочие потоки заняты, то join(a, b) начинает выполнять все замыкания последовательно (т. е. a(); b();), что не хуже, чем последовательный код. Но если свободные потоки есть, то мы получаем параллельное выполнение.

Замер производительности

Rayon всё ещё довольно молод, так что у меня не очень много тестовых программ (и я пока что особо не оптимизировал). Не смотря на это, уже сейчас можно получить заметное ускорение, хотя для этого придётся потратить немного больше времени на отладку, чем мне бы хотелось. Например с доработанной версией quicksort я вижу следующее ускорение от параллельного выполнения на моём 4-хядерном Macbook Pro (так что четырёхкратное ускорение — максимум, чего можно ожидать):

Длина массива Ускорение
1K 0.95x
32K 2.19x
64K 3.09x
128K 3.52x
512K 3.84x
1024K 4.01x

Изменение, которая сделал по сравнению с оригинальной версией — я добавил переход к последовательному алгоритму. Суть такова, что если входной массив достаточно мал (в моём коде — менее 5000 элементов), то мы переходим к последовательной версии алгоритма, отказываясь от вызова join. Это можно сделать вообще без дублирования кода с помощью типажей, как видно из кода моего примера. (Если любопытно, я объясняю идею в приложении в конце статьи.)
Надеюсь после некоторых оптимизаций переход к последовательному выполнению будет нужен менее часто, но стоит отметить, что высокоуровневые API (такие как параллельный итератор, который я упоминал выше) тоже могут делать переход к последовательному выполнению за вас, так что не нужно про это постоянно думать.
В любом случае, если вы не делаете переход к последовательному выполнению, то результаты будут не такими хорошими, хотя они и могли бы быть намного хуже:

Длина массива Ускорение
1K 0.41x
32K 2.05x
64K 2.42x
128K 2.75x
512K 3.02x
1024K 3.10x

В частности помните, что эта версия кода отдаёт на параллельную обработку все подмассивы вплоть до единичной длины. Если массив длиной 512K или 1024K, то создаётся очень много подмассивов, а значит и очень много задач, но мы всё равно получаем ускорении вплоть до 3.10x. Я думаю, причина того, что код выполняется так хорошо в том, что основной подход верный: Rayon избегает выделения памяти и виртуальной диспетчеризации, как раскрыто в следующей части. И всё же я бы хотел лучшей производительности, чем 0.41x для массивов длиной 1K (и думаю, что это возможно).

Использование фич Rust для минимизации накладных расходов

Как можно увидеть выше, чтобы сделать эту схему рабочей, нужно как можно сильнее уменьшить накладные расходы на помещение задачи в локальную очередь. В конце концов ожидается, что большинство задач не будут перехвачены, потому что число процессоров гораздо меньше числа задач. API Rayon сделано так, чтобы использовать некоторые фичи Rust для уменьшения этих накладных расходов:

  • join полиморфен относительно типов замыканий своих аргументов. А это означает, что в процессе мономорфизации будут созданы отдельные копии join специализированные для каждого конкретного вызова. Что в свою очередь приводит к тому, что когда join вызывает oper_a() и oper_b() (в отличие от относительно редких случаев, когда они перехватываются), вызовы диспетчеризуются статически, а значит могут быть заинлайнены. Да, и выходит, что создание замыкания не требует выделения памяти.
  • Поскольку join блокирует выполнения до выполнения обоих замыканий, мы можем на полную использовать размещение на стеке. Это хорошо и для пользователей API, и для реализации: например, пример с quicksort выше основан на том, что имеет доступ к срезу &mut [T], который передаётся на вход, что возможно благодаря блокировке в join. В то же время реализация join может полностью избежать выделения памяти из кучи и использовать только стек (например объекты замыканий, которые кладутся в локальную очередь задач, размещаются на стеке).

Как видно из вышенаписанного, накладные расходы на размещение задачи достаточно низкие, хотя и не настолько, насколько я бы хотел. Есть несколько способов уменьшить их ещё больше:

  • Многие реализации перехвата работы используют эвристики при принятии решения о пропуске размещения задачи в очереди задач на параллельную обработку. Например в работе Ленивое планирование Тзаннеса (Tzannes) сделана попытка избежать размещения задачи в очереди, если отсутствуют свободные рабочие потоки (там они названы «голодными» потоками), способные перехватить работу.
  • И конечно могут помочь старые добрые оптимизации. К примеру, я никогда даже не заглядывал в биткод LLVM или ассемблерный код, полученный при компиляции join, и очень похоже, что там оптимизировать проще всего.

Свобода от гонок данных

Ранее я упоминал, что Rayon гарантирует свободу от гонок данных. Это означает, что можно добавлять параллелизм в ранее последовательный код, не волнуясь о том, что могут появиться странные, сложные в воспроизведении баги.
Есть два типа ошибок, о которых мы должны беспокоиться. Во-первых, два замыкания могут использовать одно и то же изменяемое состояние, так что изменения, сделанные в одном потоке, могут повлиять на другой. Например если я изменю пример выше так, что он (неправильно) вызывает quick_sort с параметром lo в обоих замыканиях, то я надеюсь, что код не скомпилируется:

fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        rayon::join(|| quick_sort(lo),
                    || quick_sort(lo)); // <-- oops
    }
}

И вправду, я увижу такую ошибку:

test.rs:14:10: 14:27 error: closure requires unique access to `lo` but it is already borrowed [E0500]
test.rs:14          || quick_sort(lo));
                    ^~~~~~~~~~~~~~~~~

Похожие ошибки возникают если я попытаюсь в одном замыкании обработать lo (или hi), а в другом — v, который перекрывается с обоими срезами.
Замечание: этот пример кажется искусственным, но на самом деле это настоящий баг, который я однажды допустил (или, скорее, допустил бы) при реализации параллельных итераторов, про которые я расскажу позже. Сделать такие ошибки при копи-пасте очень просто, и очень хорошо, что Rust превращает их в невозможное событие, а не в баг с падение программы.
Другой вид багов, который можно словить — использование потоконебезопасных типов из одного из замыканий в join. Например Rust предлагает тип с неатомарным счётчиком ссылок под названием Rc. Поскольку Rc использует неатомарные инструкции для обновления счётчика ссылок, разделять Rc между разными потоками не безопасно. Если кто-то попробует сделать так, как в следующем примере, счётчик ссылок может запросто стать не правильным, что может привести к двойному освобождению памяти или чему похуже:

fn share_rc<T:PartialOrd+Send>(rc: Rc<i32> {
    // В замыканиях нижу вызовы `clone` увеличивают счётчики ссылок.
    // Эти вызовы МОГУТ выполниться параллельно.
    // Что будет не очень хорошо!
    rayon::join(|| something(rc.clone()),
                || something(rc.clone()));
}

Но, конечно, если я попытаюсь скомпилировать этот пример, я получу ошибку:

test.rs:14:5: 14:9 error: the trait `core::marker::Sync` is not implemented
                        for the type `alloc::rc::Rc<i32>` [E0277]
test.rs:14     rayon::join(|| something(rc.clone()),
            ^~~~~~~~~~~
test.rs:14:5: 14:9 help: run `rustc --explain E0277` to see a detailed explanation
test.rs:14:5: 14:9 note: `alloc::rc::Rc<i32>` cannot be shared between threads safely

Как видите, в последнем сообщение после «note»" компилятор говорит нам, что вы не можете разделять доступ к Rc между различными потоками.
Вам может быть интересно, какая такая тёмная магия позволяет функции join поддерживать оба этих инварианта? Фактически, ответ удивительно прост. Первая ошибка, которую я получил, когда попытался передать один и тот же &mut-срез в два разных замыкания, проистекает из базовой системы типов Rust: нельзя иметь два замыкания, которые оба существуют одновременно, и при этом имеют доступ к одному и тому же &mut-срезу (срезу с правом изменения данных, на которые он ссылается — прим. перев.). Это потому, что доступ к &mut-данным должен быть уникальным, а значит если бы у вас два замыкания могли получить уникальный доступ к одному и тому же &mut-значению, это бы сделало значение не таким уж уникальным.
(Фактически, это было одно из величайших прозрений для меня при работе с системой типов Rust. До этого я думал, что «висячие указатели» в последовательных программах и «гонки данных» — совершенно разного рода баги, но теперь я представляю их как две головы одной Гидры. В основе своей оба вида багов имеют безудержное использование псевдонимов и изменения данных, и оба они могут быть решены с помощью системы владения и заимствования. Ловко, да?)
Так что же касательно второй ошибки, при которой я попытался послать Rc между потоками? Она возникла оттого, что функция join требует, чтобы оба её аргумента-замыкания удовлетворяли типажу Send. Типаж Send в Rust обозначает, что данные могут быть безопасно переданы между потоками. Так что когда join объявляет, что оба замыкания должны удовлетворять типажу Send, она как бы говорит: «для данных, к которым замыкания могут получить доступ, должно быть безопасно переходить из одного потока в другой».

Параллельные итераторы

В начале поста я дал такой пример с параллельным итератором:

let total_price = stores.par_iter()
                        .map(|store| store.compute_price(&list))
                        .sum();

Но с тех пор я сконцентрировался исключительно на join. Как я говорил ранее, API для параллельных итераторов на самом деле довольно простая обёртка вокруг join. На данный момент она больше похожа на концент, чем на что-то иное. Но что в ней по настоящему изящно, так это то, что она не требует никакого небезопасного кода, связанного с параллелизмом. (Здесь небезопасный, или unsafe-код тот, который может привести к неопределённому поведению при работе с памятью — в Rust такой код возможет только в специальных блоках unsafe {}) — прим. перев.) То есть API параллельных итераторов просто строится на основе join, который скрывает весь небезопасный код. (Чтобы быть более точным, всё же есть совсем немного небезопасного кода, связанного с управлением неинициализированной памятью при конструировании вектора. Но этот код не имеет ничего общего с параллелизмом, похожий код можно найти в реализации Vec. Этот код также не корректный в некоторых граничных случаях, поскольку у меня не было времени написать его как следует.)
Я не хочу погружаться слишком сильно в детали реализации параллельного итератора, поскольку по моим планам он ещё поменяется. Но на высоком уровне идея в том, чтобы у нас был типаж ParallelIterator со следующими основными методами:

pub trait ParallelIterator {
    type Item;
    type Shared: Sync;
    type State: ParallelIteratorState<Shared=Self::Shared, Item=Self::Item> + Send;

    fn state(self) -> (Self::Shared, Self::State);

    ... // несколько неинтересных вспомогательных методов, вроде `map` и т. д.
}

Идея в том, чтобы метод state разделял итератор на какое-то общее состояние и состояние отдельных потоков. Общее состояние будет (потенциально) доступно всем рабочим потокам, так что оно должно отвечать типажу Sync (поддерживать совместный доступ из многих потоков одновременно). Состояние отдельных потоков будет отделено для каждого вызова к join, так что оно должно отвечать только типажу Send (можно безопасно переслать другому потоку).
Типаж ParallelIteratorState представляет некоторый кусок оставшейся работы (например под-срез для обработки). У него три метода:

pub trait ParallelIteratorState: Sized {
    type Item;
    type Shared: Sync;

    fn len(&mut self) -> ParallelLen;

    fn split_at(self, index: usize) -> (Self, Self);

    fn for_each<OP>(self, shared: &Self::Shared, op: OP)
        where OP: FnMut(Self::Item);
}

Метод len даёт представление о количестве оставшейся работы. Метод split_at разделяет это состояние на две части. Метод for_each выдаёт все значения из данного итератора. Так что, например, параллельный итератор для среза &[T] должен будет:

  • реализовать len, который просто вернёт длину среза,
  • реализовать split_at, который разделит срез на два под-среза,
  • и реализовать for_each, который пройдётся по массиву и вызовет для каждого элемента операцию op.

Имея оба этих типажа, мы можем реализовать параллельную обработку коллекции, следуя одному и тому же шаблону. Мы проверяем сколько осталось работы, если её слишком много, то делим её на две части. Иначе мы обрабатываем коллекцию последовательно (заметьте, это автоматически включает переход к последовательной обработке, который мы видели раньше):

fn process(shared, state) {
    if state.len() is too big {
        // разделение на параллельные потоки
        let midpoint = state.len() / 2;
        let (state1, state2) = state.split_at(midpoint);
        rayon::join(|| process(shared, state1),
                    || process(shared, state2));
    } else {
        // базовый случай последовательного выполнения
        state.for_each(|item| {
            // process item
        })
    }
}

Вот ссылки, по которым можно увидеть примеры для сбора элементов в вектор и сворачивания потока данных в одно значение.

Заключение и историческая справка

Я очень рад последней версии Rayon. Она жутко простая в использовании, очень выразительная, и я думаю, что у неё большой потенциал стать очень эффективной.
А ещё, очень отрадно видеть, каким элегантным стал параллелизм данных в Rust. Это результат долгой эволюции и многих итераций разработки. В своих ранних днях Rust, например, использовал строгий, похожий на Erlang подход, когда параллельные задачи общаются через каналы, без использования общей памяти. Такой подход хорош для высокоуровневых приложений, но не для написания параллельной сортировки quicksort. Однако, постепенно мы изменяли систему типов, так что мы всё больше и больше приближались к простой и быстрой версии параллельной quicksort.
Если посмотреть на мои ранние дизайны, то должно быть ясно, что текущая итерация Rayon намного более лучшая. Что мне особенно нравится, так это что она не только проста для пользователей, но и для разработчиков — то есть она не требует применять сумасшедшие трюки с системой типов Rust или использовать головоломные типажи, чтобы достичь безопасности. Я думаю, что это возможно благодаря двум основным решениям:

  • Представьте, что никогда не слышали про псевдонимы. Решение, которое утвердило, что на изменяемые ссылки &mut нельзя делать псевдонимы, и убрало константные const-ссылки (ссылки только для чтения, но не изменяемые) (это не значит, что в Rust нет не изменяемых ссылок, наоборот, все ссылки по-умолчнию не изменяемые, то есть надобности их отдельно поменять ключевым словом const нет — прим. перев.). Это означает, что авторы Rust стали писать код, по умолчанию свободный от гонок данных.
  • Улучщенный типаж Send, RFC458. Решение изменило типаж Send так, что он начал разрешать передавать заимствованные ссылки. До этого RFC за авторством Джошуа Янковского у нас было ограничение на Send-данные: они должны были быть 'static, то есть не могли содержать ссылки на стек. Это было наследием влияния Erlang, когда все потоки были независимыми и асинхронными, но никто из нас не замечал этого недостатка. Из-за этого ограничения мне приходилось жутко корёжить мои ранние проекты, чтобы найти альтернативные типажи для выражения идеи о потокобезопасных данных со ссылками на стек.
    Благодаря Джошуа, который додумался просто убрать ограничение на 'static с типажа Send, теперь всё стало намного проще!

Приложение: реализация перехода к последовательному выполнению без дубликации кода

Ранее я упоминал, что для лучшей производительности в примере с quicksort, нужно использовать переход на последовательный код, если массив достаточно мал. Было бы очень неприятно иметь две реализации quicksort для этих случаев. К счастью, можно использовать типажи Rust для генерации двух версий кода автоматически из одного и того же исходного кода. Это приложение объясняет трюк, который я использовал в данном примере.
Во-первых, определяется типаж Joiner, который абстрагируется от функции join:

trait Joiner {
    /// Истина для параллельного режима, ложь в противном случае.
    fn is_parallel() -> bool;

    /// Либо вызывает `rayon::join`, либо просто `oper_a(); oper_b();`.
    fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send;
}

У этого типажа есть две реализации, соответствующие последовательному и параллельному режимам работы:

struct Parallel;
impl Joiner for Parallel { .. }

struct Sequential;
impl Joiner for Sequential { .. }

Теперь можно переписать quick_sort в виде полиморфной функции относительно типа J: Joiner, определяющего выбранную реализацию (последовательную или параллельную). Параллельная версия для малых массивов будет превращаться в последовательную:

fn quick_sort<J:Joiner, T:PartialOrd+Send>(v: &mut [T]) {
if v.len() > 1 {
    // Откат к использованию последовательного выполнения, если массивы короче 5K:
    if J::is_parallel() && v.len() <= 5*1024 {
    return quick_sort::<Sequential, T>(v);
    }

    let mid = partition(v);
    let (lo, hi) = v.split_at_mut(mid);
    J::join(|| quick_sort::<J,T>(lo),
            || quick_sort::<J,T>(hi));
}

Автор: kstep

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js