Готовимся к С++20. Coroutines TS на реальном примере

в 6:13, , рубрики: c++, c++ библиотеки, C++20, coroutine, coroutines, Блог компании Яндекс, высокая производительность, Программирование, С++

В C++20 вот-вот появится возможность работать с корутинами из коробки. Нам в Яндекс.Такси эта тема близка и интересна (под собственные нужды мы разрабатываем асинхронный фреймворк). Поэтому сегодня мы покажем читателям Хабра, как можно работать с C++ stackless корутинами на реальном примере.

В качестве примера возьмём что-то простое: без работы с асинхронными сетевыми интерфейсами, асинхронными таймерами, состоящее из одной функции. Например, попробуем осознать и переписать вот такую «лапшу» из колбеков:
Готовимся к С++20. Coroutines TS на реальном примере - 1

void FuncToDealWith() {
    InCurrentThread();

    writerQueue.PushTask([=]() {
        InWriterThread1();

        const auto finally = [=]() {
            InWriterThread2();
            ShutdownAll();
        };

        if (NeedNetwork()) {
            networkQueue.PushTask([=](){
                auto v = InNetworkThread();
                if (v) {
                    UIQueue.PushTask([=](){
                        InUIThread();
                        writerQueue.PushTask(finally);
                    });
                } else {
                    writerQueue.PushTask(finally);
                }
            });
        } else {
            finally();
        }
    });
}

Введение

Корутины или сопрограммы – это возможность остановить выполнение функции в заранее определённом месте; передать куда-либо всё состояние остановленной функции вместе с локальными переменными; запустить функцию с того же места, где мы её остановили.
Есть несколько разновидностей сопрограмм: stackless и stackful. Об этом поговорим позднее.

Постановка задачи

У нас есть несколько очередей задач. В каждую очередь помещаются определенные задачи: есть очередь для отрисовки графики, есть очередь для сетевых взаимодействий, есть очередь для работы с диском. Все очереди – это инстансы класса WorkQueue, у которых есть метод void PushTask(std::function<void()> task);

Функция FuncToDealWith() из примера выполняет какую-то логику в разных очередях и, в зависимости от результатов выполнения, ставит новую задачу в очередь.

Перепишем «лапшу» колбеков в виде линейного псевдокода, разметив в какой очереди нижележащий код должен выполняться:

void CoroToDealWith() {
    InCurrentThread();

    // => перейти в writerQueue
    InWriterThread1();
    if (NeedNetwork()) {
        // => перейти в networkQueue
        auto v = InNetworkThread();
        if (v) {
            // => перейти в UIQueue
            InUIThread();
        }
    }

    // => перейти в writerQueue
    InWriterThread2();
    ShutdownAll();
}

Приблизительно такого результата и хочется добиться.

При этом есть ограничения:

  • Интерфейсы очередей менять нельзя – ими пользуются в других частях приложения сторонние разработчики. Ломать код разработчиков или добавлять новые инстансы очередей нельзя.
  • Нельзя менять способ использования функции FuncToDealWith. Можно изменить только её имя, но нельзя делать так, чтобы она возвращала какие-то объекты, которые пользователь должен у себя хранить.
  • Полученный код должен быть таким же производительным, как первоначальный (или даже производительнее).

Решение

Переписываем функцию FuncToDealWith

В Coroutines TS настройка корутины производится заданием типа возвращаемого значения функции. Если тип удовлетворяет определённым требованиям, то внутри тела функции можно пользоваться новыми ключевыми словами co_await/co_return/co_yield. В данном примере, для переключения между очередями будем использовать co_yield:

CoroTask CoroToDealWith() {
    InCurrentThread();

    co_yield writerQueue;
    InWriterThread1();
    if (NeedNetwork()) {
        co_yield networkQueue;
        auto v = InNetworkThread();
        if (v) {
            co_yield UIQueue;
            InUIThread();
        }
    }

    co_yield writerQueue;
    InWriterThread2();
    ShutdownAll();
}

Получилось очень похоже на псевдокод из прошлой секции. Вся «магия» по работе с корутинами скрыта в классе CoroTask.

CoroTask

В простейшем (в нашем) случае содержимое класса «настройщика» сопрограммы состоит всего из одного алиаса:

#include <experimental/coroutine>

struct CoroTask {
    using promise_type = PromiseType;
};

promise_type — это тип данных, который мы должны сами написать. В нём содержится логика, описывающая:

  • что делать при выходе из корутины
  • что делать при первом заходе в корутину
  • кто освобождает русурсы
  • как поступать с исключениями вылетающими из корутины
  • как создавать объект CoroTask
  • что делать, если внутри корутины позвали co_yield

Алиас promise_type обязан называться именно так. Если вы измените имя алиаса на что-то другое, то компилятор будет ругаться и говорить, что вы неправильно написали CoroTask. Имя CoroTask же можно менять как вам вздумается.

А зачем вообще этот CoroTask, если всё описывается в promise_type?

В более сложных случаях можно создавать такие CoroTask, которые будут вам позволять общаться с остановленной корутиной, передавать и получать из неё данные, пробуждать и уничтожать её.

PromiseType

Приступаем к самому интересному. Описываем поведение корутин:

class WorkQueue; // forward declaration

class PromiseType {
public:
    // Когда выходим из корутины через `return;` или просто выходим из функции, то...
    void return_void() const { /* ... ничего не делаем :) */ }

    // Когда в самый первый раз заходим в функцию, возвращающую CoroTask, то...
    auto initial_suspend() const {
        // ... говорим что останавливать выполнение корутины не нужно.
        return std::experimental::suspend_never{};
    }

    // Когда в корутина завершается и вот-вот уничтожится, то...
    auto final_suspend() const {
        // ... говорим что останавливать выполнение корутины не нужно 
        // и компилятор сам должен уничтожить корутину.
        return std::experimental::suspend_never{};
    }

    // Когда из корутины вылетает исключение, то...
    void unhandled_exception() const {
        // ... прибиваем приложение (для простоты примера).
        std::terminate();
    }

    // Когда нужно создать CoroTask, для возврата из корутины, то...
    auto get_return_object() const {
        // ... создаём CoroTask.
        return CoroTask{};
    }

    // Когда в корутине вызвали co_yield, то...
    auto yield_value(WorkQueue& wq) const; // ... <смотрите описание ниже>
};

В коде выше можно заметить тип данных std::experimental::suspend_never. Это специальный тип данных, который говорит, что корутину останавливать не надо. Есть ещё его противоположность – тип std::experimental::suspend_always, который велит обязательно остановить корутину. Эти типы – так называемые Awaitables. Если вам интересно их внутреннее устройство, то не переживайте, мы скоро напишем свои Awaitables.

Самое нетривиальное место в приведённом выше коде – это final_suspend(). Функция обладает неожиданными эффектами. Так, если в этой функции мы не будем останавливать выполнение, то ресурсы, выделенные для корутины компилятором, подчистит за нас сам компилятор. А вот если в этой функции мы остановим выполнение корутины (например, вернув std::experimental::suspend_always{}), то освобождением ресурсов придётся заниматься вручную откуда-то извне: придётся где-то сохранять умный указатель на корутину и явно вызывать у него destroy(). К счастью, для нашего примера это не нужно.

НЕПРАВИЛЬНЫЙ PromiseType::yield_value

Кажется, что написать PromiseType::yield_value достаточно просто. У нас есть очередь; корутина, которую надо приостановить и в эту очередь поставить:

auto PromiseType::yield_value(WorkQueue& wq) {
    // Получаем умный невладеющий указатель на нашу корутину
    std::experimental::coroutine_handle<> this_coro
        = std::experimental::coroutine_handle<>::from_promise(*this);

    // Отправляем его в очередь. У this_coro определён operator(), так что для
    // wq наша корутина будет казаться обычной функцией. Когда настанет время,
    // из очереди будет извлечена корутина, вызван operator(), который
    // возобновит выполнение сопрограммы.
    wq.PushTask(this_coro);

    // Говорим что сопрограмму надо остановить.
    return std::experimental::suspend_always{};
}

И тут нас поджидает очень большая и сложно обнаруживаемая проблема. Дело в том, что мы сначала корутину ставим в очередь и только потом приостанавливаем. Может случиться так, что корутина будет извлечена из очереди и начнёт выполняться еще до того, как мы её приостановим в текущем потоке. Это приведёт к состоянию гонки, неопределённому поведению и абсолютно невменяемым рантайм ошибкам.

Корректный PromiseType::yield_value

Итак, нам надо сначала остановить корутину и только после этого добавлять её в очередь. Для этого мы напишем свой Awaitable и назовём его schedule_for_execution:

auto PromiseType::yield_value(WorkQueue& wq) {
    struct schedule_for_execution {
        WorkQueue& wq;

        constexpr bool await_ready() const noexcept { return false; }
        void await_suspend(std::experimental::coroutine_handle<> this_coro) const {
            wq.PushTask(this_coro);
        }
        constexpr void await_resume() const noexcept {}
    };

    return schedule_for_execution{wq};
}

Классы std::experimental::suspend_always, std::experimental::suspend_never, schedule_for_execution и прочие Awaitables должны содержать в себе 3 функции. await_ready вызывается для проверки, надо ли останавливать сопрогармму. await_suspend вызывается после остановки программы, в него передаётся handle остановленной корутины. await_resume вызывается, когда выполнение корутины возобновляется.

А что можно написать в треугольных скобрах std::experimental::coroutine_handle<>?

Можно указать там тип PromiseType, и пример будет работать абсолютно так же :)

std::experimental::coroutine_handle<> (он же std::experimental::coroutine_handle<void>) является базовым типом для всех std::experimental::coroutine_handle<ТипДанных>, где ТипДанных должен быть promise_type текущей корутины. Если вам не нужно обращаться к внутреннему содержимому ТипДанных, то можно писать std::experimental::coroutine_handle<>. Это может быть полезно в тех местах, где вам хочется абстрагироваться от конкретного типа promise_type и использовать type erasure.

Готово

Можно компилировать, запускать пример онлайн и всячески экспериментировать.

А а если мне не нравится co_yield, можно ли его заменить на что-то?

Можно заменить на co_await. Для этого в PromiseType надо добавить вот такую функцию:

auto await_transform(WorkQueue& wq) { return yield_value(wq); }

А а если мне и co_await не нравится?

Дело плохо. Ничего не изменить.

Шпаргалка

CoroTask – класс, настраивающий поведение корутины. В более сложных случаях позволяет общаться с остановленной корутиной и забирать какие-либо данные из неё.

CoroTask::promise_type описывает, как и когда корутине останавливаться, как освобождать ресурсы и как конструировать CoroTask.

Awaitables (std::experimental::suspend_always, std::experimental::suspend_never, schedule_for_execution и прочие) говорят компилятору, что делать с корутиной в конкретной точке (надо ли останавливать корутину, что делать с остановленной корутиной и что делать когда корутина пробуждается).

Оптимизации

В нашем PromiseType есть недостаток. Даже если мы в данный момент выполняемся в правильной очереди задач, вызов co_yield всё равно приостановит корутину и заново поместит её в эту же очередь задач. Куда оптимальнее было бы не останавливать выполнение корутины, а сразу продолжить выполнение.

Давайте мы исправим этот недостаток. Для этого добавим в PromiseType приватное поле:

WorkQueue* current_queue_ = nullptr;

В нём будем держать указатель на очередь, в которой мы выполняемся в данный момент.

Дальше подправим PromiseType::yield_value:

auto PromiseType::yield_value(WorkQueue& wq) {
    struct schedule_for_execution {
        const bool do_resume;
        WorkQueue& wq;

        constexpr bool await_ready() const noexcept { return do_resume; }
        void await_suspend(std::experimental::coroutine_handle<> this_coro) const {
            wq.PushTask(this_coro);
        }
        constexpr void await_resume() const noexcept {}
    };

    const bool do_not_suspend = (current_queue_ == &wq);
    current_queue_ = &wq;
    return schedule_for_execution{do_not_suspend, wq};
}

Здесь мы подправили schedule_for_execution::await_ready(). Теперь эта функция сообщает компилятору, что корутину не надо приостанавливать, если текущая очередь задач совпадает с той, на которой мы пытаемся запуститься.

Готово. Можно всячески экспериментировать.

Про производительность

В первоначальном примере при каждом вызове WorkQueue::PushTask(std::function<void()> f) у нас создавался экземпляр класса std::function<void()> от лямбды. В реальном коде эти лямбды зачастую достаточно большие по размеру, из-за чего std::function<void()> вынужден динамически аллоцировать память для хранения лямбды.

В примере с корутинами мы создаём экземпляры std::function<void()> из std::experimental::coroutine_handle<>. Размер std::experimental::coroutine_handle<> зависит от имплементации, но большинство имплементаций стараются держать его размер минимальным. Так на clang размер его равен sizeof(void*). При конструировании std::function<void()> от небольших объектов динамической аллокации не происходит.
Итого – с корутинами мы избавились от нескольких лишних динамических аллокаций.

Но! Компилятор зачастую не может просто сохранить всю корутину на стеке. Из-за этого возможна одна дополнительная динамическая аллокация при заходе в CoroToDealWith.

Stackless vs Stackful

Мы только что поработали со Stackless корутинами, для работы с которыми требуется поддержка от компилятора. Есть ещё Stackful корутины, которые можно реализовать целиком на уровне библиотеки.

Первые позволяют более экономно аллоцировать память, потенциально они лучше оптимизируются компилятором. Вторые проще внедрять в имеющиеся проекты, так как они требуют меньше модификаций кода. Однако в данном примере разницу не почувствовать, нужны примеры сложнее.

Итоги

Мы рассмотрели базовый пример и получили универсальный класс CoroTask, который можно использовать для создания и других сопрограмм.

Код с ним становится более читабельным и чуть более производительным, чем при наивном подходе:

Было С корутинами
void FuncToDealWith() {
  InCurrentThread();

  writerQueue.PushTask([=]() {
      InWriterThread1();

      const auto fin = [=]() {
          InWriterThread2();
          ShutdownAll();
      };

      if (NeedNetwork()) {
          networkQueue.PushTask([=](){
              auto v = InNetThread();
              if (v) {
                  UIQueue.PushTask([=](){
                      InUIThread();
                      writerQueue.PushTask(fin);
                  });
              } else {
                  writerQueue.PushTask(fin);
              }
          });
      } else {
          fin();
      }
  });
}
CoroTask CoroToDealWith() {
  InCurrentThread();

  co_yield writerQueue;
  InWriterThread1();
  if (NeedNetwork()) {
      co_yield networkQueue;
      auto v = InNetThread();
      if (v) {
          co_yield UIQueue;
          InUIThread();
      }
  }

  co_yield writerQueue;
  InWriterThread2();
  ShutdownAll();
}

За бортом остались моменты:

  • как вызывать из корутины другую корутину и ждать её завершения
  • что полезного можно напихать в CoroTask
  • пример, на котором чувствуется разница между Stackless и Stackful

Прочее

Если вы хотите узнать про другие новинки языка С++ или пообщаться лично с соратниками по плюсам, то загляните на конференцию C++Russia. Ближайшая состоится 6 октября в Нижнем Новгороде.

Если у вас есть боль, связанная с C++, и вы хотите что-то улучшить в языке или просто желаете обсудить возможные нововведения, то добро пожаловать на https://stdcpp.ru/.

Ну а если вас удивляет, что в Яндекс.Такси есть огромное количество задач, не связанных с графами, то надеюсь, что это оказалось для вас приятным сюрпризом :) Приходите к нам в гости 11 октября, поговорим о C++ и не только.

Автор: antoshkka

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js