
Этот пост является заключительным в моей мини-серии из трех постов о cppcoro. cppcoro — это библиотека абстракций корутин от Льюиса Бейкера (Lewis Baker). Сегодня я покажу вам пулы потоков (thread pools).

Чтобы получить максимум пользы от этого поста, вам следовало бы ознакомиться с двумя моими предыдущим постами про cppcoro.
-
C++20: Корутины с cppcoro: введение в cppcoro, простейшие задачи и генератор корутин.
-
C++20: Мощные корутины с cppcoro: более мощные корутины с потоками.
В дополнение к функции cppcoro::sync_wait, которую можно использовать для ожидания указанный Awaitable завершается, cppcoro предлагает довольно интересную функцию cppcoro::when_all.
when_all
-
when_all: создаетAwaitable, который ожидает все своиInput-Awaitableи возвращает совокупность их результатов.
Я упростил определение функции cpporo::when_all для этой статьи. Следующий пример должен помочь вам понять что к чему.
// cppcoroWhenAll.cpp #include <chrono> #include <iostream> #include <thread> #include <cppcoro/sync_wait.hpp> #include <cppcoro/task.hpp> #include <cppcoro/when_all.hpp> using namespace std::chrono_literals; cppcoro::task<std::string> getFirst() { std::this_thread::sleep_for(1s); // (3) co_return "First"; } cppcoro::task<std::string> getSecond() { std::this_thread::sleep_for(1s); // (3) co_return "Second"; } cppcoro::task<std::string> getThird() { std::this_thread::sleep_for(1s); // (3) co_return "Third"; } cppcoro::task<> runAll() { // (2) auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird()); std::cout << fir << " " << sec << " " << thi << std::endl; } int main() { std::cout << std::endl; auto start = std::chrono::steady_clock::now(); cppcoro::sync_wait(runAll()); // (1) std::cout << std::endl; auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = end - start; // (4) std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl; std::cout << std::endl; }
Задача верхнего уровня cppcoro::sync_wait(runAll()) (строка 1) ожидает Awaitable runAll. runAll ожидает Awaitable getFirst, getSecond и getThird (строка 2). Awaitable runAll, getFirst, getSecond и getThird являются корутирами. Каждая из функций get засыпает на одну секунду (строка 3). Три раза по одной секунде составляет три секунды. Это и будет временем, в течение которого вызов cppcoro::sync_wait(runAll()) ожидает корутины. Строка 4 отображает это время.

Теперь, когда мы познакомились с функцией cppcoro::when_all, позвольте мне добавить к ней пулы потоков.
static_thread_pool
-
static_thead_pool: управление работой пула потоков фиксированного размера.
cppcoro::static_thread_pool можно вызывать с числовым параметром или без. Это число обозначает количество созданных потоков. Если вы не укажете число, то вместо него будет использована функция C++11 std::thread::hardware_concurrency(). std::thread::hardware_concurrency предоставляет вам информацию о количестве аппаратных потоков, поддерживаемых вашей системой. Это может быть количество процессоров или ядер, которые находятся в вашем распоряжении.
Позвольте мне продемонстрировать это на практике. Следующий пример я вляется модификацией предыдущего — он одновременно выполняет корутины getFirst, getSecond и getThird .
// cppcoroWhenAllOnThreadPool.cpp #include <chrono> #include <iostream> #include <thread> #include <cppcoro/sync_wait.hpp> #include <cppcoro/task.hpp> #include <cppcoro/static_thread_pool.hpp> #include <cppcoro/when_all.hpp> using namespace std::chrono_literals; cppcoro::task<std::string> getFirst() { std::this_thread::sleep_for(1s); co_return "First"; } cppcoro::task<std::string> getSecond() { std::this_thread::sleep_for(1s); co_return "Second"; } cppcoro::task<std::string> getThird() { std::this_thread::sleep_for(1s); co_return "Third"; } template <typename Func> cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) { co_await tp.schedule(); auto res = co_await func(); co_return res; } cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) { auto[fir, sec, thi] = co_await cppcoro::when_all( // (3) runOnThreadPool(tp, getFirst), runOnThreadPool(tp, getSecond), runOnThreadPool(tp, getThird)); std::cout << fir << " " << sec << " " << thi << std::endl; } int main() { std::cout << std::endl; auto start = std::chrono::steady_clock::now(); cppcoro::static_thread_pool tp; // (1) cppcoro::sync_wait(runAll(tp)); // (2) std::cout << std::endl; auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = end - start; // (4) std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl; std::cout << std::endl; }
Сейчас я объясню основные отличия от предыдущей программы cppcoroWhenAll.cpp. Я создал в строке (1) пул потоков tp и использовал его в качестве аргумента для функции runAll(tp) (строка 2). Функция runAll использует пул потоков для одновременного запуска корутин. Благодаря структурным привязкам (строка 3) значения каждой корутины можно легко агрегировать и присвоить переменной. В итоге функция main выполняется за одну, а не за три секунды, как раньше.

Возможно, вы знаете, что мы получаем с защелками и барьерами C++20. Защелки (latches) и барьеры (barrier) — это механизмы синхронизации потоков, которые позволяют некоторым потокам блокироваться до тех пор, пока счетчик не станет равным нулю. cppcoro также поддерживает защелки и барьеры.
async_latch
-
async_latch: позволяет заставить корутину асинхронно ожидать, пока счетчик не станет равным нулю
Следующая программа cppcoroLatch.cpp демонстрирует синхронизацию потоков с помощью cppcoro::async_latch.
// cppcoroLatch.cpp #include <chrono> #include <iostream> #include <future> #include <cppcoro/sync_wait.hpp> #include <cppcoro/async_latch.hpp> #include <cppcoro/task.hpp> using namespace std::chrono_literals; cppcoro::task<> waitFor(cppcoro::async_latch& latch) { std::cout << "Before co_await" << std::endl; co_await latch; // (3) std::cout << "After co_await" << std::endl; } int main() { std::cout << std::endl; cppcoro::async_latch latch(3); // (1) // (2) auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); }); auto counter1 = std::async([&latch] { // (2) std::this_thread::sleep_for(2s); std::cout << "counter1: latch.count_down() " << std::endl; latch.count_down(); }); auto counter2 = std::async([&latch] { // (2) std::this_thread::sleep_for(1s); std::cout << "counter2: latch.count_down(2) " << std::endl; latch.count_down(2); }); waiter.get(), counter1.get(), counter2.get(); std::cout << std::endl; }
В строке (1) я создаю cppcoro::asynch_latch и инициализирую счетчик равным 3. На этот раз я использую std::async (строка (2)) для одновременного запуска трех корутин. Каждый std::async получает latch для каждой ссылки. В строке (3) корутина waitFor ожидает, пока счетчик не станет равным нулю. Корутина counter1 засыпает на 2 секунды, а затем отсчитывает 1 событие. Напротив, counter2 засыпает на 1 секунду и отсчитает 2 события. На скриншоте показано это чередование потоков.

Что дальше?
На данный момент я успел написать о трех из большой четверки C++20: концептах, диапазонах и корутинах. В моем путешествии по большой четверке все еще отсутствуют модули. Они и станут темой моих следующих постов.
Кстати, если кто-то хочет написать пост о возможностях C++20, о которых я тоже планирую писать в будущем, свяжитесь со мной. Я с удовольствием опубликую ее и переведу на английский/немецкий язык, если это необходимо.
Программа наставничества
Моя новая программа наставничества «Fundamentals for C++ Professionals» стартует в апреле. Получить дополнительную информацию можно здесь.
Всех желающих приглашаем на открытый урок в OTUS «Умные указатели», на котором разберем, что такое умные указатели и зачем они нужны. Проведем обзор умных указателей входящих в stl, unique_ptr, Shared_ptr, weak_ptr.
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/654159/
Добавить комментарий