Кто такой Thread Pool и как его написать своими руками на С++

от автора

Для кого статья?

Статья для тех, что хочет разобраться в работе Thread Pool и написать наивную реализацию с использованием С++ 14 и С++ 17. Стоит упомянуть, что представленные реализации будут представлять решение учебной задачи и не подойдут для коммерческого использования.

Что нового я могу узнать из статьи?

  • Кто такой Thread Pool?

  • Зачем использовать Thread Pool?

  • Логика работы Thread Pool

  • Реализация (С++ 14)

  • Реализация (С++ 17)

  • Сравнение реализаций

Кто такой Thread Pool?

Это удобный в использовании паттерн, позволяющий выполнять множество задач используя ресурсы множества потоков. Thread Pool состоит обычно из очереди задач и нескольких потоков, которые достают задачи из очереди и выполняют их параллельно. Есть и реализации с отдельной очередью на каждый поток, но мы их рассматривать не будем.

Зачем использовать Thread Pool?

  • Помогает увеличить производительность программы благодаря созданию потоков один раз (создание потока считается достаточно тяжёлой операцией).

  • Предоставляет удобный интерфейс для работы с многопоточностью.

Немного цифр

Протестировав 3 случая: запуск без дополнительных потоков, с созданием потоков через std::thread и с использованием thread_pool. На программе:

void test_func(int& res, const std::vector<int>& arr) {     res = 0;     for (int i = arr.size() - 1; i >= 0; --i) {         for (int j = 0; j < arr.size(); ++j) {             res += arr[i] + arr[j];         }     } }

Получились следующие результаты:

Способ запуска

Время (миллисекунды)

Кол-во потоков

Без дополнительных потоков

83954

1

std::thred

62386

6

thread_pool

52474

6

У меня на компьютере можно создать максимум 8 потоков и запускался тестовый пример в Visual Studio на платформе Windows. Это значит, что фоновая работа сторонних приложений может создавать флуктуации и при каждом запуске мы будем получать разные времена. Код примеров можно посмотреть ТУТ функция run_test .

Почему 6 потоков не ускорили код в 6 раз?

Дело в том, что Thread Pool в моей реализации сделан не оптимально т.к. использует condition_variable, что сильно замедляет работу, так же присутствуют средства синхронизации (нужно помнить про закон Амдала). По мимо вышеперечисленных фактов тестирование проводилось на Windows, что не исключает разделение общих ресурсов ПК с другими приложениями.

Хорошо, но тогда как Thread Pool работает?

Thread Pool имеет очередь задач, из которой каждый поток достаёт новую задачу при условии, что очередь не пуста и поток свободен. Для более детального описания давайте рассмотрим работу Thread Pool на примере:

Начальная стадия: все потоки свободны, а в очереди присутствует 5 задач.

Стадия 1: Каждый из потоков взял задачу на исполнение, при этом на практике первый поток не обязательно берёт первую задачу, это зависит от того, кто первый получит доступ к общему ресурсу — очереди. В очереди остались только 4 и 5 задача (чёрным цветом обозначены задачи, которые остались в очереди).

Стадия 2: На 3 секунде второй поток завершает выполнение 2 задачи и берёт первую свободную задачу из очереди (4 задачу). В очереди остаётся только 5 задача.

Стадия 3: Третий поток завершил задачу 3 и взял последнюю задачу из очереди (5 задачу). Очередь стала пустой, но наша программа не должна завершать работу, сначала следует дождаться выполнения всех задач.

Стадия 4: Первый поток завершил выполнение своей задачи и не берёт новых задач (т. к. очередь пуста). Может показаться, что если у нас нет задач в очереди, то следующие задачи поток уже не выполняет. На самом деле это не так и как только придёт новая задача, свободный поток (но мы не знаем какой именно) сразу начнёт её исполнение.

Стадия 5: Третий поток закончил выполнять задачу 5.

Стадия 6: Второй поток закончил исполнение 4 задачи.

Итог: Thread Pool выполнил 5 задач за 11 секунд.

Что будет уметь делать наш Thread Pool?

Мы уже разобрались с общим механизмом работы Thread Pool, теперь подумаем о его функциональности.

В нашем Thread Pool будет 2 типа задач: блокирующие (будут тормозить основной поток до тех пор, пока команда не будет исполнена) и не блокирующие (гарантированно исполняются очень быстро и не тормозят основной поток).

Thread Pool будет иметь следующий интерфейс:

  • init(num_threads) — метод, создающий массив из num_threads потоков. В нашей реализации в качестве данного метода будет выступать конструктор.

  • add_task(task_func, args)не блокирующий метод добавления новой задачи. Принимает функцию task_func и аргументы данной функции args и возвращает task_id (уникальный номер задачи).

  • wait(task_id)блокирующий метод, ожидающий выполнения задачи с указанным task_id. В данной реализации мы не будем сохранять результат работы функции (мы исправим данный недостаток чуть позже), при этом функция обязательно должна возвращать void.

  • wait_all()блокирующий метод, дожидающийся завершения всех задач.

  • calculated(task_id)не блокирующий метод, проверяющий была ли выполнена задача с номером task_id.

  • shotdown()блокирующий метод, дожидающийся завершения всех задач и завершающий работу Thread Pool. Для корректного завершения работы программы будем использовать деструктор (хотя можно дополнительно добавить и метод).

Реализация базовой версии (C++ 14)

Рассмотрим переменные, которые будут использоваться в нашем классе:

// очередь задач - хранит функцию(задачу), которую нужно исполнить и номер задачи std::queue<std::pair<std::future<void>, int64_t>> q;   std::mutex q_mtx; std::condition_variable q_cv;  // помещаем в данный контейнер исполненные задачи std::unordered_set<int64_t> completed_task_ids;   std::condition_variable completed_task_ids_cv; std::mutex completed_task_ids_mtx;  std::vector<std::thread> threads;  // флаг завершения работы thread_pool std::atomic<bool> quite{ false };  // переменная хранящая id который будет выдан следующей задаче std::atomic<int64_t> last_idx = 0;

Очередь хранит std::future<void> — объект, который в будущем вернёт тип void, использование std::future позволяет не сразу вычислять функцию, а отложить вызов до нужного нам момента, также можно использовать и std::function<void()> (такой способ тоже допустим).

thread_pool(uint32_t num_threads) {     threads.reserve(num_threads);     for (uint32_t i = 0; i < num_threads; ++i) {         threads.emplace_back(&thread_pool::run, this);     } }

В конструкторе мы создаём указанное число потоков и каждый из потоков запускает единственный приватный метод run.

void run() {     while (!quite) {         std::unique_lock<std::mutex> lock(q_mtx);                  // если есть задачи, то берём задачу, иначе - засыпаем         // если мы зашли в деструктор, то quite будет true и мы не будем          // ждать завершения всех задач и выйдем из цикла         q_cv.wait(lock, [this]()->bool { return !q.empty() || quite; });          if (!q.empty()) {             auto elem = std::move(q.front());             q.pop();             lock.unlock();  // вычисляем объект типа std::future (вычисляем функцию)              elem.first.get();              std::lock_guard<std::mutex> lock(completed_task_ids_mtx);                          // добавляем номер выполненой задачи в список завершённых             completed_task_ids.insert(elem.second);              // делаем notify, чтобы разбудить потоки             completed_task_ids_cv.notify_all();         }     } }

condition_variable на методе wait (q_cv) захватывает мьютекс, проверяет условие, если условие верно, то мы идём дальше по коду, иначе — засыпаем, отпускаем мьютекс и ждём вызов notify из метода добавления задач (когда приходит notify процедура повторяется — захватываем мьютекс и проверяем условие). Таким образом мы берём задачи до тех пор, пока они не кончатся, а когда кончатся и придёт новая задача мы разбудим поток.

template <typename Func, typename ...Args> int64_t add_task(const Func& task_func, Args&&... args) {     // получаем значение индекса для новой задачи     int64_t task_idx = last_idx++;      std::lock_guard<std::mutex> q_lock(q_mtx);     q.emplace(std::async(std::launch::deferred, task_func, args...), task_idx);          // делаем notify_one, чтобы проснулся один спящий поток (если такой есть)     // в методе run     q_cv.notify_one();     return task_idx; }

std::async(std::launch::deferred, task_func, args...) данная функция не смотря на название async ничего не делает асинхронно благодаря параметру  std::launch::deferred. Мы просто запоминаем аргументы функции, как в случае с std::bind . Отличаем является лишь то, bind не требует заполнять все аргументы функции, в отличает от std::async.

void wait(int64_t task_id) {     std::unique_lock<std::mutex> lock(completed_task_ids_mtx);          // ожидаем вызова notify в функции run (сработает после завершения задачи)     completed_task_ids_cv.wait(lock, [this, task_id]()->bool {         return completed_task_ids.find(task_id) != completed_task_ids.end();      }); }  void wait_all() {     std::unique_lock<std::mutex> lock(q_mtx);          // ожидаем вызова notify в функции run (сработает после завершения задачи)     completed_task_ids_cv.wait(lock, [this]()->bool {         std::lock_guard<std::mutex> task_lock(completed_task_ids_mtx);         return q.empty() && last_idx == completed_task_ids.size();     }); }

Обратите внимание, что wait_all внутри wait использует ещё одну блокировку для очереди для проверки на пустоту (мы должны блокировать каждый разделяемый ресурс, чтобы избежать data race).

Так же обратите внимание, что std::lock_guard стоит там, где нет wait для мьютекса и не нужно делать unlock (std::unique_lock в остальных случаях). Если вы будите придерживаться данного правила, то программисты, смотрящие ваш код скажут вам спасибо.

bool calculated(int64_t task_id) {     std::lock_guard<std::mutex> lock(completed_task_ids_mtx);     if (completed_task_ids.find(task_id) != completed_task_ids.end()) {         return true;     }     return false; }

Неблокирующий метод проверки задачи на завершённость возвращает true если задача с данным task_id уже посчитана, иначе — false.

~thread_pool() {     // можно добавить wait_all() если нужно дождаться всех задачь перед удалением     quite = true;     for (uint32_t i = 0; i < threads.size(); ++i) {         q_cv.notify_all();         threads[i].join();     } }

Если экземпляр класса thread_pool удаляется, то мы дожидаемся завершения всех потоков в деструкторе. При этом, если в очереди есть ещё задачи, то каждый поток выполнит ещё одну задачу и завершит работу (это поведение можно поменять и, например, дожидаться выполнения всех задач перед завершением).

Полный код данной реализации можно посмотреть ТУТ.

Пример работы с Thread Pool

void sum(int& ans, std::vector<int>& arr) {     for (int i = 0; i < arr.size(); ++i) {         ans += arr[i];     } }  int main() {     thread_pool tp(3);     std::vector<int> s1 = { 1, 2, 3 };     int ans1 = 0;          std::vector<int> s2 = { 4, 5 };     int ans2 = 0;          std::vector<int> s3 = { 8, 9, 10 };     int ans3 = 0;      // добавляем в thread_pool выполняться 3 задачи     auto id1 = tp.add_task(sum, std::ref(ans1), std::ref(s1));     auto id2 = tp.add_task(sum, std::ref(ans2), std::ref(s2));     auto id3 = tp.add_task(sum, std::ref(ans3), std::ref(s3));      if (tp.calculated(id1)) {         // если результат уже посчитан, то просто выводим ответ         std::cout << ans1 << std::endl;     }     else {         // если результат ещё не готов, то ждём его         tp.wait(id1);         std::cout << ans1 << std::endl;     }     tp.wait_all();      std::cout << ans2 << std::endl;     std::cout << ans3 << std::endl;     return 0; }

Стоит обратить внимание на std::ref благодаря ему будет передана ссылка, а не копия объекта (это особенность передачи аргумента в std::future).

Тут приведён достаточно простой пример работы с Thread Pool. Давайте посмотрим на этот небольшой фрагмент кода и подумаем что можно улучшить.

Недостаток

Последствия

1

Функция обязательно должна быть void

Придётся менять сигнатуру функции, если она возвращала какое-то значение

2

Приходится хранить дополнительно переменную для ответа

Если нам понадобиться несколько значений из thread_pool, то придётся с собой таскать все эти переменные. А если нам нужно 100 значений и больше … ?

К сожалению, у меня не получилось решить эти проблемы средствами C++ 14, но зато C++ 17 позволил избавиться от приведённых выше недостатков.

Улучшаем Thread Pool с помощью C++ 17

Чтобы улучшить нашу версию нужно сначала понять в чём была основная проблема, а проблема была в том, чтобы узнать тип возвращаемого значения функции и при этом суметь положить результат вычислений (может быть разный возвращаемый тип) в 1 объект и тут на помощь приходит std::any.

Теперь мы можем хранить в нашей очереди std::function<std::any()> (запись std::future<std::any>не валидна) . Именно так я и сделал в своей первой попытке и получил очень красивый код, который не сильно отличался от изначальной реализации, но тут я столкнулся с проблемой, что std::any не может быть типа void . Тогда я решил создать класс Task, который бы смог хранить в одном случае std::function<std::any()> а в другом std::function<void()>. Рассмотрим его конструктор:

template <typename FuncRetType, typename ...Args, typename ...FuncTypes> Task(FuncRetType(*func)(FuncTypes...), Args&&... args) :     is_void{ std::is_void_v<FuncRetType> } {      if constexpr (std::is_void_v<FuncRetType>) {         void_func = std::bind(func, args...);         any_func = []()->int { return 0; };     }     else {         void_func = []()->void {};         any_func = std::bind(func, args...);     } }

Мы используем if constexpr для компиляции только одной ветки условия. Если мы будем использовать обычный if, то при получении функции возвращающей void компилятор попробует преобразовать void в std::any и таким образом мы получим ошибку преобразования типа, не смотря на то, что этот каст будет происходить в другой ветке условия.

Используем typename ...Args и typename ...FuncTypes, чтобы был возможен неявный каст между std::referense_wrapper и ссылочным типом, тогда нам в функциях не придётся в сигнатуре явно прописывать std::referense_wrapper.

any_func = []()->int { return 0; }; и void_func = []()->void {}; функции-заглушки. Они позволяют избавиться от лишнего условия при вычислении значения:

void operator() () {     void_func();     any_func_result = any_func(); }

has_result проверяет вернёт ли функция значение или нет, а get_result получит его.

bool has_result() {     return !is_void; }  std::any get_result() const {     assert(!is_void);     assert(any_func_result.has_value());     return any_func_result; }

Ещё один вспомогательный класс: TaskInfo:

enum class TaskStatus {     in_q,     completed };  struct TaskInfo {     TaskStatus status = TaskStatus::in_q;     std::any result; };

Данная структура хранит информацию о задаче: статус и возможный результат. Если структура будет возвращать void, то поле result останется незаполненным.

Рассмотрим приватные поля класса thread_pool

std::vector<std::thread> threads;  // очередь с парой задача, номер задачи std::queue<std::pair<Task, uint64_t>> q;  std::mutex q_mtx; std::condition_variable q_cv;  // Будем создавать ключ как только пришла новая задача и изменять её статус // при завершении std::unordered_map<uint64_t, TaskInfo> tasks_info;  std::condition_variable tasks_info_cv; std::mutex tasks_info_mtx;  std::condition_variable wait_all_cv;  std::atomic<bool> quite{ false }; std::atomic<uint64_t> last_idx{ 0 };  // переменная считающая кол-во выполненых задач std::atomic<uint64_t> cnt_completed_tasks{ 0 };

В отличие от прошлой реализации нам понадобиться переменная cnt_completed_tasks (в прошлой реализации у нас был отдельный контейнер для завершённых задач и кол-во завершённых задач мы получали по размеру этого контейнера), для подсчёта кол-ва завершённых задач. Эта переменная будет использоваться в функции wait_all для определения того, что все задачи завершились.

Так же отдельно рассмотрим 3 разных функции ожидания результата:

void wait(const uint64_t task_id) {     std::unique_lock<std::mutex> lock(tasks_info_mtx);     tasks_info_cv.wait(lock, [this, task_id]()->bool {         return task_id < last_idx&& tasks_info[task_id].status == TaskStatus::completed;     }); }  std::any wait_result(const uint64_t task_id) {     wait(task_id);     return tasks_info[task_id].result; }  template<class T> void wait_result(const uint64_t task_id, T& value) {     wait(task_id);     value = std::any_cast<T>(tasks_info[task_id].result); }
  • void wait(const uint64_t task_id) — используется для ожидании задачи, которая возвращает void.

  • std::any wait_result(const uint64_t task_id) и void wait_result(const uint64_t task_id, T& value) разными способами возвращают результат.

std::any wait_result(const uint64_t task_id) вернёт std::any и пользователь сам должен будет сделать cast к нужному типу. Шаблонная функция void wait_result(const uint64_t task_id, T& value) принимает вторым аргументом ссылку на переменную, куда и будет положено новое значение и явный cast пользователь не должен будет делать.

В остальном код очень похож на предыдущую версию и код новой версии вы можете найти ТУТ.

Использование Thread Pool (С++ 17)

int int_sum(int a, int b) {     return a + b; }  void void_sum(int& c, int a, int b) {     c = a + b; }  void void_without_argument() {     std::cout << "It's OK!" << std::endl; }  int main() {     thread_pool t(3);     int c;     t.add_task(int_sum, 2, 3);               // id = 0     t.add_task(void_sum, std::ref(c), 4, 6); // id = 1     t.add_task(void_without_argument);       // id = 2      {         // variant 1         int res;         t.wait_result(0, res);         std::cout << res << std::endl;          // variant 2         std::cout << std::any_cast<int>(t.wait_result(0)) << std::endl;     }      t.wait(1);     std::cout << c << std::endl;      t.wait_all(); // waiting for task with id 2      return 0; }

В данном примере рассмотрены 2 способа получения значения через функцию wait_result. Мне лично больше нравится 2 вариант. Не смотря на то, что нужно делать каст, получается компактное решение, а так же можно поймать и отработать исключение в случае ошибки.

У нас действительно получилась версия лучше предыдущей?

И да, и нет. После анализа я получил следующие результаты:

Тип передаваемого аргумента при создании новой задачи

thread_pool c++ 14

thread_pool c++ 17

функция возвращающая void

+

+

функция возвращающая всё кроме void

+

std::bind

+

функтор

+

Пример с функтором и std::bind:

class Test { public:     void operator() () {         std::cout << "Working with functors!\n";     } };  void sum(int a, int b) {     std::cout << a + b << std::endl; }  int main() {     Test test;     auto res = std::bind(sum, 2, 3);      thread_pool t(3); // C++ 14     t.add_task(test);     t.add_task(res);     t.wait_all();      return 0; }

А почему не получилось сделать лучше?

Изначально задумывалось реализовать Thread Pool, который сам сможет определять тип возвращаемого значения и исходя из этого типа формировать объект Task, но тип возвращаемого значения std::bind нельзя явно получить через std::invoke_result, поэтому пришлось пойти на некоторые уступки.

Итог

Мы получили 2 разные версии thred_pool. Сложно сказать какая из них лучше. Мне лично больше нравится версия с C++ 17. Она позволяет не таскать за собой много переменных как ссылки на результат, а хранит всё внутри себя. Да, эта версия уступает по функциональности, но использование функторов и std::bind не частая практика, поэтому именно это вариант я и считаю лучшим.


ссылка на оригинал статьи https://habr.com/ru/post/656515/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *