C++20: Пулы потоков в cppcoro

от автора

Этот пост является заключительным в моей мини-серии из трех постов о cppcoro. cppcoro — это библиотека абстракций корутин от Льюиса Бейкера (Lewis Baker). Сегодня я покажу вам пулы потоков (thread pools).

 

Чтобы получить максимум пользы от этого поста, вам следовало бы ознакомиться с двумя моими предыдущим постами про cppcoro.

В дополнение к функции cppcoro::sync_wait, которую можно использовать для ожидания указанный Awaitable завершается, cppcoro предлагает довольно интересную функцию cppcoro::when_all.

when_all

  • when_all: создает Awaitable, который ожидает все свои Input-Awaitable и возвращает совокупность их результатов.

Я упростил определение функции cpporo::when_all для этой статьи. Следующий пример должен помочь вам понять что к чему.

// cppcoroWhenAll.cpp  #include <chrono> #include <iostream> #include <thread>  #include <cppcoro/sync_wait.hpp> #include <cppcoro/task.hpp> #include <cppcoro/when_all.hpp>  using namespace std::chrono_literals;  cppcoro::task<std::string> getFirst() {     std::this_thread::sleep_for(1s);                       // (3)     co_return "First"; }  cppcoro::task<std::string> getSecond() {      std::this_thread::sleep_for(1s);                      // (3)     co_return "Second"; }  cppcoro::task<std::string> getThird() {      std::this_thread::sleep_for(1s);                     // (3)     co_return "Third"; }   cppcoro::task<> runAll() {                                                           // (2)     auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());          std::cout << fir << " " << sec << " " << thi << std::endl;      }  int main() {          std::cout << std::endl;          auto start = std::chrono::steady_clock::now();          cppcoro::sync_wait(runAll());                          // (1)          std::cout << std::endl;          auto end = std::chrono::high_resolution_clock::now();     std::chrono::duration<double> elapsed = end - start;   // (4)     std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;          std::cout << std::endl;  }

Задача верхнего уровня cppcoro::sync_wait(runAll()) (строка 1) ожидает Awaitable runAll. runAll ожидает Awaitable getFirst, getSecond и getThird (строка 2). Awaitable runAll, getFirst, getSecond и getThird являются корутирами. Каждая из функций get засыпает на одну секунду (строка 3). Три раза по одной секунде составляет три секунды. Это и будет временем, в течение которого вызов cppcoro::sync_wait(runAll()) ожидает корутины. Строка 4 отображает это время.

Теперь, когда мы познакомились с функцией cppcoro::when_all, позвольте мне добавить к ней пулы потоков.

static_thread_pool

  • static_thead_pool: управление работой пула потоков фиксированного размера.

cppcoro::static_thread_pool можно вызывать с числовым параметром или без. Это число обозначает количество созданных потоков. Если вы не укажете число, то вместо него будет использована функция C++11 std::thread::hardware_concurrency(). std::thread::hardware_concurrency предоставляет вам информацию о количестве аппаратных потоков, поддерживаемых вашей системой. Это может быть количество процессоров или ядер, которые находятся в вашем распоряжении.

Позвольте мне продемонстрировать это на практике. Следующий пример я вляется модификацией предыдущего — он одновременно выполняет корутины getFirst, getSecond и getThird .

// cppcoroWhenAllOnThreadPool.cpp  #include <chrono> #include <iostream> #include <thread>  #include <cppcoro/sync_wait.hpp> #include <cppcoro/task.hpp> #include <cppcoro/static_thread_pool.hpp> #include <cppcoro/when_all.hpp>   using namespace std::chrono_literals;  cppcoro::task<std::string> getFirst() {     std::this_thread::sleep_for(1s);     co_return "First"; }  cppcoro::task<std::string> getSecond() {     std::this_thread::sleep_for(1s);     co_return "Second"; }  cppcoro::task<std::string> getThird() {     std::this_thread::sleep_for(1s);     co_return "Third"; }  template <typename Func> cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {     co_await tp.schedule();     auto res = co_await func();     co_return res; }  cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {          auto[fir, sec, thi] = co_await cppcoro::when_all(    // (3)         runOnThreadPool(tp, getFirst),         runOnThreadPool(tp, getSecond),          runOnThreadPool(tp, getThird));          std::cout << fir << " " << sec << " " << thi << std::endl;      }      int main() {          std::cout << std::endl;          auto start = std::chrono::steady_clock::now();      cppcoro::static_thread_pool tp;                         // (1)     cppcoro::sync_wait(runAll(tp));                         // (2)          std::cout << std::endl;          auto end = std::chrono::high_resolution_clock::now();     std::chrono::duration<double> elapsed = end - start;    // (4)     std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;          std::cout << std::endl;  }

Сейчас я объясню основные отличия от предыдущей программы cppcoroWhenAll.cpp. Я создал в строке (1) пул потоков tp и использовал его в качестве аргумента для функции runAll(tp) (строка 2). Функция runAll использует пул потоков для одновременного запуска корутин. Благодаря структурным привязкам (строка 3) значения каждой корутины можно легко агрегировать и присвоить переменной. В итоге функция main выполняется за одну, а не за три секунды, как раньше.

Возможно, вы знаете, что мы получаем с защелками и барьерами C++20. Защелки (latches) и барьеры (barrier) — это механизмы синхронизации потоков, которые позволяют некоторым потокам блокироваться до тех пор, пока счетчик не станет равным нулю. cppcoro также поддерживает защелки и барьеры.

async_latch

  • async_latch: позволяет заставить корутину асинхронно ожидать, пока счетчик не станет равным нулю

Следующая программа cppcoroLatch.cpp демонстрирует синхронизацию потоков с помощью cppcoro::async_latch.

// cppcoroLatch.cpp  #include <chrono> #include <iostream> #include <future>  #include <cppcoro/sync_wait.hpp> #include <cppcoro/async_latch.hpp> #include <cppcoro/task.hpp>  using namespace std::chrono_literals;   cppcoro::task<> waitFor(cppcoro::async_latch& latch) {     std::cout << "Before co_await" << std::endl;     co_await latch;                              // (3)     std::cout << "After co_await" << std::endl; }  int main() {          std::cout << std::endl;      cppcoro::async_latch latch(3);              // (1)                                                  // (2)     auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); });       auto counter1 = std::async([&latch] {       // (2)         std::this_thread::sleep_for(2s);         std::cout << "counter1: latch.count_down() " << std::endl;         latch.count_down();     });              auto counter2 = std::async([&latch] {      // (2)         std::this_thread::sleep_for(1s);         std::cout << "counter2: latch.count_down(2) " << std::endl;         latch.count_down(2);     });      waiter.get(), counter1.get(), counter2.get();          std::cout << std::endl;  }

В строке (1) я создаю cppcoro::asynch_latch и инициализирую счетчик равным 3. На этот раз я использую std::async (строка (2)) для одновременного запуска трех корутин. Каждый std::async получает latch для каждой ссылки. В строке (3) корутина waitFor ожидает, пока счетчик не станет равным нулю. Корутина counter1 засыпает на 2 секунды, а затем отсчитывает 1 событие. Напротив, counter2 засыпает на 1 секунду и отсчитает 2 события. На скриншоте показано это чередование потоков.

Что дальше?

На данный момент я успел написать о трех из большой четверки C++20: концептах, диапазонах и корутинах. В моем путешествии по большой четверке все еще отсутствуют модули. Они и станут темой моих следующих постов.

Кстати, если кто-то хочет написать пост о возможностях C++20, о которых я тоже планирую писать в будущем, свяжитесь со мной. Я с удовольствием опубликую ее и переведу на английский/немецкий язык, если это необходимо.

Программа наставничества

Моя новая программа наставничества «Fundamentals for C++ Professionals» стартует в апреле. Получить дополнительную информацию можно здесь.


Всех желающих приглашаем на открытый урок в OTUS «Умные указатели», на котором разберем, что такое умные указатели и зачем они нужны. Проведем обзор умных указателей входящих в stl, unique_ptr, Shared_ptr, weak_ptr.


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/654159/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *