Готовимся к С++20. Coroutines TS на реальном примере

от автора

В C++20 вот-вот появится возможность работать с корутинами из коробки. Нам в Яндекс.Такси эта тема близка и интересна (под собственные нужды мы разрабатываем асинхронный фреймворк). Поэтому сегодня мы покажем читателям Хабра, как можно работать с C++ stackless корутинами на реальном примере.

В качестве примера возьмём что-то простое: без работы с асинхронными сетевыми интерфейсами, асинхронными таймерами, состоящее из одной функции. Например, попробуем осознать и переписать вот такую «лапшу» из колбеков:

void FuncToDealWith() {     InCurrentThread();      writerQueue.PushTask([=]() {         InWriterThread1();          const auto finally = [=]() {             InWriterThread2();             ShutdownAll();         };          if (NeedNetwork()) {             networkQueue.PushTask([=](){                 auto v = InNetworkThread();                 if (v) {                     UIQueue.PushTask([=](){                         InUIThread();                         writerQueue.PushTask(finally);                     });                 } else {                     writerQueue.PushTask(finally);                 }             });         } else {             finally();         }     }); } 

Введение

Корутины или сопрограммы – это возможность остановить выполнение функции в заранее определённом месте; передать куда-либо всё состояние остановленной функции вместе с локальными переменными; запустить функцию с того же места, где мы её остановили.
Есть несколько разновидностей сопрограмм: stackless и stackful. Об этом поговорим позднее.

Постановка задачи

У нас есть несколько очередей задач. В каждую очередь помещаются определенные задачи: есть очередь для отрисовки графики, есть очередь для сетевых взаимодействий, есть очередь для работы с диском. Все очереди – это инстансы класса WorkQueue, у которых есть метод void PushTask(std::function<void()> task);

Функция FuncToDealWith() из примера выполняет какую-то логику в разных очередях и, в зависимости от результатов выполнения, ставит новую задачу в очередь.

Перепишем «лапшу» колбеков в виде линейного псевдокода, разметив в какой очереди нижележащий код должен выполняться:

void CoroToDealWith() {     InCurrentThread();      // => перейти в writerQueue     InWriterThread1();     if (NeedNetwork()) {         // => перейти в networkQueue         auto v = InNetworkThread();         if (v) {             // => перейти в UIQueue             InUIThread();         }     }      // => перейти в writerQueue     InWriterThread2();     ShutdownAll(); }

Приблизительно такого результата и хочется добиться.

При этом есть ограничения:

  • Интерфейсы очередей менять нельзя – ими пользуются в других частях приложения сторонние разработчики. Ломать код разработчиков или добавлять новые инстансы очередей нельзя.
  • Нельзя менять способ использования функции FuncToDealWith. Можно изменить только её имя, но нельзя делать так, чтобы она возвращала какие-то объекты, которые пользователь должен у себя хранить.
  • Полученный код должен быть таким же производительным, как первоначальный (или даже производительнее).

Решение

Переписываем функцию FuncToDealWith

В Coroutines TS настройка корутины производится заданием типа возвращаемого значения функции. Если тип удовлетворяет определённым требованиям, то внутри тела функции можно пользоваться новыми ключевыми словами co_await/co_return/co_yield. В данном примере, для переключения между очередями будем использовать co_yield:

CoroTask CoroToDealWith() {     InCurrentThread();      co_yield writerQueue;     InWriterThread1();     if (NeedNetwork()) {         co_yield networkQueue;         auto v = InNetworkThread();         if (v) {             co_yield UIQueue;             InUIThread();         }     }      co_yield writerQueue;     InWriterThread2();     ShutdownAll(); }

Получилось очень похоже на псевдокод из прошлой секции. Вся «магия» по работе с корутинами скрыта в классе CoroTask.

CoroTask

В простейшем (в нашем) случае содержимое класса «настройщика» сопрограммы состоит всего из одного алиаса:

#include <experimental/coroutine>  struct CoroTask {     using promise_type = PromiseType; };

promise_type — это тип данных, который мы должны сами написать. В нём содержится логика, описывающая:

  • что делать при выходе из корутины
  • что делать при первом заходе в корутину
  • кто освобождает русурсы
  • как поступать с исключениями вылетающими из корутины
  • как создавать объект CoroTask
  • что делать, если внутри корутины позвали co_yield

Алиас promise_type обязан называться именно так. Если вы измените имя алиаса на что-то другое, то компилятор будет ругаться и говорить, что вы неправильно написали CoroTask. Имя CoroTask же можно менять как вам вздумается.

А зачем вообще этот CoroTask, если всё описывается в promise_type?

В более сложных случаях можно создавать такие CoroTask, которые будут вам позволять общаться с остановленной корутиной, передавать и получать из неё данные, пробуждать и уничтожать её.

PromiseType

Приступаем к самому интересному. Описываем поведение корутин:

class WorkQueue; // forward declaration  class PromiseType { public:     // Когда выходим из корутины через `return;` или просто выходим из функции, то...     void return_void() const { /* ... ничего не делаем :) */ }      // Когда в самый первый раз заходим в функцию, возвращающую CoroTask, то...     auto initial_suspend() const {         // ... говорим что останавливать выполнение корутины не нужно.         return std::experimental::suspend_never{};     }      // Когда в корутина завершается и вот-вот уничтожится, то...     auto final_suspend() const {         // ... говорим что останавливать выполнение корутины не нужно          // и компилятор сам должен уничтожить корутину.         return std::experimental::suspend_never{};     }      // Когда из корутины вылетает исключение, то...     void unhandled_exception() const {         // ... прибиваем приложение (для простоты примера).         std::terminate();     }      // Когда нужно создать CoroTask, для возврата из корутины, то...     auto get_return_object() const {         // ... создаём CoroTask.         return CoroTask{};     }      // Когда в корутине вызвали co_yield, то...     auto yield_value(WorkQueue& wq) const; // ... <смотрите описание ниже> };

В коде выше можно заметить тип данных std::experimental::suspend_never. Это специальный тип данных, который говорит, что корутину останавливать не надо. Есть ещё его противоположность – тип std::experimental::suspend_always, который велит обязательно остановить корутину. Эти типы – так называемые Awaitables. Если вам интересно их внутреннее устройство, то не переживайте, мы скоро напишем свои Awaitables.

Самое нетривиальное место в приведённом выше коде – это final_suspend(). Функция обладает неожиданными эффектами. Так, если в этой функции мы не будем останавливать выполнение, то ресурсы, выделенные для корутины компилятором, подчистит за нас сам компилятор. А вот если в этой функции мы остановим выполнение корутины (например, вернув std::experimental::suspend_always{}), то освобождением ресурсов придётся заниматься вручную откуда-то извне: придётся где-то сохранять умный указатель на корутину и явно вызывать у него destroy(). К счастью, для нашего примера это не нужно.

НЕПРАВИЛЬНЫЙ PromiseType::yield_value

Кажется, что написать PromiseType::yield_value достаточно просто. У нас есть очередь; корутина, которую надо приостановить и в эту очередь поставить:

auto PromiseType::yield_value(WorkQueue& wq) {     // Получаем умный невладеющий указатель на нашу корутину     std::experimental::coroutine_handle<> this_coro         = std::experimental::coroutine_handle<>::from_promise(*this);      // Отправляем его в очередь. У this_coro определён operator(), так что для     // wq наша корутина будет казаться обычной функцией. Когда настанет время,     // из очереди будет извлечена корутина, вызван operator(), который     // возобновит выполнение сопрограммы.     wq.PushTask(this_coro);      // Говорим что сопрограмму надо остановить.     return std::experimental::suspend_always{}; }

И тут нас поджидает очень большая и сложно обнаруживаемая проблема. Дело в том, что мы сначала корутину ставим в очередь и только потом приостанавливаем. Может случиться так, что корутина будет извлечена из очереди и начнёт выполняться еще до того, как мы её приостановим в текущем потоке. Это приведёт к состоянию гонки, неопределённому поведению и абсолютно невменяемым рантайм ошибкам.

Корректный PromiseType::yield_value

Итак, нам надо сначала остановить корутину и только после этого добавлять её в очередь. Для этого мы напишем свой Awaitable и назовём его schedule_for_execution:

auto PromiseType::yield_value(WorkQueue& wq) {     struct schedule_for_execution {         WorkQueue& wq;          constexpr bool await_ready() const noexcept { return false; }         void await_suspend(std::experimental::coroutine_handle<> this_coro) const {             wq.PushTask(this_coro);         }         constexpr void await_resume() const noexcept {}     };      return schedule_for_execution{wq}; }

Классы std::experimental::suspend_always, std::experimental::suspend_never, schedule_for_execution и прочие Awaitables должны содержать в себе 3 функции. await_ready вызывается для проверки, надо ли останавливать сопрогармму. await_suspend вызывается после остановки программы, в него передаётся handle остановленной корутины. await_resume вызывается, когда выполнение корутины возобновляется.

А что можно написать в треугольных скобрах std::experimental::coroutine_handle<>?

Можно указать там тип PromiseType, и пример будет работать абсолютно так же 🙂

std::experimental::coroutine_handle<> (он же std::experimental::coroutine_handle<void>) является базовым типом для всех std::experimental::coroutine_handle<ТипДанных>, где ТипДанных должен быть promise_type текущей корутины. Если вам не нужно обращаться к внутреннему содержимому ТипДанных, то можно писать std::experimental::coroutine_handle<>. Это может быть полезно в тех местах, где вам хочется абстрагироваться от конкретного типа promise_type и использовать type erasure.

Готово

Можно компилировать, запускать пример онлайн и всячески экспериментировать.

А а если мне не нравится co_yield, можно ли его заменить на что-то?

Можно заменить на co_await. Для этого в PromiseType надо добавить вот такую функцию:

auto await_transform(WorkQueue& wq) { return yield_value(wq); } 

А а если мне и co_await не нравится?

Дело плохо. Ничего не изменить.

Шпаргалка

CoroTask – класс, настраивающий поведение корутины. В более сложных случаях позволяет общаться с остановленной корутиной и забирать какие-либо данные из неё.

CoroTask::promise_type описывает, как и когда корутине останавливаться, как освобождать ресурсы и как конструировать CoroTask.

Awaitables (std::experimental::suspend_always, std::experimental::suspend_never, schedule_for_execution и прочие) говорят компилятору, что делать с корутиной в конкретной точке (надо ли останавливать корутину, что делать с остановленной корутиной и что делать когда корутина пробуждается).

Оптимизации

В нашем PromiseType есть недостаток. Даже если мы в данный момент выполняемся в правильной очереди задач, вызов co_yield всё равно приостановит корутину и заново поместит её в эту же очередь задач. Куда оптимальнее было бы не останавливать выполнение корутины, а сразу продолжить выполнение.

Давайте мы исправим этот недостаток. Для этого добавим в PromiseType приватное поле:

WorkQueue* current_queue_ = nullptr;

В нём будем держать указатель на очередь, в которой мы выполняемся в данный момент.

Дальше подправим PromiseType::yield_value:

auto PromiseType::yield_value(WorkQueue& wq) {     struct schedule_for_execution {         const bool do_resume;         WorkQueue& wq;          constexpr bool await_ready() const noexcept { return do_resume; }         void await_suspend(std::experimental::coroutine_handle<> this_coro) const {             wq.PushTask(this_coro);         }         constexpr void await_resume() const noexcept {}     };      const bool do_not_suspend = (current_queue_ == &wq);     current_queue_ = &wq;     return schedule_for_execution{do_not_suspend, wq}; }

Здесь мы подправили schedule_for_execution::await_ready(). Теперь эта функция сообщает компилятору, что корутину не надо приостанавливать, если текущая очередь задач совпадает с той, на которой мы пытаемся запуститься.

Готово. Можно всячески экспериментировать.

Про производительность

В первоначальном примере при каждом вызове WorkQueue::PushTask(std::function<void()> f) у нас создавался экземпляр класса std::function<void()> от лямбды. В реальном коде эти лямбды зачастую достаточно большие по размеру, из-за чего std::function<void()> вынужден динамически аллоцировать память для хранения лямбды.

В примере с корутинами мы создаём экземпляры std::function<void()> из std::experimental::coroutine_handle<>. Размер std::experimental::coroutine_handle<> зависит от имплементации, но большинство имплементаций стараются держать его размер минимальным. Так на clang размер его равен sizeof(void*). При конструировании std::function<void()> от небольших объектов динамической аллокации не происходит.
Итого – с корутинами мы избавились от нескольких лишних динамических аллокаций.

Но! Компилятор зачастую не может просто сохранить всю корутину на стеке. Из-за этого возможна одна дополнительная динамическая аллокация при заходе в CoroToDealWith.

Stackless vs Stackful

Мы только что поработали со Stackless корутинами, для работы с которыми требуется поддержка от компилятора. Есть ещё Stackful корутины, которые можно реализовать целиком на уровне библиотеки.

Первые позволяют более экономно аллоцировать память, потенциально они лучше оптимизируются компилятором. Вторые проще внедрять в имеющиеся проекты, так как они требуют меньше модификаций кода. Однако в данном примере разницу не почувствовать, нужны примеры сложнее.

Итоги

Мы рассмотрели базовый пример и получили универсальный класс CoroTask, который можно использовать для создания и других сопрограмм.

Код с ним становится более читабельным и чуть более производительным, чем при наивном подходе:

Было С корутинами
void FuncToDealWith() {   InCurrentThread();    writerQueue.PushTask([=]() {       InWriterThread1();        const auto fin = [=]() {           InWriterThread2();           ShutdownAll();       };        if (NeedNetwork()) {           networkQueue.PushTask([=](){               auto v = InNetThread();               if (v) {                   UIQueue.PushTask([=](){                       InUIThread();                       writerQueue.PushTask(fin);                   });               } else {                   writerQueue.PushTask(fin);               }           });       } else {           fin();       }   }); } 
CoroTask CoroToDealWith() {   InCurrentThread();    co_yield writerQueue;   InWriterThread1();   if (NeedNetwork()) {       co_yield networkQueue;       auto v = InNetThread();       if (v) {           co_yield UIQueue;           InUIThread();       }   }    co_yield writerQueue;   InWriterThread2();   ShutdownAll(); }

За бортом остались моменты:

  • как вызывать из корутины другую корутину и ждать её завершения
  • что полезного можно напихать в CoroTask
  • пример, на котором чувствуется разница между Stackless и Stackful

Прочее

Если вы хотите узнать про другие новинки языка С++ или пообщаться лично с соратниками по плюсам, то загляните на конференцию C++Russia. Ближайшая состоится 6 октября в Нижнем Новгороде.

Если у вас есть боль, связанная с C++, и вы хотите что-то улучшить в языке или просто желаете обсудить возможные нововведения, то добро пожаловать на https://stdcpp.ru/.

Ну а если вас удивляет, что в Яндекс.Такси есть огромное количество задач, не связанных с графами, то надеюсь, что это оказалось для вас приятным сюрпризом 🙂 Приходите к нам в гости 11 октября, поговорим о C++ и не только.


ссылка на оригинал статьи https://habr.com/post/420861/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *