Multithreading

от автора

В преддверии старта курса «C++ Developer. Professional« делимся с вами полезной статьей, автором которой является Анатолий Махаев — преподаватель данного курса.


Введение

Разработчики часто сталкиваются с необходимостью разработки многопоточных приложений, поэтому вопросы многопоточности требуют детального изучения. Давайте познакомимся с основными терминами, используемыми в источниках информации о многопоточности, рассмотрим задачи и проблемы многопоточности и изучим средства стандартной библиотеки C++, которые помогут создавать многопоточные приложения.

Основные определения

Многозадачность и многопоточность

Многозадачность (multitasking) — свойство операционной системы или среды выполнения обеспечивать возможность параллельной (или псевдопараллельной) обработки нескольких задач.

Многопоточность (multithreading) — свойство платформы (например, операционной системы, виртуальной машины и т. д.) или приложения, состоящее в том, что процесс, порождённый в операционной системе, может состоять из нескольких потоков, выполняющихся «параллельно», то есть без предписанного порядка во времени. При выполнении некоторых задач такое разделение может достичь более эффективного использования ресурсов вычислительной машины.

По-настоящему параллельное выполнение задач возможно только в многопроцессорной системе, поскольку только в них присутствуют несколько системных конвейеров для исполнения команд.
В однопроцессорной многозадачной системе поддерживается так называемое псевдопараллельное исполнение, при котором создается видимость параллельной работы нескольких процессов. В таких системах, однако, процессы выполняются последовательно, занимая малые кванты процессорного времени.

Процессы и потоки

В различных источниках информации можно найти много разных определений процессов и потоков. Такой разброс определений обусловлен, во-первых, эволюцией операционных систем, которая приводила к изменению понятий о процессах и потоках, во-вторых, различием точек зрения, с которых рассматриваются эти понятия.

В рамках данной статьи предлагаю придерживаться следующих определений…

С точки зрения пользователя:
Процесс — экземпляр программы во время выполнения.
Потоки — ветви кода, выполняющиеся «параллельно», то есть без предписанного порядка во времени.

С точки зрения операционной системы:
Процесс — это абстракция, реализованная на уровне операционной системы. Процесс был придуман для организации всех данных, необходимых для работы программы.
Процесс — это просто контейнер, в котором находятся ресурсы программы:

  • адресное пространство

  • потоки

  • открытые файлы

  • дочерние процессы

  • и т.д.

Поток — это абстракция, реализованная на уровне операционной системы. Поток был придуман для контроля выполнения кода программы.
Поток — это просто контейнер, в котором находятся:

  • Счётчик команд

  • Регистры

  • Стек

Поток легче, чем процесс, и создание потока стоит дешевле. Потоки используют адресное пространство процесса, которому они принадлежат, поэтому потоки внутри одного процесса могут обмениваться данными и взаимодействовать с другими потоками.

Почему нужна поддержка множества потоков внутри одного процесса?
В случае, когда одна программа выполняет множество задач, поддержка множества потоков внутри одного процесса позволяет:

  • Разделить ответственность за разные задачи между разными потоками

  • Повысить быстродействие

Кроме того, часто задачам необходимо обмениваться данными, использовать общие данные или результаты других задач. Такую возможность предоставляют потоки внутри процесса, так как они используют адресное пространство процесса, которому принадлежат. Конечно, можно было бы создать под разные задачи дополнительные процессы, но:

  • у процесса будет отдельное адресное пространство и данные, что затруднит взаимодействие частей программы

  • создание и уничтожение процесса дороже, чем создание потока

Отличие процесса от потока
Процесс рассматривается ОС, как заявка на все виды ресурсов (память, файлы и пр.), кроме одного — процессорного времени. Поток — это заявка на процессорное время. Процесс — это всего лишь способ сгруппировать взаимосвязанные данные и ресурсы, а потоки — это единицы выполнения (unit of execution), которые выполняются на процессоре.

Планирование, состояния потоков, приоритеты

Выбор текущего потока из нескольких активных потоков, пытающихся получить доступ к процессору называется планированием. Процедура планирования обычно связана с весьма затратной процедурой диспетчеризации — переключением процессора на новый поток, поэтому планировщик должен заботиться об эффективном использовании процессора.

Поток может находиться в одном из трёх состояний:

  • Выполняемый (Executing) — поток, который выполняется в текущий момент на процессоре.

  • Готовый (Runnable) — поток ждет получения кванта времени и готов выполнять назначенные ему инструкции. Планировщик выбирает следующий поток для выполнения только из готовых потоков.

  • Заблокированный (Waiting) — работа потока заблокирована в ожидании блокирующей операции.

В реальных задачах важность работы разных потоков может сильно различаться. Для контроля этого процесса был придуман приоритет работы. У каждого потока есть такое числовое значение приоритета. Если есть несколько спящих потоков, которые нужно запустить, то ОС сначала запустит поток с более высоким приоритетом. ОС управляет потоками так, как посчитает нужным. Потоки с низким приоритетом не будут простаивать, просто они будут получать меньше времени, чем другие, но выполняться все равно будут. Потоки с одинаковыми приоритетами запускаются в порядке очереди. Приоритет потока может меняться в процессе выполнения. Например, после завершения операции ввода-вывода могут увеличивать приоритет потока, чтобы дать ему возможность быстрее начать выполнение и, может быть, вновь инициировать операцию ввода-вывода. Таким способом система поощряет интерактивные потоки и поддерживает занятость устройств ввода-вывода.

Потоки могут быть созданы не только в режиме ядра, но и в режиме пользователя, в зависимости от того, какой планировщик потоков используется:

  • Центральный планировщик ОС режима ядра, который распределяет время между любым потоком в системе.

  • Планировщик библиотеки потоков. У библиотеки потоков режима пользователя может быть свой планировщик, который распределяет время между потоками различных процессов режима пользователя.

  • Планировщик потоков процесса. К примеру свой Thread Manager есть у каждого процесса Mac OS X, написанного с использованием библиотеки Carbon.

Системные вызовы, режимы доступа

Системный вызов — это вызов функции ядра, из приложения пользователя.

Чтобы защитить жизненно важные системные данные от доступа и (или) внесения изменений со стороны пользовательских приложений, в WIndows и Linux используются два процессорных режима доступа (даже если процессор поддерживает более двух режимов): пользовательский режим и режим ядра.
Код пользовательского приложения запускается в пользовательском режиме, а код операционной системы (например, системные службы и драйверы устройств) запускается в режиме ядра. Режим ядра — такой режим работы процессора, в котором предоставляется доступ ко всей системной памяти и ко всем инструкциям центрального процессора. Предоставляя программному обеспечению операционной системы более высокий уровень привилегий, нежели прикладному программному обеспечению, процессор гарантирует, что приложения с неправильным поведением не смогут в целом нарушить стабильность работы системы.
Также следует отметить, что в случае выполнения системного вызова потоком и перехода из режима пользователя, в режим ядра, происходит смена стека потока на стек ядра. При переключении выполнения потока одного процесса, на поток другого, ОС обновляет некоторые регистры процессора, которые ответственны за механизмы виртуальной памяти (например CR3), так как разные процессы имеют разное виртуальное адресное пространство. Здесь я специально не затрагиваю аспекты относительно режима ядра, так как подобные вещи специфичны для отдельно взятой ОС.
Старайтесь не злоупотреблять средствами синхронизации, которые требуют системных вызовов ядра (например мьютексы). Переключение в режим ядра — дорогостоящая операция!

Задачи и проблемы многопоточности

Какие задачи решает многопоточная система?

К достоинствам многопоточной реализации той или иной системы перед однопоточной можно отнести следующее:

  • Упрощение программы в некоторых случаях, за счёт вынесения механизмов чередования выполнения различных слабо взаимосвязанных подзадач, требующих одновременного выполнения, в отдельную подсистему многопоточности.

  • Повышение производительности процесса за счёт распараллеливания процессорных вычислений и операций ввода-вывода.

К достоинствам многопоточной реализации той или иной системы перед многопроцессной можно отнести следующее:

  • Упрощение программы (взаимодействия её параллельных частей) в некоторых случаях за счёт использования общего адресного пространства.

  • Меньшие относительно процесса временные затраты на создание потока.

Распараллеливать работу приложения бывает удобно в самых разных ситуациях. Вот несколько примеров:

  • Многопоточность широко используется в приложениях с пользовательским интерфейсом. В этом случае за работу интерфейса отвечает один поток, а какие-либо вычисления выполняются в других потоках. Это позволяет пользовательскому интерфейсу не подвисать, когда приложение занято другими вычислениями.

  • Многие алгоритмы легко разбиваются на независимые подзадачи, которые можно выполнять в разных потоках для повышения производительности. Например, при фильтрации изображения разные потоки могут заниматься фильтрацией разных частей изображения.

  • Если некоторые части приложения вынуждены ждать ответа от сервера/пользователя/устройства, то эти операции можно выделить в отдельный поток, чтобы в основном потоке можно было продолжать работу, пока другой поток ждёт ответа.

  • и т.д.

Кроме того, многопоточную систему можно реализовать с возможностью масштабирования производительности. Например, при распараллеливании алгоритма количество создаваемых потоков может зависеть от количества процессорных ядер. Это позволит ускорять работу программы в определённых пределах, улучшая железо и не изменяя код.

Какие проблемы несёт реализация многопоточных приложений?

Когда потоки должны взаимодействовать друг с другом или работать с общими данными, могут возникать проблемы. Часто проблемы многопоточности иллюстрируются на следующих задачах:

Рассмотрим некоторые проблемы синхронизации.

Состояние гонки (race condition)

Состояние гонки — ошибка проектирования многопоточной системы или приложения, при которой работа системы или приложения зависит от того, в каком порядке выполняются части кода.
Состояние гонки — «плавающая» ошибка (гейзенбаг), проявляющаяся в случайные моменты времени и «пропадающая» при попытке её локализовать.

Рассмотрим пример.
Допустим, каждый из двух потоков должен увеличить значение глобальной переменной 1. В идеальной ситуации последовательность операций должна быть следующая:

Thread 1

Thread 2

Integer value

0

read value

0

increase value

0

write back

1

read value

1

increase value

1

write back

2

В результате мы получаем значение 2, как и ожидали. Однако, если два потока работают одновременно, и их работа не синхронизируется, результат операции может быть неправильным. Возможна следующая последовательность операций:

Thread 1

Thread 2

Integer value

0

read value

0

read value

0

increase value

0

increase value

0

write back

1

write back

1

В этом случае результат будет равен 1, хотя ожидалось значение 2.

Код на C++, приводящий к состоянию гонки:

#include <iostream> #include <thread>  int main() {     unsigned long long g_count = 0;      std::thread t1([&]()     {         for(auto i = 0; i < 1'000'000; ++i)             ++g_count;     });          std::thread t2([&]()     {         for(auto i = 0; i < 1'000'000; ++i)             ++g_count;     });          t1.join();     t2.join();          std::cout << g_count;      return 0; } 

В данном примере решить проблему можно либо использованием атомарных операций вместо нескольких инструкций чтение-изменение-запись, либо ограничивая доступ потоков к переменной так, чтобы в один момент времени только один поток мог изменять переменную.

Использование атомарных операций:

#include <iostream> #include <thread> #include <atomic>  int main() {     std::atomic<unsigned long long> g_count { 0 };      std::thread t1([&]()     {         for(auto i = 0; i < 1'000'000; ++i)             g_count.fetch_add(1);     });          std::thread t2([&]()     {         for(auto i = 0; i < 1'000'000; ++i)             g_count.fetch_add(1);     });          t1.join();     t2.join();          std::cout << g_count;      return 0; } 

Подробнее про atomic:
std::atomic
std::atomic. Модель памяти C++ в примерах

Ограничение доступа к переменной так, чтобы только один поток в один момент времени мог изменять переменную:

int main() {     unsigned long long g_count = 0;     std::mutex g_count_mutex;      std::thread t1([&]()     {         for(auto i = 0; i < 1'000'000; ++i) {             g_count_mutex.lock();             g_count += 1;             g_count_mutex.unlock();         }     });          std::thread t2([&]()     {         for(auto i = 0; i < 1'000'000; ++i) {             g_count_mutex.lock();             g_count += 1;             g_count_mutex.unlock();         }     });          t1.join();     t2.join();          std::cout << g_count;      return 0; } 

В этом примере поток перед тем как изменить переменную захватывает mutex (устанавливает флаг о том, что переменная занята), а другой поток, пытаясь захватить тот же mutex в это же время, обнаруживает, что первый поток уже работает с переменной, и дожидается её освобождения.

Подробнее про mutex:
std::mutex

Используя mutex в примере выше, мы синхронизируем работу потоков. Mutex является примитивом синхронизации.
Примитивы синхронизации — механизмы, позволяющие реализовать взаимодействие потоков, например, единовременный доступ только одного потока к критической области.
Примитивы синхронизации преследуют различные задачи:

  • Взаимное исключение потоков — примитивы синхронизации гарантируют то, что единовременно с критической областью будет работать только один поток.

  • Синхронизация потоков — примитивы синхронизации помогают отслеживать наступление тех или иных конкретных событий, то есть поток не будет работать, пока не наступило какое-то событие. Другой поток в таком случае должен гарантировать наступление данного события.

Однако если взаимоотношения между потоками более сложные, то неаккуратные блокировки потоков могут приводить к новой проблеме — взаимным блокировкам (deadlock).

Взаимная блокировка (deadlock)

Deadlock — ситуация, при которой несколько потоков находятся в состоянии ожидания ресурсов, занятых друг другом, и ни один из них не может продолжать выполнение.

Представим, что поток-1 работает с каким-то Объектом-1, а поток-2 работает с Объектом-2. При этом программа написана так:

  • Поток-1 перестанет работать с Объектом-1 и переключится на Объект-2, как только Поток-2 перестанет работать с Объектом 2 и переключится на Объект-1.

  • Поток-2 перестанет работать с Объектом-2 и переключится на Объект-1, как только Поток-1 перестанет работать с Объектом 1 и переключится на Объект-2.

Даже не обладая глубокими знаниями в многопоточности легко понять, что ничего из этого не получится. Потоки никогда не поменяются местами и будут ждать друг друга вечно. Ошибка кажется очевидной, но на самом деле это не так. Допустить ее в программе можно запросто.

Кстати, на Quora есть отличные примеры из реальной жизни, объясняющие что такое deadlock.

Пример возникновения взаимной блокировки в программе на C++:

#include <iostream> #include <mutex> #include <thread> #include <mutex>  int main() {     std::mutex m1;     std::mutex m2;          auto f1 = [&m1, &m2]() {         std::lock_guard<std::mutex> lg1(m1);         std::this_thread::sleep_for(std::chrono::milliseconds(10));         std::lock_guard<std::mutex> lg2(m2);     };          auto f2 = [&m1, &m2]() {         std::lock_guard<std::mutex> lg1(m2);         std::this_thread::sleep_for(std::chrono::milliseconds(10));         std::lock_guard<std::mutex> lg2(m1);     };          std::thread thread1([&f1, &f2]() {         f1();     });          std::thread thread2([&f1, &f2]() {         f2();     });          thread1.join();     thread2.join();      return 0; } 

Менее наглядный, но более жизненный пример можно посмотреть тут.

Классический способ борьбы с взаимными блокировками состоит в том, чтобы захватывать несколько мьютексов всегда в одинаковом порядке.
Более строго, это значит, что между блокировками устанавливается отношение сравнения и вводится правило о запрете захвата «большей» блокировки в состоянии, когда уже захвачена «меньшая». Таким образом, если процессу нужно несколько блокировок, ему нужно всегда начинать с самой «большой» — предварительно освободив все захваченные «меньшие», если такие есть — и затем в нисходящем порядке. Это может привести к лишним действиям (если «меньшая» блокировка нужна и уже захвачена, она освобождается только чтобы тут же быть захваченной снова), зато гарантированно решает проблему.

С учётом этого пример принимает следующий вид:

#include <iostream> #include <mutex> #include <thread> #include <mutex>  int main() {     std::mutex m1;     std::mutex m2;          auto f1 = [&m1, &m2]() {         std::lock_guard<std::mutex> lg1(m1);         std::this_thread::sleep_for(std::chrono::milliseconds(10));         std::lock_guard<std::mutex> lg2(m2);     };          auto f2 = [&m1, &m2]() {         std::lock_guard<std::mutex> lg1(m1);         std::this_thread::sleep_for(std::chrono::milliseconds(10));         std::lock_guard<std::mutex> lg2(m2);     };          std::thread thread1([&f1, &f2]() {         f1();     });          std::thread thread2([&f1, &f2]() {         f2();     });          thread1.join();     thread2.join();      return 0; } 

В нашем простом примере легко было вручную задать верный порядок блокировки мьютексов, однако, это не всегда так легко. Например, в ситуации, когда два мьютекса передаются в функцию по ссылке и блокируются ею, порядок блокировки будет зависеть от порядка переданных аргументов. Поэтому для блокировки мьютексов одинаковом порядке стандартная библиотека предоставляет функцию std::lock (аналог std::mutex::lock()) и класс std::scoped_lock (аналог std::lock_guard).

std::scoped_lock — это улучшенная версия std::lock_guard, конструктор которого блокирует произвольное количество мьютексов в фиксированном порядке (как и std::lock). В новом коде следует использовать std::scoped_lock, std::lock_guard остался в языке для обратной совместимости. Пример:

#include <iostream> #include <mutex> #include <thread> #include <mutex>  int main() {     std::mutex m1;     std::mutex m2;          auto f1 = [&m1, &m2]() {         std::scoped_lock lg(m1, m2);         std::this_thread::sleep_for(std::chrono::milliseconds(10));     };          auto f2 = [&m1, &m2]() {         std::scoped_lock lg(m1, m2);         std::this_thread::sleep_for(std::chrono::milliseconds(10));     };          std::thread thread1([&f1, &f2]() {         f1();     });          std::thread thread2([&f1, &f2]() {         f2();     });          thread1.join();     thread2.join();      return 0; } 

Аналогичный код с std::lock и std::lock_guard выглядел бы следующим образом:

#include <iostream> #include <mutex> #include <thread> #include <mutex>  int main() {     std::mutex m1;     std::mutex m2;          auto f1 = [&m1, &m2]() {         std::lock(m1, m2);         std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);         std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);         std::this_thread::sleep_for(std::chrono::milliseconds(10));     };          auto f2 = [&m1, &m2]() {         std::lock(m1, m2);         std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);         std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);         std::this_thread::sleep_for(std::chrono::milliseconds(10));     };          std::thread thread1([&f1, &f2]() {         f1();     });          std::thread thread2([&f1, &f2]() {         f2();     });          thread1.join();     thread2.join();      return 0; } 

Если требуется больше гибкости, например, при использовании condition variables, можно использовать std::unique_lock:

#include <iostream> #include <mutex> #include <thread> #include <mutex>  int main() {     std::mutex m1;     std::mutex m2;          auto f1 = [&m1, &m2]() {         std::unique_lock<std::mutex> lk1(m1, std::defer_lock);         std::unique_lock<std::mutex> lk2(m2, std::defer_lock);         std::lock(lk1, lk2);         std::this_thread::sleep_for(std::chrono::milliseconds(10));     };          auto f2 = [&m1, &m2]() {         std::unique_lock<std::mutex> lk1(m1, std::defer_lock);         std::unique_lock<std::mutex> lk2(m2, std::defer_lock);         std::lock(lk1, lk2);         std::this_thread::sleep_for(std::chrono::milliseconds(10));     };          std::thread thread1([&f1, &f2]() {         f1();     });          std::thread thread2([&f1, &f2]() {         f2();     });          thread1.join();     thread2.join();      return 0; } 

Подробнее про unique_lock и lock_guard

Другие проблемы

Кроме описанных выше проблем, иногда можно столкнуться с проблемой голодания потоков и с проблемой livelock.

Голодание потоков — это ситуация, в которой поток не может получить доступ к общим ресурсам, потому что на эти ресурсы всегда претендуют какие-то другие потоки, которым отдаётся предпочтение.

Поток часто действует в ответ на действие другого потока. Если действие другого потока также является ответом на действие первого потока, то может возникнуть livelock. Потоки не блокируются — они просто слишком заняты, реагируя на действия друг друга, чтобы возобновить работу.

Подробнее о других проблемах
What is starvation?

Средства стандартной библиотеки C++

Управление потоками

У каждой программы на C++ есть как минимум один поток, запускаемый средой выполнения C++, — поток, выполняющий функцию main(). Затем программа может запустить дополнительные потоки, точкой входа в которые служит другая функция. После чего эти потоки и начальный поток выполняются одновременно. Аналогично завершению программы при выходе из main() поток завершается при возвращении из функции, указанной в качестве точки входа.

std::thread

Основной класс для создания новых потоков в C++ — это std::thread.

Кратко:

  • Объект класса представляет собой один поток выполнения.

  • Новый поток начинает выполнение сразу же после построения объекта std::thread. Выполнение начинается с функции верхнего уровня, которая передаётся в качестве аргумента в конструктор std::thread.

  • Возвращаемое значение этой функции игнорируется, а если в ней будет брошено исключение, которое не будет обработано в этом же потоке, то вызовется std::terminate.

  • Передать возвращаемое значение или исключение из нового потока наружу можно через std::promise или через глобальные переменные (работа с которыми потребует синхронизации, см. std::mutex и std::atomic).

  • Объекты std::thread также могут быть не связаны ни с каким потоком (после default construction, move from, detach или join), и поток выполнения может быть не связан ни с каким объектом std::thread (после detach).

  • Никакие два объекта std::thread не могут представлять один и тот же поток выполнения; std::thread нельзя копировать (не является CopyConstructible или CopyAssignable), но можно перемещать (является MoveConstructible и MoveAssignable).

Потоки запускаются созданием объекта std::thread, в котором определяется выполняемая в потоке задача. В простейшем случае эта задача представляет собой обычную функцию. Эта функция выполняется в собственном потоке, пока не вернет управление, после чего поток останавливается. Что бы ни собирался делать поток и откуда бы он ни запускался, его запуск с использованием стандартной библиотеки C++ всегда сводится к созданию объекта std::thread:

void do_some_work(); std::thread my_thread(do_some_work);

std::thread работает с любым вызываемым типом, поэтому конструктору std::thread можно также передать экземпляр класса с оператором вызова функции:

class background_task{ public:     void operator()() const {         do_something();         do_something_else();     } }; background_task f; std::thread my_thread(f); 

В данном случае предоставленный функциональный объект копируется в хранилище, принадлежащее вновь созданному потоку выполнения, и вызывается оттуда. Поэтому важно, чтобы копия действовала аналогично оригиналу, иначе результат может не соответствовать ожидаемому.

С помощью лямбда-выражения предыдущий пример можно записать следующим образом:

std::thread my_thread([]{     do_something();     do_something_else(); });

После запуска потока, нужно принять однозначное решение, ждать ли его завершения (join) или пустить его на самотек (detach). Если не принять решение до уничтожения объекта std::thread, то программа завершится (деструктор std::thread вызовет std::terminate()). Решение нужно принимать до того, как объект std::thread будет уничтожен. Сам же поток вполне мог бы завершиться задолго до его присоединения или отсоединения. Если его отсоединить, то при условии, что он все еще выполняется, он и будет выполняться, и этот процесс может продолжаться еще долго и после уничтожения объекта std::thread. Выполнение будет прекращено, только когда в конце концов произойдет возвращение из функции потока. Если не дожидаться завершения потока, необходимо убедиться, что данные, к которым он обращается, будут действительны, пока он не закончит работать с ними.

Дождаться завершения потока можно, вызвав join() для связанного экземпляра std::thread. Вызов join() приводит к очистке объекта std::thread, поэтому объект std::thread больше не связан с завершенным потоком. Мало того, он не связан ни с одним потоком. Это означает, что join() можно вызвать для конкретного потока только один раз: как только вызван метод join(), объект std::thread утрачивает возможность присоединения, а метод joinable() вернет значение false.

Вызов метода detach() для объекта std::thread позволяет потоку выполняться в фоновом режиме, непосредственное взаимодействие с ним не требуется. Возможность дождаться завершения этого потока исчезает: если поток отсоединяется, получить ссылающийся на него объект std::thread невозможно, поэтому такой поток больше нельзя присоединить. Отсоединенные потоки фактически выполняются в фоновом режиме, владение и управление ими передаются в библиотеку среды выполнения C++, которая гарантирует правильное высвобождение ресурсов, связанных с потоком, при выходе из него. Как правило, такие потоки являются весьма продолжительными, работая в течение практически всего времени жизни приложения и выполняя фоновую задачу, например отслеживая состояние файловой системы, удаляя неиспользуемые записи из кэш-памяти объектов или оптимизируя структуры данных. Метод detach() нельзя вызывать для объекта std::thread, не имеющего связанного с ним потока выполнения. Это требование аналогично тому, которое предъявляется к вызову метода join(), и проверку можно провести точно таким же образом — вызывать для объекта t типа std::thread метод t.detach() возможно, только если метод t.joinable() вернет значение true.

Передача аргументов вызываемому объекту или функции сводится к простой передаче дополнительных аргументов конструктору std::thread. Но важно учесть, что по умолчанию аргументы копируются во внутреннее хранилище, где к ним может получить доступ вновь созданный поток выполнения, а затем передаются вызываемому объекту или функции как r-значения (rvalues), как будто они временные. Так делается, даже если соответствующий параметр в функции ожидает ссылку. Рассмотрим пример:

void f(int i,std::string const& s); std::thread t(f,3,"hello");

В результате создается новый поток выполнения, связанный с t, который вызывает функцию f(3,"hello"). Обратите внимание: даже если f в качестве второго параметра принимает std::string, строковый литерал передается как char const* и преобразуется в std::string только в контексте нового потока. Это становится особенно важным, когда, как показано далее, предоставленный аргумент является указателем на локальную переменную:

void f(int i,std::string const& s); void oops(int some_param) {     char buffer[1024];     sprintf(buffer, "%i",some_param);     std::thread t(f,3,buffer);     t.detach(); }

Здесь это указатель на буфер локальной переменной, который передается в новый поток. И высока вероятность того, что выход из функции oops произойдет, прежде чем буфер будет в новом потоке преобразован в std::string, что вызовет неопределенное поведение. Решением является приведение к типу std::string перед передачей буфера в конструктор std::thread:

void f(int i,std::string const& s); void oops(int some_param) {     char buffer[1024];     sprintf(buffer, "%i", some_param);     std::thread t(f, 3, std::string(buffer));     t.detach(); }
void update_data_for_widget(widget_id w, widget_data& data); void oops_again(widget_id w){         widget_data data;     std::thread t(update_data_for_widget,w,data);     display_status();     t.join();     process_widget_data(data); }

Хотя update_data_for_widget ожидает, что второй параметр будет передан по ссылке, конструктор std::thread не знает об этом, он не обращает внимания на типы аргументов, которые ожидает функция, и слепо копирует предоставленные значения. Но внутренний код передает скопированные аргументы в качестве r-значений, чтобы работать с типами, предназначенными только для перемещений, и пытается таким образом вызвать update_data_for_widget с r-значением. Этот код не скомпилируется, так как нельзя передать r-значение функции, ожидающей не-const-ссылку. Для тех, кто знаком с std::bind, решение будет очевидным: аргументы, которые должны быть ссылками, следует заключать в std::ref. В этом случае при изменении вызова потока на:

std::thread t(update_data_for_widget,w,std::ref(data));

update_data_for_widget будет корректно передана ссылка на данные, а не временная копия данных, и код успешно скомпилируется. Если работать с std::bind уже приходилось, то в семантике передачи параметров не будет ничего нового, поскольку и операция конструктора std::thread, и операция std::bind определены в рамках одного и того же механизма.

Чтобы вызвать в отдельном потоке метод какого-ибо объекта, нужно передать указатель на объект в качестве первого аргумента этого метода:

class X { public:     void do_lengthy_work(); }; X my_x; std::thread t(&X::do_lengthy_work, &my_x);

Этот код вызовет my_x.do_lengthy_work() в новом потоке, поскольку в качестве указателя на объект предоставляется адрес my_x.

Еще один интересный сценарий предоставления аргументов применяется, когда аргументы нельзя скопировать, а можно только переместить. Примером может послужить тип std::unique_ptr, обеспечивающий автоматическое управление памятью для динамически выделяемых объектов. В одно и то же время на данный объект может указывать только один экземпляр std::unique_ptr, и когда этот экземпляр уничтожается, объект, на который он указывал, удаляется. Перемещающий конструктор и перемещающий оператор присваивания позволяют передавать права владения объектом между экземплярами std::unique_ptr. В результате этого исходный объект остается с нулевым указателем. Такое перемещение значений позволяет принимать объекты данного типа в качестве параметров функции или возвращать их из функций. Если исходный объект временный, перемещение выполняется автоматически, но если источником является именованное значение, передача должна быть запрошена напрямую путем вызова метода std::move(). В следующем примере показано использование std::move для передачи потоку права владения динамическим объектом:

void process_big_object(std::unique_ptr<big_object>); std::unique_ptr<big_object> p(new big_object); p->prepare_data(42); std::thread t(process_big_object,std::move(p));

Поскольку при вызове конструктора std::thread указан метод std::move(p), право владения big_object сначала передается внутреннему хранилищу вновь созданного потока, а затем переходит к process_big_object.

Мы разобрали основы использования класса std::thread для создания потоков. У объектов std::thread есть ещё пара полезных методов:

  • std::thread::get_id() возвращает id потока. Можно использовать для логирования или в качестве ключа ассоциативного контейнера потоков.

  • std::thread::native_handle() возвращает специфичный для операционной системы handle потока, который можно передавать в методы WinAPI или pthreads для более гибкого управления потоками.

Выбор количества потоков в ходе выполнения программы

Одна из функций стандартной библиотеки C++, помогающая решить данную задачу, — std::thread::hardware_concurrency(). Она возвращает то количество потоков, которые действительно могут работать одновременно в ходе выполнения программы. Например, в многоядерной системе оно может быть увязано с числом ядер центрального процессора. Функция дает всего лишь подсказку и может вернуть 0, если информация недоступна, но ее данные способны принести пользу при разбиении задачи на потоки.

std::jthread

В С++20 появился новый класс для создания потоков и управления ими std::jthread.

Класс jthread представляет собой один поток выполнения. Он имеет то же поведение, что и std::thread, за исключением того, что jthread автоматически join’ится при уничтожении и предлагает интерфейс для остановки потока.
В отличие от std::thread, jthread содержит внутренний закрытый член типа std::stop_source, который хранит stop-state. Конструктор jthread принимает функцию, которая принимает std::stop_token в качестве своего первого аргумента. Этот аргумент передаётся в функцию из stop_source, и позволяет функции проверить, была ли запрошена остановка во время ее выполнения, и завершиться при необходимости.

Подробнее о jthread
Так же существует возможность связать callback функции с событием остановки потока

Управление текущим потоком

Стандартная библиотека предоставляет несколько методов для управления текущим потоком. Все они находятся в пространстве имён std::this_thread:

  • std::this_thread::yield() подсказывает планировщику потоков перепланировать выполнение, приостановив текущий поток и отдав преимущество другим потокам. Точное поведение этой функции зависит от реализации, в частности от механики используемого планировщика ОС и состояния системы. Например, планировщик реального времени first-in-first-out (SCHED_FIFO в Linux) приостанавливает текущий поток и помещает его в конец очереди потоков с одинаковым приоритетом, готовых к запуску (если нет других потоков с таким же приоритетом, yield не делает ничего).

  • std::this_thread::get_id() работает аналогично std::thread::get_id().

  • std::this_thread::sleep_for(sleep_duration) блокирует выполнение текущего потока на время sleep_duration.

  • std::this_thread::sleep_until(sleep_time) блокирует выполнение текущего потока до наступления момента времени sleep_time.

Взаимное исключение потоков (Mutual exclusion)

Одним из ключевых преимуществ (перед использованием нескольких процессов) применения потоков для конкурентности является возможность совместного использования (разделения) данных несколькими потоками.

Представьте на минуту, что вы живете в одной квартире с приятелем. У вас одна кухня и одна ванная на двоих. Обычно ванной не пользуются одновременно несколько человек, и то, что сосед слишком долго плещется в воде, вынуждая вас дожидаться своей очереди, не может не раздражать. Возможно, одному из вас захочется запечь в духовке колбаски, в то время как у другого там готовятся кексы, и из этого тоже не выйдет ничего хорошего. Ну и всем знакомо чувство досады, когда при совместно используемом оборудовании вы на полпути к решению какой-нибудь задачи вдруг обнаруживаете, что кто-то взял что-то нужное вам в данный момент или что-то изменил, а вы рассчитывали, что все останется в прежнем состоянии или на своих местах.

То же самое происходит и с потоками. Если они совместно используют данные, для них нужны правила, определяющие, какой поток и к каким данным может получить доступ, когда и как любые обновления данных будут передаваться другим потокам, интересующимся этими данными. Некорректная работа с общими данными — одна из основных причин ошибок, связанных с конкурентностью.

Когда дело доходит до совместной работы с данными нескольких потоков, то все проблемы возникают из-за последствий изменения этих данных. Если все совместно используемые данные доступны только для чтения, проблем не будет, поскольку данные, считываемые одним потоком, не зависят от того, читает другой поток те же данные или нет. Но если один или несколько потоков, совместно использующих данные, начинают вносить вних изменения, создаются серьезные предпосылки для возникновения проблем. В таком случае следует обеспечить приемлемость конечных результатов.

Предположим, вы покупаете билет в кино. Если кинотеатр большой, билеты будут продавать сразу несколько кассиров, обслуживая одновременно несколько человек. Если кто-то в это же время покупает билет на тот же сеанс в другой кассе, то выбор места зависит от того, кто первым его закажет, вы или другой. Если осталось всего несколько мест, очередность может стать решающей: возможна настоящая гонка за последними билетами. Это пример состояния гонки: какое место вы получите и получите ли вообще, зависит от порядка двух покупок.

При конкурентности состоянием гонки является все, что зависит от порядка выполнения операций в двух и более потоках относительно друг друга: потоки участвуют в гонке по выполнению соответствующих операций. В стандарте C++ также определяется понятие гонки за данными, обозначающее конкретный тип состояния гонки, возникающий из-за одновременного изменения одного и того же объекта. Гонки за данными вызывают опасное неопределенное поведение.

Ошибки в состоянии гонки возникают, когда для завершения операции требуется выполнение нескольких инструкций процессора. Состояние гонки зачастую трудно определить и сложно воспроизвести. Обычно, вероятность, что между последовательно выполняемыми инструкциями вклинится другой поток, невелика. По мере увеличения нагрузки на систему и количества выполнения операции возрастает и вероятность возникновения проблемной последовательности выполнения. Проблемы практически неизменно появляются в самое неподходящее время. При запуске приложения под отладчиком проблема вообще может исчезать, поскольку отладчик влияет на выполнение программы.

Есть несколько способов, позволяющих справиться с проблемными состояниями гонок. Самый простой вариант — заключить структуру данных в механизм защиты, чтобы гарантировать, что промежуточные состояния, в которых нарушены инварианты, будут видны только потоку, выполняющему изменения. С позиции других потоков, обращающихся к этой же структуре данных, такие изменения либо еще не начнутся, либо уже завершатся. Стандартная библиотека C++ предоставляет несколько таких механизмов.

Есть еще один вариант — изменить конструкцию структуры данных и ее инвариантов так, чтобы модификации вносились в виде серии неделимых изменений, каждая из которых сохраняет инварианты. Обычно это называется программированием без блокировок (lock-free programming), и реализовать ее нелегко.

Простая защита данных с помощью мьютекса

std::mutex

Основным механизмом защиты совместно используемых данных, обеспеченным стандартом C++, является мьютекс.

Итак, имеется совместно используемая структура данных, например связный список, и его нужно защитить от состояния гонки и возможных нарушений инвариантов. Наверное, неплохо было бы получить возможность помечать все фрагменты кода, обращающиеся к структуре данных, как взаимоисключающие, чтобы при выполнении одного из них каким-либо потоком любой другой поток, пытающийся получить доступ к этой структуре данных, был бы вынужден ждать, пока первый поток не завершит выполнение такого фрагмента. Тогда поток не смог бы увидеть нарушенный инвариант, кроме тех случаев, когда он сам выполнял бы модификацию. Именно это будет получено при использовании примитива синхронизации под названием «мьютекс», означающего взаимное исключение (mutual exclusion). Перед получением доступа к совместно используемой структуре данных мьютекс, связанный с ней, блокируется, а когда доступ к ней заканчивается, блокировка с него снимается. Библиотека потоков гарантирует, что, как только один поток заблокирует определенный мьютекс, все остальные потоки, пытающиеся его заблокировать, должны будут ждать, пока поток, который успешно заблокировал мьютекс, его не разблокирует. Тем самым гарантируется, что все потоки видят непротиворечивое представление совместно используемых данных без нарушенных инвариантов. Мьютексы — главный механизм защиты данных, доступный в C++, но панацеей от всех бед их не назовешь: важно структурировать код таким образом, чтобы защитить нужные данные и избежать состояний гонки, присущих используемым интерфейсам. У мьютексов имеются и собственные проблемы ввиде взаимной блокировки и защиты либо слишком большого, либо слишком малого объема данных.

Класс std::mutex — это примитив синхронизации, который может использоваться для защиты общих данных от одновременного доступа нескольких потоков.

std::mutex предлагает эксклюзивную, нерекурсивную семантику владения:

  • Вызывающий поток владеет мьютексом с момента успешного вызова методов lock или try_lock до вызова unlock.

  • Когда поток владеет мьютексом, все остальные потоки блокируются (при вызове lock) или получают false (при вызове try_lock), если они пытаются претендовать на владение мьютексом.

  • Вызывающий поток не должен владеть мьютексом до вызова lock или try_lock.

Поведение программы не определено, если мьютекс уничтожается, все еще будучи заблокированным, или если поток завершается, не разблокировав мьютекс.

std::mutex не является ни копируемым, ни перемещаемым.

Если метод lock вызывается потоком, который уже владеет мьютексом, поведение не определено: например, программа может попасть в deadlock.

Метод try_lock может ошибаться и возвращать false, даже если мьютекс в данный момент не заблокирован никаким другим потоком.

Если try_lock вызывается потоком, который уже владеет мьютексом, поведение не определено.

Мьютекс должен быть разблокирован тем потоком выполнения, который его заблокировал, в противном случае поведение не определено.

std::mutex обычно не захватывается напрямую, поскольку при этом нужно помнить о необходимости вызова unlock() на всех путях выхода из функции, в том числе возникающих из-за выдачи исключений. Стандартной библиотекой C++ предоставляются классы std::unique_lock, std::lock_guard или std::scoped_lock (начиная с C++17) для более безопасного управления захватом мьютексов.

Мьютекс является объектом операционной системы, поэтому для работы с ним через API ОС, можно получить handle с помощью метода native_handle.

Пример использования мьютекса:

#include <iostream> #include <map> #include <string> #include <chrono> #include <thread> #include <mutex>   std::map<std::string, std::string> g_pages; std::mutex g_pages_mutex;   void save_page(const std::string &url) {     // simulate a long page fetch     std::this_thread::sleep_for(std::chrono::seconds(2));     std::string result = "fake content";       std::lock_guard<std::mutex> guard(g_pages_mutex);     g_pages[url] = result; }   int main()  {     std::thread t1(save_page, "http://foo");     std::thread t2(save_page, "http://bar");     t1.join();     t2.join();       // safe to access g_pages without lock now, as the threads are joined     for (const auto &pair : g_pages) {         std::cout << pair.first << " => " << pair.second << '\n';     } } 

В примере выше используются глобальные переменные для структуры данных и мьютекса. Иногда в таком использовании глобальных переменных есть определенный смысл, однако в большинстве случаев мьютекс и защищенные данные помещаются в один класс, а не в глобальные переменные. Это соответствует стандартным правилам объектно-ориентированного проектирования: помещение их в один класс служит признаком связанности друг с другом, позволяя инкапсулировать функциональность и обеспечить защиту. В данном случае save_page станет методом класса, а мьютекс и защищаемые данные— закрытыми членами класса, что значительно упростит определение того, какой код имеет доступ к данным и, следовательно, какой код должен заблокировать мьютекс. Если все методы класса блокируют мьютекс перед доступом к защищаемым данным и разблокируют его по завершении доступа, данные будут надежно защищены от любого обращающегося к ним кода. Однако, это не всегда так: если один из методов класса возвращает указатель или ссылку на защищаемые данные, то в защите будет проделана большая дыра. Теперь обратиться к защищенным данным и, возможно, их изменить, не блокируя мьютекс, сможет любой код, имеющий доступ к этому указателю или ссылке. Поэтому защита данных с помощью мьютекса требует тщательной проработки интерфейса. Помимо проверки того, что методы не возвращают указатели или ссылки вызывающему их коду, важно также убедиться, что они не передают эти указатели или ссылки тем функциям, которые вызываются ими и не контролируются вами. Такая передача не менее опасна: эти функции могут хранить указатель или ссылку в том месте, где их позже можно использовать без защиты, предоставляемой мьютексом. В этом смысле особенно опасны функции, которые предоставляются во время выполнения программы в виде аргументов или другим способом. К сожалению, помочь справиться с проблемой такого рода библиотека потоков C++ не в состоянии, задача блокировки нужного мьютекса для защиты данных возлагается на программиста. В то же время можно воспользоваться рекомендацией, которая поможет в подобных случаях: не передавайте указатели и ссылки на защищенные данные за пределы блокировки никаким способом: ни возвращая их из функции, ни сохраняя во внешне видимой памяти, ни передавая в качестве аргументов функциям, предоставленным пользователем.

Применение мьютекса или другого механизма для защиты совместно используемых данных не дает полной гарантии защищенности от состояния гонки. Рассмотрим структуру данных стека. Пусть над нашим стеком можно проводить следующие операции: можно поместить в стек новый элемент методом push(), извлечь элемент из стека методом pop(), прочитать верхний элемент с помощью top(), проверить, не является ли стек пустым, с помощью empty(), и прочитать количество элементов стека методом size().

#include <deque> #include <cstddef> template<typename T,typename Container=std::deque<T> > class stack { public:     explicit stack(const Container&);     explicit stack(Container&& = Container());     template <class Alloc> explicit stack(const Alloc&);     template <class Alloc> stack(const Container&, const Alloc&);     template <class Alloc> stack(Container&&, const Alloc&);     template <class Alloc> stack(stack&&, const Alloc&);      bool empty() const;     size_t size() const;     T& top();     T const& top() const;     void push(T const&);     void push(T&&);     void pop();     void swap(stack&&); }; 

Даже функция top() возвращает копию, а не ссылку, и внутренние данные защищены с помощью мьютекса, этот интерфейс все равно не будет застрахован от возникновения гонки. Проблема в том, что полагаться на результаты работы функций empty() и size() нельзя. Хотя на момент вызова они, вероятно, и были достоверными, но после возврата из функции любой другой поток может обратиться к стеку и затолкнуть в него новые элементы (push()), либо забрать существующие (pop()), причем до того, как поток, вызывающий empty() или size(), сможет воспользоваться этой информацией.

Более безопасный вариант реализации стека с упрощённым интерфейсом:

#include <exception> #include <stack> #include <mutex> #include <memory>  struct empty_stack: std::exception {     const char* what() const throw()     {         return "empty stack";     }      };  template<typename T> class threadsafe_stack { private:     std::stack<T> data;     mutable std::mutex m; public:     threadsafe_stack(){}     threadsafe_stack(const threadsafe_stack& other)     {         std::lock_guard<std::mutex> lock(other.m);         data=other.data;     }     threadsafe_stack& operator=(const threadsafe_stack&) = delete;      void push(T new_value)     {         std::lock_guard<std::mutex> lock(m);         data.push(new_value);     }     std::shared_ptr<T> pop()     {         std::lock_guard<std::mutex> lock(m);         if(data.empty()) throw empty_stack();         std::shared_ptr<T> const res(std::make_shared<T>(data.top()));         data.pop();         return res;     }     void pop(T& value)     {         std::lock_guard<std::mutex> lock(m);         if(data.empty()) throw empty_stack();         value=data.top();         data.pop();     }     bool empty() const     {         std::lock_guard<std::mutex> lock(m);         return data.empty();     } };  int main() {     threadsafe_stack<int> si;     si.push(5);     si.pop();     if(!si.empty())     {         int x;         si.pop(x);     } } 

std::timed_mutex

Класс timed_mutex — это примитив синхронизации, который может использоваться для защиты общих данных от одновременного доступа нескольких потоков.

Подобно мьютексу, timed_mutex предлагает эксклюзивную, нерекурсивную семантику владения. Кроме того, timed_mutex предоставляет возможность попытаться захватить timed_mutex с таймаутом с помощью методов try_lock_for() и try_lock_until().

Метод try_lock_for():

  • Пытается заблокировать мьютекс. Поток ожидает до тех пор, пока не истечет указанное время ожидания или не будет получена блокировка, в зависимости от того, что наступит раньше. При успешном получении блокировки возвращает true, в противном случае возвращает false.

  • Если timeout_duration меньше или равно timeout_duration.zero(), то функция ведет себя как try_lock().

  • Эта функция может блокировать поток дольше, чем timeout_duration, из-за задержек в работе планировщика или конкуренции за ресурсы между потоками.

  • Стандарт рекомендует использовать steady_clock для измерения длительности. Если реализация использует вместо этого system_clock, время ожидания также может быть чувствительно к корректировке часов.

  • Как и в случае с try_lock(), этой функции разрешено ложно возвращать false, даже если мьютекс не был заблокирован каким-либо другим потоком в какой-то момент во время timeout_duration.

  • Если try_lock_for вызывается потоком, который уже владеет мьютексом, поведение не определено.

Пример:

#include <iostream> #include <mutex> #include <thread> #include <vector> #include <sstream>   std::mutex cout_mutex; // control access to std::cout std::timed_mutex mutex;   void job(int id)  {     using Ms = std::chrono::milliseconds;     std::ostringstream stream;       for (int i = 0; i < 3; ++i) {         if (mutex.try_lock_for(Ms(100))) {             stream << "success ";             std::this_thread::sleep_for(Ms(100));             mutex.unlock();         } else {             stream << "failed ";         }         std::this_thread::sleep_for(Ms(100));     }       std::lock_guard<std::mutex> lock(cout_mutex);     std::cout << "[" << id << "] " << stream.str() << "\n"; }   int main()  {     std::vector<std::thread> threads;     for (int i = 0; i < 4; ++i) {         threads.emplace_back(job, i);     }       for (auto& i: threads) {         i.join();     } }  /* Возможный вывод: [0] failed failed failed  [3] failed failed success  [2] failed success failed  [1] success failed success */ 

Метод try_lock_until() работает так же, как try_lock_for(), но принимает std::chrono::time_point в качестве аргумента. Если timeout_time уже прошел, эта функция ведет себя как try_lock().

Пример:

#include <thread> #include <iostream> #include <chrono> #include <mutex>   std::timed_mutex test_mutex;   void f() {     auto now=std::chrono::steady_clock::now();     test_mutex.try_lock_until(now + std::chrono::seconds(10));     std::cout << "hello world\n"; }   int main() {     std::lock_guard<std::timed_mutex> l(test_mutex);     std::thread t(f);     t.join(); } 

RAII механизмы для блокировки мьютекса

std::lock_guard

Не рекомендуется использовать класс std::mutex напрямую, так как нужно помнить о вызове unlock на всех путях выполнения функции, в том числе на тех, которые завершаются броском исключения. То есть если между вызовами lock и unlock будет сгенерировано исключение, а вы этого не предусмотрите, то мьютекс не освободится, а заблокированные потоки так и останутся ждать. Проблема безопасности блокировок мьютексов в C++ threading library решена довольно обычным для C++ способом — применением техники RAII (Resource Acquisition Is Initialization). Оберткой служит шаблонный класс std::lock_guard. Это простой класс, конструктор которого вызывает метод lock для заданного объекта, а деструктор вызывает unlock. Также в конструктор класса std::lock_guard можно передать аргумент std::adopt_lock — индикатор, означающий, что mutex уже заблокирован и блокировать его заново не надо. std::lock_guard не содержит никаких других методов, и его нельзя копировать, переносить или присваивать.

Пример:

#include <thread> #include <mutex> #include <iostream>   int g_i = 0; std::mutex g_i_mutex;  // protects g_i   void safe_increment() {     const std::lock_guard<std::mutex> lock(g_i_mutex);     ++g_i;       std::cout << "g_i: " << g_i << "; in thread #"               << std::this_thread::get_id() << '\n';       // g_i_mutex is automatically released when lock     // goes out of scope }   int main() {     std::cout << "g_i: " << g_i << "; in main()\n";       std::thread t1(safe_increment);     std::thread t2(safe_increment);       t1.join();     t2.join();       std::cout << "g_i: " << g_i << "; in main()\n"; }  /* Возможный вывод: g_i: 0; in main() g_i: 1; in thread #140487981209344 g_i: 2; in thread #140487972816640 g_i: 2; in main() */ 

После появления std::scoped_lock в std::lock_guard пропала необходимость, он остаётся в языке лишь для обратной совместимости.

std::unique_lock

Класс unique_lock — это универсальная оболочка владения мьютексом, предоставляющая отсроченную блокировку, ограниченные по времени попытки блокировки, рекурсивную блокировку, передачу владения блокировкой и использование с condition variables.

Ограниченные по времени попытки блокировки работают так же, как и в классе std::timed_mutex. Для этого связанный мьютекс должен быть TimedLockable.

Отсроченная блокировка:

Класс std::unique_lock обеспечивает немного более гибкий подход, по сравнению с std::lock_guard: экземпляр std::unique_lock не всегда владеет связанным с ним мьютексом. Конструктору в качестве второго аргумента можно передавать не только объект std::adopt_lock, заставляющий объект блокировки управлять блокировкой мьютекса, но и объект отсрочки блокировки std::defer_lock, показывающий, что мьютекс при конструировании должен оставаться разблокированным. Блокировку можно установить позже, вызвав функцию lock() для объекта std::unique_lock (но не мьютекса) или же передав объект std::unique_lock функции std::lock().

std::unique_lock занимает немного больше памяти и работает несколько медленнее по сравнению с std::lock_guard. За гибкость, заключающуюся в разрешении экземпляру std::unique_lock не владеть мьютексом, приходится расплачиваться тем, что информация о состоянии должна храниться, обновляться и проверяться: если экземпляр действительно владеет мьютексом, деструктор должен вызвать функцию unlock(), в ином случае — не должен. Этот флаг можно запросить, вызвав метод owns_lock(). Если передача владения блокировкой или какие-то другие действия, требующие std::unique_lock, не предусматриваются, лучше воспользоваться классом std::scoped_lock из C++17.

Пример:

#include <mutex> #include <thread> #include <chrono>   struct Box {     explicit Box(int num) : num_things{num} {}       int num_things;     std::mutex m; };   void transfer(Box &from, Box &to, int num) {     // don't actually take the locks yet     std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);     std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);       // lock both unique_locks without deadlock     std::lock(lock1, lock2);       from.num_things -= num;     to.num_things += num;       // 'from.m' and 'to.m' mutexes unlocked in 'unique_lock' dtors }   int main() {     Box acc1(100);     Box acc2(50);       std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);     std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);       t1.join();     t2.join(); } 

Рекурсивная блокировка:

std::unique_lock можно использовать с мьютексами, поддерживающими рекурсивную блокировку. Это не значит, что для одного и того же unique_lock можно несколько раз вызвать метод lock(). Это значит, что в одном потоке несколько разных экземпляров std::unique_lock могут вызвать метод lock() для одного и того же мьютекса. Повторный же вызов метода lock() для одного и того же экземпляра std::unique_lock приводит к исключению. Подробнее про работу рекурсивных мьютексов будет написано далее.

Передача владения блокировкой:

Объекты std::unique_lock являются перемещаемыми. Владение мьютексом может передаваться между экземплярами std::unique_lock путем перемещения. В некоторых случаях, например, при возвращении экземпляра из функции, оно происходит автоматически, а в других случаях его необходимо выполнять явным образом вызовом функции std::move(). По сути, все зависит от того, является ли источник l-значением— реальной переменной или ссылкой на таковую — или r-значением— неким временным объектом. Владение передается автоматически, если источник является r-значением, или же должно передаваться явным образом, если он является l-значением, во избежание случайной передачи владения за пределы переменной. Класс std::unique_lock— это пример перемещаемого, но не копируемого типа.

Один из вариантов возможного использования заключается в разрешении функции заблокировать мьютекс, а затем передать владение этой блокировкой вызывающему коду, который впоследствии сможет выполнить дополнительные действия под защитой этой же самой блокировки. Соответствующий пример показан в следующем фрагменте кода, где функция get_lock() блокирует мьютекс, а затем подготавливает данные перед тем, как вернуть блокировку вызывающему коду:

std::unique_lock<std::mutex> get_lock() {     extern std::mutex some_mutex;     std::unique_lock<std::mutex> lk(some_mutex);     prepare_data();     return lk; } void process_data() {     std::unique_lock<std::mutex> lk(get_lock());     do_something(); } 

Поскольку lk — локальная переменная, объявленная внутри функции, ее можно возвратить напрямую, без вызова функции std:move(). О вызове конструктора перемещения позаботится компилятор. Затем функция process_data() сможет передать владение непосредственно в собственный экземпляр std::unique_lock, а вызов функции do_something() может полагаться на правильную подготовку данных. Обычно такой шаблон следует применять, когда блокируемый мьютекс зависит от текущего состояния программы или от аргумента, переданного в функцию, возвращающую объект std::unique_lock.

Использование с condition variables:

Подробнее про использование условных переменных будет написано ниже. А пока кратко…

Есть две реализации условных переменных, доступных в заголовке <condition_variable>:

  • condition_variable: требует от любого потока перед ожиданием сначала выполнить блокировку std::unique_lock

  • condition_variable_any: более общая реализация, которая работает с любым типом, который можно заблокировать. Эта реализация может быть более дорогой (с точки зрения ресурсов и производительности), поэтому ее следует использовать только если необходимы те дополнительные возможности, которые она предоставляет

Как использовать условные переменные:

  • Должен быть хотя бы один поток, ожидающий, пока какое-то условие станет истинным. Ожидающий поток должен сначала выполнить блокировку unique_lock. Эта блокировка передается методу wait(), который освобождает мьютекс и приостанавливает поток, пока не будет получен сигнал от условной переменной. Когда это произойдет, поток пробудится и мьютекс снова заблокируется.

  • Должен быть хотя бы один поток, сигнализирующий о том, что условие стало истинным. Сигнал может быть послан с помощью notify_one(), при этом будет разблокирован один (любой) поток из ожидающих, или notify_all(), что разблокирует все ожидающие потоки.

  • В виду некоторых сложностей при создании пробуждающего условия, могут происходить ложные пробуждения (spurious wakeup). Это означает, что поток может быть пробужден, даже если никто не сигнализировал условной переменной. Поэтому необходимо проверять, верно ли условие пробуждения уже после того, как поток был пробужден.

Пример:

#include <iostream> #include <vector> #include <thread>   std::vector<int> data; std::condition_variable data_cond; std::mutex m;   void thread_func1() {    std::unique_lock<std::mutex> lock(m);    data.push_back(10);    data_cond.notify_one(); }   void thread_func2() {    std::unique_lock<std::mutex> lock(m);    data_cond.wait(lock, [] {       return !data.empty();    });    std::cout << data.back() << std::endl; }   int main() {    std::thread th1(thread_func1);    std::thread th2(thread_func2);    th1.join();    th2.join(); } 

Рекурсивная блокировка мьютекса

Попытка потока заблокировать мьютекс, которым он уже владеет, приводит при использовании std::mutex к ошибке и неопределенному поведению. Но порой бывает нужно, чтобы поток многократно получал один и тот же мьютекс, не разблокируя его предварительно. Для этой цели в стандартной библиотеке C++ предусмотрен класс std::recursive_mutex. Он работает так же, как и std::mutex, за тем лишь исключением, что на один его экземпляр можно из одного и того же потока получить несколько блокировок. Прежде чем мьютекс сможет быть заблокирован другим потоком, нужно будет снять все ранее установленные блокировки, поэтому, если функция lock() вызывается три раза, то три раза должна быть вызвана и функция unlock(). При правильном применении std::lock_guard и std::unique_lock все это будет сделано за вас автоматически.

В большинстве случаев при возникновении желания воспользоваться рекурсивным мьютексом, скорее всего, требуется внести изменения в архитектуру приложения.

std::recursive_mutex

Класс recursive_mutex — это примитив синхронизации, который может использоваться для защиты общих данных от одновременного доступа нескольких потоков.

recursive_mutex предлагает эксклюзивную рекурсивную семантику владения:

  • Вызывающий поток владеет recursive_mutex в течение периода времени, который начинается, когда он успешно вызывает либо lock, либо try_lock. В течение этого периода поток может совершать дополнительные вызовы lock или try_lock. Период владения заканчивается, когда поток делает соответствующее количество вызовов unlock.

  • Когда поток владеет recursive_mutex, все остальные потоки будут ждать (для lock) или получать false (для try_lock), если они попытаются захватить recursive_mutex.

  • Максимальное количество раз, которое recursive_mutex может быть заблокирован, в стандарте не указано, но после достижения этого числа вызовы lock будут бросать std::system_error, а вызовы try_lock будут возвращать false.

Поведение программы не определено, если recursive_mutex уничтожается, все еще будучи заблокированным.

Пример:

#include <iostream> #include <thread> #include <mutex>   class X {     std::recursive_mutex m;     std::string shared;   public:     void fun1() {       std::lock_guard<std::recursive_mutex> lk(m);       shared = "fun1";       std::cout << "in fun1, shared variable is now " << shared << '\n';     }     void fun2() {       std::lock_guard<std::recursive_mutex> lk(m);       shared = "fun2";       std::cout << "in fun2, shared variable is now " << shared << '\n';       fun1(); // recursive lock becomes useful here       std::cout << "back in fun2, shared variable is " << shared << '\n';     }; };   int main()  {     X x;     std::thread t1(&X::fun1, &x);     std::thread t2(&X::fun2, &x);     t1.join();     t2.join(); }  /* Возможный вывод: in fun1, shared variable is now fun1 in fun2, shared variable is now fun2 in fun1, shared variable is now fun1 back in fun2, shared variable is fun1 */ 

std::recursive_timed_mutex

std::recursive_timed_mutex работает аналогично тому, как работает std::timed_mutex, но предоставляет возможность многократной блокировки одного мьютекса в одном потоке, как std::recursive_mutex.

Мьютексы чтения-записи для защиты часто читаемых и редко обновляемых структур данных

Если мы производим только чтение данных, то гонки данных не возникает. Однако, если мы хотим изменять данные, то мы вынуждены защищать их от одновременного доступа. Но что делать, если большую часть времени структура данных используется только для чтения, а в защите мы нуждаемся только при редких обновлениях этой структуры. Блокировать потоки при каждом чтении без необходимости не хотелось бы, потому что от этого пострадает производительность. Поэтому применение std::mutex для защиты такой структуры данных имеет мрачные перспективы, поскольку при этом исключается возможность реализовать конкурентность при чтении структуры данных в тот период, когда она не подвергается модификации, так что нужен другой вид мьютекса. Этот другой тип мьютекса обычно называют мьютексом чтения — записи, поскольку он допускает два различных типа использования: монопольный доступ для одного потока записи или общий одновременный доступ для нескольких потоков чтения. Стандартная библиотека C++17 предоставляет два полностью готовых мьютекса такого вида, std::shared_mutex и std::shared_timed_mutex.

Для операций записи вместо можно использовать std::lock_guard<std::shared_mutex>и std::unique_lock<std::shared_mutex>. Они обеспечивают монопольный доступ, как и при использовании std::mutex. В потоках, которым не нужно обновлять структуру данных, для получения совместного доступа вместо этого можно воспользоваться std::shared_lock<std::shared_mutex>. Этот шаблон класса RAII был добавлен в C++14 и применяется так же, как и std::unique_lock, за исключением того, что несколько потоков могут одновременно получить общую блокировку на один и тот же мьютекс std::shared_mutex. Ограничение заключается в том, что, если какой-либо имеющий shared блокировку поток попытается получить монопольную блокировку, он будет ждать до тех пор, пока все другие потоки не снимут свои блокировки. Аналогично, если какой-либо поток имеет монопольную блокировку, никакой другой поток не может получить shared или монопольную блокировку, пока не снимет свою блокировку первый поток.

std::shared_mutex

Класс shared_mutex — это примитив синхронизации, который может использоваться для защиты общих данных от одновременного доступа нескольких потоков. В отличие от других типов мьютексов, которые обеспечивают эксклюзивный доступ, shared_mutex имеет два уровня доступа:

  • общий доступ — несколько потоков могут совместно владеть одним и тем же мьютексом.

  • эксклюзивный доступ (исключительная блокировка) — только один поток может владеть мьютексом.

Если один поток получил эксклюзивный доступ (через lock, try_lock), то никакие другие потоки не могут получить блокировку (включая общую).

Если один поток получил общую блокировку (через lock_shared, try_lock_shared), ни один другой поток не может получить эксклюзивную блокировку, но может получить общую блокировку.

Только если исключительная блокировка не была получена ни одним потоком, общая блокировка может быть получена несколькими потоками.

В пределах одного потока одновременно может быть получена только одна блокировка (общая или эксклюзивная).

shared_mutex особенно полезны, когда общие данные могут быть безопасно считаны любым количеством потоков одновременно, но поток может перезаписывать данные только тогда, когда ни один другой поток не читает и не записывает в это время.

Пример использования:

#include <iostream> #include <mutex>  // For std::unique_lock #include <shared_mutex> #include <thread>   class ThreadSafeCounter {  public:   ThreadSafeCounter() = default;     // Multiple threads/readers can read the counter's value at the same time.   unsigned int get() const {     std::shared_lock lock(mutex_);     return value_;   }     // Only one thread/writer can increment/write the counter's value.   void increment() {     std::unique_lock lock(mutex_);     value_++;   }     // Only one thread/writer can reset/write the counter's value.   void reset() {     std::unique_lock lock(mutex_);     value_ = 0;   }    private:   mutable std::shared_mutex mutex_;   unsigned int value_ = 0; };   int main() {   ThreadSafeCounter counter;     auto increment_and_print = [&counter]() {     for (int i = 0; i < 3; i++) {       counter.increment();       std::cout << std::this_thread::get_id() << ' ' << counter.get() << '\n';         // Note: Writing to std::cout actually needs to be synchronized as well       // by another std::mutex. This has been omitted to keep the example small.     }   };     std::thread thread1(increment_and_print);   std::thread thread2(increment_and_print);     thread1.join();   thread2.join(); } 

std::shared_timed_mutex

std::shared_timed_mutex предлагает такую же семантику владения мьютексом, как std::shared_mutex.

Кроме того, std::shared_timed_mutex подобно timed_mutex предоставляет возможность попытаться претендовать на владение shared_timed_mutex с таймаутом с помощью методов try_lock_for(), try_lock_until(), try_lock_shared_for(), try_lock_shared_until().

std::shared_lock

Класс shared_lock — это аналог std::unique_lock для получения общего доступа к данным, защищаемым с помощью shared_mutex. Он позволяет отсроченную блокировку, попытку блокировки с таймаутом и передачу права владения блокировкой. Блокировка shared_lock блокирует shared_mutex в общем режиме (чтобы заблокировать его в эксклюзивном режиме, можно использовать std::unique_lock).

Класс shared_lock является перемещаемым, но не копируемым.

Для работы с условными переменными можно использовать std::condition_variable_any (std::condition_variable требует std::unique_lock и поэтому поддерживает только исключительное владение).

Захват нескольких мьютексов одновременно

std::lock

При малой глубине детализации блокировок для какой-либо операции может быть необходимо заблокировать два или более мьютекса. При этом может возникнуть еще одна проблема — взаимная блокировка. При взаимной блокировке один поток ждет завершения выполнения операции другим, поэтому ни один из потоков не выполняет работы.

Представьте себе игрушку, например, барабан с палочками. Играть с ним можно только при наличии обеих частей, из которых он состоит. А теперь представьте двух малышей, желающих с ним поиграть. Если у одного из них будут и барабан, и палочки, он сможет весело играть, пока не надоест. Если другому тоже захочется поиграть, ему, как ни досадно, придется подождать. Допустим, барабан и палочки валяются по отдельности в коробке с игрушками, а обоим малышам вдруг захотелось поиграть на барабане и они стали рыться в ней. Один нашел барабан, а другой — палочки. Возникла тупиковая ситуация: пока кто-нибудь не уступит и не даст поиграть другому, каждый останется при своем, требуя отдать ему недостающее, при этом никто не сможет играть на барабане.

Теперь представьте, что спорят не малыши из-за игрушек, а потоки из-за блокировок мьютексов: чтобы выполнить некую операцию, каждая пара потоков нуждается в блокировке каждой пары мьютексов, у каждого потока имеется один заблокированный мьютекс и он ожидает разблокировки другого мьютекса. Продолжить выполнение не может ни один из потоков, поскольку каждый ждет, когда другой разблокирует свой мьютекс. Такой сценарий называется взаимной блокировкой и представляет собой серьезную проблему при необходимости заблокировать для выполнения одной операции два мьютекса и более.

Общий совет по обходу взаимной блокировки заключается в постоянной блокировке двух мьютексов в одном и том же порядке: если всегда блокировать мьютекс А перед блокировкой мьютекса Б, то взаимной блокировки никогда не произойдет. Иногда это условие выполнить несложно, поскольку мьютексы служат разным целям, но кое-когда все гораздо сложнее, например, когда каждый из мьютексов защищает отдельный экземпляр одного и того же класса. Рассмотрим пример, в котором какая-то функция выполняет действие над двумя объектами одного класса. Чтобы обеспечить корректную работу и при этом избежать влияния изменений, вносимых в режиме конкурентности, следует заблокировать мьютексы на обоих экземплярах. Но если выбрать определенный порядок, например сначала блокировать мьютекс для экземпляра, переданного в качестве первого параметра, а затем мьютекс для экземпляра, переданного в качестве второго параметра, то можно получить обратный эффект: стоит всего другому потоку вызвать функцию с переставленными местами параметрами, и вы получите взаимную блокировку. В стандартной библиотеке C++ есть средство от этого в виде std::lock — функции, способной одновременно заблокировать два и более мьютекса, не рискуя вызвать взаимную блокировку.

#include <mutex> #include <thread> #include <iostream> #include <vector> #include <functional> #include <chrono> #include <string>   struct Employee {     Employee(std::string id) : id(id) {}     std::string id;     std::vector<std::string> lunch_partners;     std::mutex m;     std::string output() const     {         std::string ret = "Employee " + id + " has lunch partners: ";         for( const auto& partner : lunch_partners )             ret += partner + " ";         return ret;     } };   void send_mail(Employee &, Employee &) {     // simulate a time-consuming messaging operation     std::this_thread::sleep_for(std::chrono::seconds(1)); }   void assign_lunch_partner(Employee &e1, Employee &e2) {     static std::mutex io_mutex;     {         std::lock_guard<std::mutex> lk(io_mutex);         std::cout << e1.id << " and " << e2.id << " are waiting for locks" << std::endl;     }       // use std::lock to acquire two locks without worrying about      // other calls to assign_lunch_partner deadlocking us     {         std::lock(e1.m, e2.m);         std::lock_guard<std::mutex> lk1(e1.m, std::adopt_lock);         std::lock_guard<std::mutex> lk2(e2.m, std::adopt_lock); // Equivalent code (if unique_locks are needed, e.g. for condition variables) //        std::unique_lock<std::mutex> lk1(e1.m, std::defer_lock); //        std::unique_lock<std::mutex> lk2(e2.m, std::defer_lock); //        std::lock(lk1, lk2); // Superior solution available in C++17 //        std::scoped_lock lk(e1.m, e2.m);         {             std::lock_guard<std::mutex> lk(io_mutex);             std::cout << e1.id << " and " << e2.id << " got locks" << std::endl;         }         e1.lunch_partners.push_back(e2.id);         e2.lunch_partners.push_back(e1.id);     }     send_mail(e1, e2);     send_mail(e2, e1); }   int main() {     Employee alice("alice"), bob("bob"), christina("christina"), dave("dave");       // assign in parallel threads because mailing users about lunch assignments     // takes a long time     std::vector<std::thread> threads;     threads.emplace_back(assign_lunch_partner, std::ref(alice), std::ref(bob));     threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(bob));     threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(alice));     threads.emplace_back(assign_lunch_partner, std::ref(dave), std::ref(bob));       for (auto &thread : threads) thread.join();     std::cout << alice.output() << '\n'  << bob.output() << '\n'               << christina.output() << '\n' << dave.output() << '\n'; } 

Корректная разблокировка мьютексов при выходе из функции в этом примере обеспечивается с помощью использования std::lock_guard. В дополнение к мьютексу предоставляется параметр std::adopt_lock, чтобы показать объектам std::lock_guard, что мьютексы уже заблокированы. Объекты должны овладеть существующей блокировкой мьютекса, а не пытаться заблокировать его в конструкторе. Следует также отметить, что блокировка одного из мьютексов внутри вызова std::lock может привести к выдаче исключения, в таком случае исключение распространяется из std::lock. Если функцией std::lock успешно заблокирован один мьютекс, а исключение выдано при попытке заблокировать другой, первый мьютекс разблокируется автоматически: в отношении блокировки предоставленных мьютексов функция std::lock обеспечивает семантику «все или ничего».

Применение std::lock позволяет избавиться от взаимных блокировок, когда нужно завладеть сразу двумя и более блокировками, однако оно не поможет, если блокировки захватываются разобщенно. В таком случае, чтобы гарантировать обход взаимных блокировок, разработчикам приходится полагаться на самодисциплину. А это не так-то просто: взаимоблокировки относятся к одной из самых неприятных проблем, с которой приходится сталкиваться в многопоточном коде, их возникновение зачастую невозможно предсказать, поскольку в большинстве случаев все работает нормально. И тем не менее существует ряд относительно простых правил, помогающих создавать код, не подверженный взаимным блокировкам.

Все рекомендации по обходу взаимных блокировок сводятся к одному: не ждать завершения операции другим потоком, если есть вероятность, что он также ждет завершения операции текущим потоком:

  • Избегайте вложенных блокировок. Не устанавливайте блокировку, если уже есть какая-либо блокировка.

  • При удержании блокировки вызова избегайте кода, предоставленного пользователем. Если при удержании блокировки вызвать пользовательский код, устанавливающий блокировку, окажется нарушена рекомендация, предписывающая избегать вложенных блокировок, и может возникнуть взаимная блокировка.

  • Устанавливайте блокировки в фиксированном порядке. Если есть настоятельная необходимость установить две и более блокировки, но в рамках единой операции с помощью std::lock это невозможно, лучшее, что можно сделать, — установить их в каждом потоке в одном и том же порядке.

  • Используйте иерархию блокировок. Являясь частным случаем определения порядка блокировок, иерархия блокировок позволяет обеспечить средство проверки соблюдения соглашения в ходе выполнения программы. Такую проверку можно произвести в ходе выполнения программы, назначив номера уровней каждому мьютексу и сохранив записи о том, какие мьютексы заблокированы каждым потоком. Этот шаблон получил очень широкое распространение, но его прямая поддержка в стандартной библиотеке C++ не обеспечивается, поэтому нужно создать собственный тип мьютекса hierarchical_mutex.

std::try_lock

Аналог std::lock для попытки блокировки нескольких мьютексов. try_lock не приведёт к взаимной блокировке, даже если не будет определённого порядка блокировок. Поэтому он пытается заблокировать каждый из переданных блокируемых объектов lock_1, lock_2, …, lock_n, вызывая их метод try_lock в том порядке, в котором они переданы.

Если вызов try_lock для какого-либо аргумента завершается неудачно, дальнейшие вызовы try_lock не выполняются, а вызывается unlock для всех заблокированных объектов и возвращается индекс объекта, который не удалось заблокировать, начиная с 0.

Если вызов try_lock для какого-либо аргумента приводит к исключению, вызывается unlock для всех заблокированных объектов перед пробросом исключения наверх.

Возвращаемое значение: -1 при успешном выполнении или 0-based индекс объекта, который не удалось заблокировать.

std::scoped_lock

В C++17 предоставляется способ блокировки нескольких мьютексов одновременно в виде нового RAII-шаблона std::scoped_lock<>. Он практически эквивалентен std::lock_guard<>, за исключением того, что является вариационным шаблоном, принимающим в качестве параметров шаблона список типов мьютексов, а в качестве аргументов конструктора — список мьютексов. Предоставленные конструктору мьютексы блокируются с использованием такого же алгоритма, как и в std::lock, и, когда конструктор завершает работу, они оказываются заблокированными, а затем разблокируются в деструкторе.

Однократный вызов функции с помощью std::call_once и std::once_flag

Предположим, что есть совместно используемый ресурс, создание которого настолько затратно, что заниматься этим хочется лишь в крайней необходимости, когда пользователь обратился к этому ресурсу: возможно, он открывает подключение к базе данных или выделяет слишком большой объем памяти. Подобная отложенная (или ленивая) инициализация (lazy initialization) довольно часто встречается в однопоточном коде — каждая операция, требующая ресурса, сначала проверяет, не был ли он инициализирован, и, если не был, прежде чем воспользоваться этим ресурсом, инициализирует его. Если совместно используемый ресурс безопасен при получении к нему конкурентного доступа, единственной частью, нуждающейся в защите при преобразовании кода в многопоточный, является инициализация. Можно было бы защитить инициализацию мьютексом в многопоточном приложении:

#include <memory> #include <mutex>  struct some_resource {     void do_something()     {}      };  std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo() {     std::unique_lock<std::mutex> lk(resource_mutex);     if(!resource_ptr)     {         resource_ptr.reset(new some_resource);     }     lk.unlock();     resource_ptr->do_something(); }  int main() {     foo(); } 

Но это может привести к ненужным блокировкам потоков, использующих ресурс. Причина в том, что каждый поток будет вынужден ожидать разблокировки мьютекса, чтобы проверить, не был ли ресурс уже инициализирован. Эта проблема настолько распространена, что многие пытались придумать более подходящий способ решения данной задачи, включая небезызвестный шаблон блокировки с двойной проверкой: сначала указатель считывается без получения блокировки, которая устанавливается, только если он имеет значение NULL. После получения блокировки указатель проверяется еще раз на тот случай, если между первой проверкой и получением блокировки данным потоком инициализация была выполнена каким-нибудь другим потоком:

void undefined_behaviour_with_double_checked_locking() {     if (!resource_ptr) {         std::lock_guard<std::mutex> lk(resource_mutex);         if (!resource_ptr) {             resource_ptr.reset(new some_resource);         }     }     resource_ptr->do_something(); }

Чтобы справиться с данной ситуацией, стандартная библиотека C++ предоставляет компоненты std::once_flag и std::call_once. Вместо блокировки мьютекса и явной проверки указателя каждый поток может безопасно воспользоваться функцией std::call_once, зная, что к моменту возвращения управления из этой функции указатель будет инициализирован каким-либо потоком. Необходимые для этого данные синхронизации хранятся в экземпляре std::once_flag, и каждый экземпляр std::once_flag соответствует другой инициализации. Задействование функции std::call_once обычно связано с меньшими издержками по сравнению с явным использованием мьютекса, особенно когда инициализация уже была выполнена. Поэтому предпочтение следует отдавать именно ей. Пример выше можно было бы изменить так:

std::shared_ptr<some_resource> resource_ptr; std::once_flag resource_flag; void init_resource() {     resource_ptr.reset(new some_resource); } void foo() {     std::call_once(resource_flag, init_resource);     resource_ptr->do_something(); }

Один из сценариев, предполагающих вероятность состояния гонки при инициализации, до C++11 был связан с применением локальной переменной, объявленной с ключевым словом static. Инициализация такой переменной определена так, чтобы она выполнялась при первом прохождении потока управления через ее объявление. Это означало, что несколько потоков, вызывающих функцию, в стремлении первыми выполнить определение могли вызвать состояние гонки. На многих компиляторах, предшествующих C++11, это создавало реальные проблемы, поскольку начать инициализацию могли сразу несколько потоков или же они могли пытаться использовать во время инициализации, запущенной в другом потоке. В C++11 эта проблема была решена: инициализация определена так, что выполняется только в одном потоке, и никакие другие потоки не будут продолжать выполнение до тех пор, пока эта инициализация не будет завершена. Когда нужна только одна глобальная переменная, этим свойством можно воспользоваться в качестве альтернативы std::call_once:

class MyClass; MyClass& get_instance() {     static MyClass instance;     return instance; }

Итак, std::call_once:

  • Выполняет вызываемый объект f ровно один раз, даже если он вызывается одновременно из нескольких потоков.

  • Если к моменту вызова call_once флаг указывает, что f уже был вызван, call_once сразу же завершается (пассивный вызов call_once).

  • В противном случае call_once вызывает std::forward(f) с аргументами std::forward(args). В отличие от конструктора std::thread или std::async, аргументы не перемещаются и не копируются, поскольку их не нужно передавать в другой поток выполнения. (активный вызов call_once).

  • Если вызов функции бросает исключение, оно передается в call_once, и флаг не устанавливается, чтобы был совершён другой вызов (exceptional вызов call_once).

  • Если этот вызов функции завершился успешно (returning вызов call_once), флаг устанавливается, и все остальные вызовы call_once с тем же флагом гарантированно будут пассивными.

  • Все активные вызовы с одним и тем же флагом образуют последовательность, состоящую из нуля или более exceptional вызовов, за которыми следует один returning вызов.

  • Если параллельные вызовы call_once выполняют различные функции f, то не определено, какая именно функция f будет вызвана. Выполняемая функция выполняется в том же потоке, что и call_once.

  • Инициализация локальной статической переменной гарантированно происходит только один раз, даже при вызове из нескольких потоков, и может быть более эффективной, чем эквивалентный код, использующий std::call_once.

  • POSIX-эквивалентом этой функции является pthread_once.

Условные переменные (Condition variables)

Представьте, что вы едете в ночном поезде. Чтобы гарантированно сойти на нужной станции, придется не спать всю ночь и внимательно отслеживать все остановки. Свою станцию вы не пропустите, но сойдете с поезда уставшим. Но есть и другой способ: заглянуть в расписание, увидеть предполагаемое время прибытия поезда на нужную станцию, поставить будильник на нужное время с небольшим запасом и лечь спать. Этого будет вполне достаточно, и вы не пропустите свою станцию, но, если поезд задержится, пробуждение окажется слишком ранним. Идеальным решением было бы лечь спать, попросив проводника разбудить вас на нужной станции.

Какое отношение все это имеет к потокам? Если какой-то поток ожидает, пока другой поток завершит выполнение своей задачи, есть несколько вариантов развития событий. Во-первых, первый поток может постоянно проверять состояние флага в совместно используемых данных, защищенных мьютексом, а второй поток будет обязан установить флаг по завершении своей задачи. Это весьма накладно по двум соображениям: постоянно проверяя состояние флага, поток впустую тратит ценное процессорное время, а когда мьютекс заблокирован ожидающим потоком, его нельзя заблокировать никаким другим потоком. Второй вариант предполагает введение ожидающего потока в спящий режим на короткий промежуток времени между проверками с помощью функции std::this_thread::sleep_for(). Это уже гораздо лучше, поскольку поток, находясь в спящем режиме, не тратит процессорное время впустую, но хороший период пребывания в нем подобрать довольно трудно. Слишком короткий период спячки между проверками — и поток по-прежнему тратит впустую время процессора на слишком частые проверки, слишком длинный период спячки — и поток не выйдет из нее позже положенного, что приведет к ненужной задержке. Позднее пробуждение напрямую влияет на работу программы довольно редко, но в приложении реального времени это может быть критичным. Третьим и наиболее предпочтительным вариантом является использование средств из стандартной библиотеки C++, предназначенных для ожидания наступления какого-либо события. Основным механизмом для реализации такого ожидания является условная переменная. Концептуально она связана с каким-либо условием, и один или несколько потоков могут ожидать выполнения этого условия. Когда другой поток обнаружит, что условие выполнено, он может известить об этом один или несколько потоков, ожидающих условную переменную, чтобы разбудить их и позволить продолжить работу.

Стандартная библиотека C++ предоставляет не одну, а две реализации условной переменной: std::condition_variable и std::condition_variable_any. Обе они объявлены в заголовке <condition_variable>. В обоих случаях для соответствующей синхронизации им нужно работать с мьютексом: первая реализация ограничивается работой только с std::mutex, а вторая может работать с любыми типами, которые работают как мьютекс, о чем свидетельствует суффикс _any. Если не требуется дополнительная гибкость, предпочтение следует отдавать реализации std::condition_variable.

std::condition_variable

Класс condition_variable — это примитив синхронизации, который может использоваться для блокировки потока или нескольких потоков до тех пор, пока другой поток не изменит общую переменную (не выполнит условие) и не уведомит об этом condition_variable.

Поток, который намеревается изменить общую переменную, должен:

  • захватить std::mutex (обычно через std::lock_guard)

  • выполнить модификацию, пока удерживается блокировка мьютекса

  • выполнить notify_one или notify_all на std::condition_variable (блокировка не должна удерживаться для уведомления)

Даже если общая переменная является атомарной, всё равно требуется использовать мьютекс для корректного оповещения ожидающих потоков.

Любой поток, который ожидает наступления события от std::condition_variable, должен:

  • С помощью std::unique_lock<std::mutex> получить блокировку того же мьютекса, который используется для защиты общей переменной.

  • Проверить, что необходимое условие ещё не выпонлено.

  • Вызвать метод wait, wait_for или wait_until. Операции ожидания освобождают мьютекс и приостанавливают выполнение потока.

  • Когда получено уведомление, истёк тайм-аут или произошло ложное пробуждение, поток пробуждается, и мьютекс повторно блокируется. Затем поток должен проверить, что условие, действительно, выполнено, и возобновить ожидание, если пробуждение было ложным.

Вместо трёх последних шагов можно воспользоваться перегрузкой методов wait, wait_for и wait_until, которая принимает предикат для проверки условия и выполняет три последних шага.

std::condition_variable работает только с std::unique_lock<std::mutex>; это ограничение обеспечивает максимальную эффективность на некоторых платформах. std::condition_variable_any работает с любым BasicLockable объектом, например, с std::shared_lock.

Condition variables допускают одновременный вызов методов wait, wait_for, wait_until, notify_one и notify_all из разных потоков.

Пример использования:

#include <iostream> #include <string> #include <thread> #include <mutex> #include <condition_variable>   std::mutex m; std::condition_variable cv; std::string data; bool ready = false; bool processed = false;   void worker_thread() {     // Wait until main() sends data     std::unique_lock<std::mutex> lk(m);     cv.wait(lk, []{return ready;});       // after the wait, we own the lock.     std::cout << "Worker thread is processing data\n";     data += " after processing";       // Send data back to main()     processed = true;     std::cout << "Worker thread signals data processing completed\n";       // Manual unlocking is done before notifying, to avoid waking up     // the waiting thread only to block again (see notify_one for details)     lk.unlock();     cv.notify_one(); }   int main() {     std::thread worker(worker_thread);       data = "Example data";     // send data to the worker thread     {         std::lock_guard<std::mutex> lk(m);         ready = true;         std::cout << "main() signals data ready for processing\n";     }     cv.notify_one();       // wait for the worker     {         std::unique_lock<std::mutex> lk(m);         cv.wait(lk, []{return processed;});     }     std::cout << "Back in main(), data = " << data << '\n';       worker.join(); } /* Возможный вывод: main() signals data ready for processing Worker thread is processing data Worker thread signals data processing completed Back in main(), data = Example data after processing */ 

std::condition_variable_any

Этот тип условной переменной имеет такой же интерфейс, как std::condition_variable, но может использоваться не только с std::unique_lock<std::mutex>, а с любым блокируемым типом. Работает медленее, чем std::condition_variable. Может использоваться, например, для работы с std::shared_lock.

std::notify_all_at_thread_exit

Стандартная библиотека предоставляет ещё одну функцию для использования в ситуациях, когда мы с помощью condition_variable хотим дождаться завершения потока.

Зачем это нужно?

N2880: C++ object lifetime interactions with the threads API

N3070 — Handling Detached Threads and thread_local Variables

Допустим, мы хотим дождаться завершения detached потока, в этом случае мы не можем использовать метод join для ожидания завершения потока. Тогда мы решаем, что нужно использовать condition_variable для уведомления о том, что поток завершается. Но если мы просто в конец функции, выполняемой в отдельном потоке, добавим cv.notify_all();, то получим поведение отличное от того, которое нам нужно. Несмотря на то, что эта команда будет последней в функции потока, поток на ней ещё не заканчивает выполнение. После вызова notify_all в этом же потоке будет происходить уничтожение thread_local переменных, будут вызываться их деструкторы и выполняться какие-либо действия. То есть, на самом деле, уведомление было отправлено ещё до того, как поток завершился.

Тогда как на самом деле дождаться полного завершения detached потока? Для этого стандартная библиотека предоставляет функцию std::notify_all_at_thread_exit. Она дожидается завершения потока, в том числе уничтожения thread_local переменных, и последними действиями в потоке выполняет:

lk.unlock(); cond.notify_all();

Эквивалентный эффект может быть достигнут с помощью средств, предоставляемых std::promise или std::packaged_task.

Пример использования:

#include <mutex> #include <thread> #include <condition_variable>   #include <cassert> #include <string>   std::mutex m; std::condition_variable cv;   bool ready = false; std::string result; // some arbitrary type   void thread_func() {     thread_local std::string thread_local_data = "42";       std::unique_lock<std::mutex> lk(m);       // assign a value to result using thread_local data     result = thread_local_data;     ready = true;       std::notify_all_at_thread_exit(cv, std::move(lk));   }   // 1. destroy thread_locals;     // 2. unlock mutex;     // 3. notify cv.   int main() {     std::thread t(thread_func);     t.detach();       // do other work     // ...       // wait for the detached thread     std::unique_lock<std::mutex> lk(m);     cv.wait(lk, []{ return ready; });       // result is ready and thread_local destructors have finished, no UB     assert(result == "42"); } 

Семафоры (Semaphores)

В C++20 в стандартной библиотеке появились семафоры.
Семафор (semaphore) — примитив синхронизации работы процессов и потоков, в основе которого лежит счётчик, над которым можно производить две атомарные операции: увеличение и уменьшение значения на единицу, при этом операция уменьшения для нулевого значения счётчика является блокирующей. Служит для построения более сложных механизмов синхронизации и используется для синхронизации параллельно работающих задач, для защиты передачи данных через разделяемую память, для защиты критических секций, а также для управления доступом к аппаратному обеспечению.

Семафоры могут быть двоичными и вычислительными. Вычислительные семафоры могут принимать целочисленные неотрицательные значения и используются для работы с ресурсами, количество которых ограничено, либо участвуют в синхронизации параллельно исполняемых задач. Двоичные семафоры могут принимать только значения 0 и 1 и используются для взаимного исключения одновременного нахождения двух или более процессов в своих критических секциях.

Мьютексные семафоры(мьютексы) являются упрощённой реализацией семафоров, аналогичной двоичным семафорам с тем отличием, что мьютексы должны отпускаться тем же потоком, который осуществляет их захват. Мьютексы наряду с двоичными семафорами используются в организации критических участков кода. В отличие от двоичных семафоров, начальное состояние мьютекса не может быть захваченным.

С помощью семафоров можно решать много разных задач синхронизации.

Проблемы, с которыми можно столкнуться при использовании семафоров

Стандартная библиотека C++ предлагает к использованию вычислительные и двоичные семафоры, представленные классами std::counting_semaphore, std::binary_semaphore.

counting_semaphore — это примитив синхронизации, который может управлять доступом к общему ресурсу. В отличие от мьютекса std::mutex, counting_semaphore допускает более одного параллельного доступа к одному и тому же ресурсу.

counting_semaphore содержит внутренний счетчик, который инициализируется конструктором. Этот счетчик уменьшается при вызовах метода acquire() и связанных с ним методов и увеличивается при вызовах метода release(). Когда счетчик равен нулю, acquire() блокирует поток до тех пор, пока счетчик не увеличится. Кроме того, для использования доступны методы:

  • try_acquire() не блокирует поток, а возвращает вместо этого false. Подобно std::condition_variable::wait(), метод try_acquire() может ошибочно возвращать false.

  • try_acquire_for() и try_acquire_until() блокируют до тех пор, пока счетчик не увеличится или не будет достигнут таймаут.

Семафоры нельзя копировать и перемещать.

В отличие от std::mutex, counting_semaphore не привязан к потокам выполнения — освобождение release() и захват acquire() семафора могут производиться в разных потоках (блокировка и освобождение мьютекса должны производиться одним и тем же потоком). Все операции над counting_semaphore могут выполняться одновременно и без какой-либо связи с конкретными потоками выполнения.

Семафоры также часто используются для реализации уведомлений. При этом семафор инициализируется значением 0, и потоки, ожидающие события блокируются методом acquire(), пока уведомляющий поток не вызовет release(n). В этом отношении семафоры можно рассматривать как альтернативу std::condition_variables.

Методы acquire() уменьшают значение счётчика семафора на 1. Методу release() можно передать в качестве параметра значение, на которое должен быть увеличен счётчик, по умолчанию значение равно 1.

std::counting_semaphore<std::ptrdiff_t LeastMaxValue = /* implementation-defined */> является шаблонным классом. В качестве параметра шаблона принимает значение, которое является нижней границей для максимально возможного значения счётчика. Фактический же максимум значений счётчика определяется реализацией и может быть больше, чем LeastMaxValue.

binary_semaphore — это просто псевдоним using binary_semaphore = std::counting_semaphore<1>;.

Пример использования:

#include <iostream> #include <thread> #include <chrono> #include <semaphore>     // global binary semaphore instances // object counts are set to zero // objects are in non-signaled state std::binary_semaphore     smphSignalMainToThread(0),     smphSignalThreadToMain(0);   void ThreadProc() {         // wait for a signal from the main proc     // by attempting to decrement the semaphore     smphSignalMainToThread.acquire();       // this call blocks until the semaphore's count     // is increased from the main proc       std::cout << "[thread] Got the signal\n"; // response message       // wait for 3 seconds to imitate some work     // being done by the thread     using namespace std::literals;     std::this_thread::sleep_for(3s);       std::cout << "[thread] Send the signal\n"; // message       // signal the main proc back     smphSignalThreadToMain.release(); }   int main() {     // create some worker thread     std::thread thrWorker(ThreadProc);       std::cout << "[main] Send the signal\n"; // message       // signal the worker thread to start working     // by increasing the semaphore's count     smphSignalMainToThread.release();       // wait until the worker thread is done doing the work     // by attempting to decrement the semaphore's count     smphSignalThreadToMain.acquire();       std::cout << "[main] Got the signal\n"; // response message     thrWorker.join(); } /* Вывод: [main] Send the signal [thread] Got the signal [thread] Send the signal [main] Got the signal */ 

Защёлки и барьеры (Latches and Barriers)

В C++20 в стандартной библиотеке появились барьеры.
Защелки latches и барьеры barriers — это механизм синхронизации потоков, который позволяет блокировать любое количество потоков до тех пор, пока ожидаемое количество потоков не достигнет барьера. Защелки нельзя использовать повторно, барьеры можно использовать повторно.

Эти механизмы синхронизации используются, когда выполнение параллельного алгоритма можно разделить на несколько этапов, разделённых барьерами. В частности, с помощью барьера можно организовать точку сбора частичных результатов вычислений, в которой подводится итог этапа вычислений. Например, если стоит задача отфильтровать изображение с помощью двух разных фильтров, и разные потоки фильтруют разные части изображения, то перед началом второй фильтрации следует дождаться, когда первая фильтрация будет полностью завершена, то есть все потоки должны дойти до барьера между двумя этапами фильтрации.

Барьер для группы потоков в исходном коде означает, что каждый поток должен остановиться в этой точке и подождать достижения барьера другими потоками группы. Когда все потоки достигли барьера, их выполнение продолжается.

std::latch

std::latch — это уменьшающийся счетчик. Значение счетчика инициализируется при создании. Потоки уменьшают значение счётчика и блокируются на защёлке до тех пор, пока счетчик не уменьшится до нуля. Нет возможности увеличить или сбросить счетчик, что делает защелку одноразовым барьером.

В отличие от std::barrier, std::latch может быть уменьшен одним потоком более одного раза.

Использовать защёлки очень просто. В нашем распоряжении несколько методов:

  • count_down(value) уменьшает значение счётчика на value (по умолчанию 1) без блокировки потока. Если значение счётчика становится отрицательным, то поведение не определено.

  • try_wait() позволяет проверить, не достигло ли значение счётчика нуля. С низкой вероятностью может ложно возвращать false.

  • wait() блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика уже равно 0, то управление возвращается немедленно.

  • arrive_and_wait(value) уменьшает значение счётчика на value (по умолчанию 1) и блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика становится отрицательным, то поведение не определено.

Пример:

void DoWork(threadpool* pool) {     latch completion_latch(NTASKS);     for (int i = 0; i < NTASKS; ++i) {       pool->add_task([&] {         // perform work         ...         completion_latch.count_down();       }));     }     // Block until work is done     completion_latch.wait(); }

std::barrier

Используется почти так же, как std::latch, но является многоразовым: как только ожидающие потоки разблокируются, значение счётчика устанавливается в начальное, и тот же самый барьер может быть использован повторно.

Работу барьера можно разбить на фазы. Фаза заканчивается, когда счётчик барьера обнуляется, затем начинается новая фаза. Фазы работы имеют идентификаторы, которые возвращаются некоторыми методами. Это нужно для того, чтобы мы не ждали конца фазы, которая уже завершена.

Итак, как пользоваться барьерами? В нашем распоряжении следующие методы:

  • arrive(value) уменьшает текущее значение счётчика на value (по умолчанию 1). Поведение не определено, если значение счётчика станет отрицательным. Метод возвращает идентификатор фазы, который имеет тип arrival_token.

  • wait(arrival_token) блокирует текущий поток до тех пор, пока указанная фаза не завершится. Принимает идентификатор фазы в качестве параметра.

  • arrive_and_wait() уменьшает текущее значение счётчика на 1 и блокирует текущий поток до тех пор, пока счётчик не обнулится. Эквивалентно вызову wait(arrive());. Поведение не определено, если вызов происходит, когда значение счётчика равно нулю. Поэтому количество потоков, уменьшающих счётчик барьера, не должно быть больше начального значения счётчика.

  • arrive_and_drop() уменьшает на 1 начальное значение счётчика для следующих фаз, а так же текущее значение счётчика. Поведение не определено, если вызов происходит, когда значение счётчика равно нулю.

Пример:

int n_threads; std::vector<thread*> workers; std::barrier task_barrier(n_threads);  for (int i = 0; i < n_threads; ++i) {     workers.push_back(new thread([&] {         for(int step_no = 0; step_no < 5; ++step_no) {             // perform step             ...             task_barrier.arrive_and_wait();         }     }); }

Возврат значений и проброс исключений (Futures)

Предположим, что имеются какие-то длительные вычисления, которые, как ожидается, вернут со временем полезный результат, значение которого вам пригодится позже. Для выполнения вычислений можно запустить новый поток, но это будет означать, что следует позаботиться о передаче результата в основную программу, поскольку std::thread не предоставляет непосредственного механизма для выполнения этой задачи.

Стандартная библиотека предоставляет средства для получения возвращаемых значений и перехвата исключений, создаваемых асинхронными задачами (т. е. функциями, запущенными в отдельных потоках). Эти значения передаются через общие объекты состояния выполнения, в которые асинхронная задача может записать свое возвращаемое значение или сохранить исключение, и которые могут быть проверены другими потоками, содержащими экземпляры std::future или std::shared_future, ссылающиеся на это общее состояние.

Кроме непосредственно механизма возврата значений с помощью std::future и std::promise, стандартная библиотека предоставляет более высокоуровневые средства для запуска задач, которые должны вернуть значение. std::packaged_task является обёрткой для ваших функций, которая позволяет автоматизировать сохранение результата в std::promise. А std::async является наиболее высокоуровневым инструментом для автоматического запуска задачи в отдельном потоке с возможностью позже запросить результат выполнения. Начнём рассмотрение с базовых низкоуровневых инструментов, чтобы понимать механику работы.

Низкоуровневые средства: возврат значений и проброс исключений с помощью std::future и std::promise

std::promise — это базовый механизм, позволяющий передавать значение между потоками. Каждый объект std::promise связан с объектом std::future. Это пара классов, один из которых (std::promise) отвечает за установку значения, а другой (std::future) — за его получение. Первый поток может ожидать установки значения с помощью вызова метода std::future::wait или std::future::get, в то время, как второй поток установит это значение с помощью вызова метода std::promise::set_value, или передаст первому исключение вызовом метода std::promise::set_exception.

Возврат значения

Шаблон класса std::promise предоставляет средство для сохранения значения или исключения, которое позже асинхронно забирается через объект std::future, созданный объектом std::promise.

Шаблон класса std::future предоставляет механизм доступа к результату асинхронных операций.

Пара объектов std::promise и связанный с ним std::future образуют канал связи между потоками. std::promise предоставляет операцию push для этого канала связи. Значение, записанное с помощью promise, может быть прочитано с помощью объекта future.

Каждый объект promise связан с общим состоянием выполнения, которое может быть еще не установлено или может хранить значение или исключение. Когда асинхронная операция готова вернуть результат, она может сделать это, изменив общее состояние (например, с помощью метода std::promise::set_value). Объект std::future (его можно получить с помощью метода std::promise::get_future) связывается с этим же самым общим состоянием. Поток, запустивший асинхронную операцию может затем использовать различные методы для проверки, ожидания готовности или извлечения значения из std::future. Эти методы могут блокировать выполнение, если асинхронная операция еще не предоставила значение.

Сохранение результата или исключения в std::promise приводит операцию в состояние готовности. Эта операция разблокирует поток, ожидающий результата. Если объект promise был уничтожен, а результат (значение или исключение) не был сохранён, то сохраняется исключение типа std::future_error с кодом ошибки std::future_errc::broken_promise, происходит приведение в состояние готовности.

Обратите внимание:

  • объект std::promise предназначен для использования только один раз, запросить значение (get()) из std::future можно только один раз.

  • с помощью std::future результата может дожидаться только один поток.. Параллельный доступ к одному и тому же общему состоянию может приводить к конфликтам.

Итак, как этим пользоваться?

Шаблон std::promise<T> позволяет устанавливать значение (типа T), которое позже можно прочитать через связанный объект std::future<T>. Ожидающий поток может заблокироваться на фьючерсе, а поток, предоставляющий данные,— воспользоваться другой половиной пары, промисом (promise, иногда называют обещанием), для установки связанного значения и приведения фьючерса в состояние готовности. Получить объект фьючерса std::future, связанный с заданным объектом std::promise, можно вызовом метода get_future(). Когда значение в promise установлено (с помощью метода set_value()), фьючерс приводится в состояние готовности и может использоваться для извлечения сохраненного значения. Если объект std::promise уничтожить без установки значения, вместо него будет сохранено исключение.

В нашем распоряжении есть несколько методов:

  • std::promise::

    • get_future() позволяет получить объект std::future, связанный с нашим объектом std::promise

    • set_value(value) сохраняет значение, которое можно запросить с помощью связанного объекта std::future

    • set_exception(exception) сохраняет исключение, которое будет брошено в потоке, запросившем значение из объекта std::future

    • set_value_at_thread_exit() и set_exception_at_thread_exit() сохраняют значение или исключение после завершения потока аналогично тому, как работает std::notify_all_at_thread_exit

  • std::future::

    • get() Дожидается, когда promise сохранит результат, и возвращает его. После вызова метода объект future удаляет ссылку на общее состояние, и метод valid() начинает возвращать false. Вызов для невалидного (valid() возвращает false) объекта приводит к неопределённому поведению или исключению (зависит от реализации). Если в promise было записано исключение, то оно будет брошено при вызове.

    • valid() Проверяет, связан ли объект future с каким-то общим состоянием. Вызов других методов для невалидного объекта приводит к неопределённому поведению или исключению (зависит от реализации).

    • wait() Блокирует текущий поток, пока promise не запишет значение. Вызов для невалидного (valid() возвращает false) объекта приводит к неопределённому поведению или исключению (зависит от реализации).

    • wait_for() и wait_until() Работают аналогично методу wait, но с ограничением на время ожидания. Возвращают future_status.

    • share() Конструирует и возвращает shared_future. Несколько объектов std::shared_future могут ссылаться на одно и то же общее состояние, что невозможно для std::future. После вызова метода объект future удаляет ссылку на общее состояние, и метод valid() начинает возвращать false.

Пример:

#include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> #include <chrono>   void accumulate(std::vector<int>::iterator first,                 std::vector<int>::iterator last,                 std::promise<int> accumulate_promise) {     int sum = std::accumulate(first, last, 0);     accumulate_promise.set_value(sum);  // Notify future }   int main() {     // Demonstrate using promise<int> to transmit a result between threads.     std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };     std::promise<int> accumulate_promise;     std::future<int> accumulate_future = accumulate_promise.get_future();     std::thread work_thread(accumulate, numbers.begin(), numbers.end(),                             std::move(accumulate_promise));       // future::get() will wait until the future has a valid result and retrieves it.     // Calling wait() before get() is not needed     //accumulate_future.wait();  // wait for result     std::cout << "result=" << accumulate_future.get() << '\n';     work_thread.join();  // wait for thread completion } 

Проброс исключения

Предположим, что вызываемая в отдельном потоке функция может выдавать исключение:

double square_root(double x){     if(x<0) {         throw std::out_of_range("x<0");     }     return sqrt(x); } 

Если в функцию square_root() передается значение –1, она выдает исключение, которое становится видимым вызывающему коду. В идеале при выполнении этой функции в отдельном потоке хотелось бы получить точно такое же поведение, как при однопоточном варианте выполнения: было бы неплохо, чтобы код, вызвавший future::get(), мог видеть исключение.

std::promise предоставляет возможности сохранить исключение. Если вместо значения требуется сохранить исключение, то вместо set_value() вызывается метод set_exception(). Исключение сохраняется во фьючерсе на месте сохраненного значения, фьючерс приводится в состояние готовности и вызов get() бросает сохраненное исключение. (Примечание: в стандарте не указано, является ли повторно выдаваемое исключение исходным объектом исключения или его копией, разные компиляторы и библиотеки делают выбор по своему усмотрению.)

Обычно для исключения, выдаваемого в качестве части алгоритма, это делается в блоке catch:

extern std::promise<double> some_promise; try{     some_promise.set_value(square_root(x)); } catch(...){     some_promise.set_exception(std::current_exception()); }
some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo ")));

Такой код выглядит намного понятнее, чем код с применением блока try-catch, — это не только упрощает код, но и расширяет возможности компилятора в области оптимизации кода.

То же самое происходит, если функция заключена в std::packaged_task: когда при вызове задачи этой функцией выдается исключение, оно сохраняется во фьючерсе на месте результата, готового к выдаче при вызове функции get().

Аналогичное поведение может быть достигнуто с помощью std::async:

std::future<double> f = std::async(square_root, -1); double y = f.get();

Еще один способ сохранения исключения во фьючерсе заключается в уничтожении связанного с фьючерсом объекта std::promise или объекта std::packaged_task без вызова каких-либо set-функций в отношении promise или без обращения к упакованной задаче. В этом случае деструктор std::promise или std::packaged_task сохранит исключение std::future_error с кодом ошибки std::future_errc::broken_promise в связанном состоянии, если фьючерс еще не перешел в состояние готовности: созданием фьючерса дается обещание предоставить значение или исключение, а уничтожением источника этого значения или исключения без их предоставления это обещание нарушается. Если бы компилятор в таком случае ничего не сохранял во фьючерсе, ожидающие потоки могли бы ожидать бесконечно.

Передача событий без состояния

promise-future можно использовать не только для передачи значения, но и просто для уведомления (хотя для этого можно использовать condition variables), если сохранить тип void. Например, можно сделать барьер (в С++20 для этого есть специальные средства).

Пример:

#include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> #include <chrono>  void do_work(std::promise<void> barrier) {     std::this_thread::sleep_for(std::chrono::seconds(1));     barrier.set_value(); }   int main() {     // Demonstrate using promise<void> to signal state between threads.     std::promise<void> barrier;     std::future<void> barrier_future = barrier.get_future();     std::thread new_work_thread(do_work, std::move(barrier));     barrier_future.wait();     new_work_thread.join(); } 

Среднеуровневые средства: обёртка для функций и callable объектов std::packaged_task

Использование promise — это не единственный способ возврата значения из функции, выполняемой в другом потоке. Сделать это можно также заключением задачи в экземпляр std::packaged_task<>. Шаблон класса std::packaged_task является абстракцией более высокого уровня, чем std::promise.

Шаблон класса std::packaged_task обёртывает любую вызываемую цель (функцию, лямбда-выражение, bind expression или другой callable объект), чтобы ее можно было вызвать асинхронно с получением возвращаемого значения или исключения. Возвращаемое значение или вызванное исключение хранится в общем состоянии, доступ к которому можно получить через объекты std::future.

std::packaged_task работает так же, как если бы мы создали объект std::promise и сохранили в него результат работы функции.

Шаблон класса std::packaged_task<> привязывает фьючерс к функции или вызываемому объекту. Когда вызывается объект std::packaged_task<>, он вызывает связанную функцию или объект и приводит фьючерс в состояние готовности после возврата функцией значения или броска исключения. Этим классом можно воспользоваться как строительным блоком для пула потоков или других схем управления задачами, например, для запуска всех задач в специально выделенном потоке, работающем в фоновом режиме. Таким образом удается абстрагироваться от подробностей задач — диспетчер имеет дело только с экземплярами std::packaged_task, а не с отдельно взятыми функциями.

Параметром шаблона для std::packaged_task<> является сигнатура функции, например void() для функции, не получающей параметры и не имеющей возвращаемых значений, или int(std::string&,double*) для функции, получающей не-const-ссылку на std::string и указатель на double и возвращающей значение типа int. При создании экземпляра std::packaged_task ему следует передать функцию или вызываемый объект, принимающий указанные параметры, а затем возвращающий тип, который можно преобразовать в указанный тип возвращаемого значения. Точного совпадения типов не требуется, можно сконструировать объект std::packaged_task<double(double)> из функции, принимающей значение типа int и возвращающей значение типа float, поскольку возможно неявное приведение типов. Тип возвращаемого значения, указанный в сигнатуре функции, определяет тип объекта std::future<>, возвращаемого методом get_future(), а заданный в сигнатуре список аргументов используется для определения сигнатуры оператора вызова в классе packaged_task.

Объект std::packaged_task является вызываемым, значит, его можно обернуть объектом std::function или передать конструктору std::thread в качестве функции потока, или даже вызвать напрямую.

Когда std::packaged_task вызывается, аргументы, предоставленные оператору вызова функции, передаются содержащейся в этом объекте функции, а возвращаемое значение сохраняется в качестве результата в объекте std::future, полученном от get_future().Таким образом, задачу можно заключить в объект std::packaged_task и извлечь фьючерс перед передачей объекта std::packaged_task в отдельный поток. Когда понадобится результат, можно будет дождаться готовности фьючерса.

Итак, как это использовать?

В нашем распоряжении несколько методов:

  • get_future() позволяет получить связанный с состоянием задачи объект std::future, с помощью которого можно получить возвращаемое значение функции или брошенное исключение

  • operator() позволяет вызвать обёрнутую функцию, нужно передать аргументы функции

  • make_ready_at_thread_exit() позволяет дождаться полного завершения потока перед тем, как привести future в состояние готовности

  • reset() очищает результаты предыдущего запуска задачи

Пример:

#include <iostream> #include <cmath> #include <thread> #include <future> #include <functional>   // unique function to avoid disambiguating the std::pow overload set int f(int x, int y) { return std::pow(x,y); }   void task_lambda() {     std::packaged_task<int(int,int)> task([](int a, int b) {         return std::pow(a, b);      });     std::future<int> result = task.get_future();       task(2, 9);       std::cout << "task_lambda:\t" << result.get() << '\n'; }   void task_bind() {     std::packaged_task<int()> task(std::bind(f, 2, 11));     std::future<int> result = task.get_future();       task();       std::cout << "task_bind:\t" << result.get() << '\n'; }   void task_thread() {     std::packaged_task<int(int,int)> task(f);     std::future<int> result = task.get_future();       std::thread task_td(std::move(task), 2, 10);     task_td.join();       std::cout << "task_thread:\t" << result.get() << '\n'; }   int main() {     task_lambda();     task_bind();     task_thread(); } 

Пример с ожиданием полного завершения потока:

#include <future> #include <iostream> #include <chrono> #include <thread> #include <functional> #include <utility>   void worker(std::future<void>& output) {     std::packaged_task<void(bool&)> my_task{ [](bool& done) { done=true; } };       auto result = my_task.get_future();       bool done = false;       my_task.make_ready_at_thread_exit(done); // execute task right away       std::cout << "worker: done = " << std::boolalpha << done << std::endl;       auto status = result.wait_for(std::chrono::seconds(0));     if (status == std::future_status::timeout)         std::cout << "worker: result is not ready yet" << std::endl;       output = std::move(result); }     int main() {     std::future<void> result;       std::thread{worker, std::ref(result)}.join();       auto status = result.wait_for(std::chrono::seconds(0));     if (status == std::future_status::ready)         std::cout << "main: result is ready" << std::endl; } 

Пример со сбросом результатов предыдущего выполнения:

#include <iostream> #include <cmath> #include <thread> #include <future>   int main() {     std::packaged_task<int(int,int)> task([](int a, int b) {         return std::pow(a, b);     });     std::future<int> result = task.get_future();     task(2, 9);     std::cout << "2^9 = " << result.get() << '\n';       task.reset();     result = task.get_future();     std::thread task_td(std::move(task), 2, 10);     task_td.join();     std::cout << "2^10 = " << result.get() << '\n'; }

Высокоуровневые средства: запуск задач асинхронно с помощью std::async

Всё, что было описано выше — это хорошо, но может казаться слишком сложным для того, чтобы просто запустить задачу в отдельном потоке и получить значение. Иногда хочется иметь ещё более высокоуровневые инструменты и запускать задачи в одну строчку кода. Стандартная библиотека C++ предоставляет такую возможность.

std::async запускает функцию f асинхронно (потенциально в отдельном потоке, который может быть частью пула потоков) и возвращает std::future, который в конечном итоге будет содержать результат вызова этой функции.

std::async позволяет установить политику запуска задачи:

  • std::launch::async выполняет вызываемый объект f в новом потоке выполнения, как если бы он был запущен с помощью std::thread(std::forward<F>(f), std::forward<Args>(args)...), за исключением того, что если функция f возвращает значение или создает исключение, то оно хранится в общем состоянии, доступном через std::future, которое async возвращает вызывающей стороне.

  • std::launch::deferred не порождает новый поток выполнения. Вместо этого функция выполняется лениво: первый вызов несинхронной функции ожидания в std::future, возвращенном вызывающему объекту, вызовет копию f (как rvalue) с копиями args… (также передается как rvalues) в текущем потоке (который не обязательно должен быть потоком, который изначально вызывал std::async). Результат или исключение помещается в общее состояние, объект future приводится в состояние готовности. Дальнейший запрос результата из того же std::future немедленно вернёт результат.

  • std::launch::async | std::launch::deferred в зависимости от реализации, производится или асинхронное выполнение, или ленивое

  • Если ни std::launch::async, ни std::launch::deferred не установлен, то задаётся политика по умолчанию std::launch::async | std::launch::deferred

std::async возвращает объект std::future для получения значения.

std::async бросает исключение std::system_error, если политика запуска равна std::launch::async, но реализация не может запустить новый поток, или std::bad_alloc, если память для внутренних структур данных не может быть выделена.

Если std::future, полученный из std::async, не сохраняется, деструктор std::future блокирует поток до завершения асинхронной операции, как при синхронном выполнении:

std::async(std::launch::async, []{ f(); }); // temporary's dtor waits for f() std::async(std::launch::async, []{ g(); }); // does not start until f() completes

Обратите внимание, что деструкторы объектов std::future, полученных не из std::async, не блокируют поток.

Пример:

#include <iostream> #include <vector> #include <algorithm> #include <numeric> #include <future> #include <string> #include <mutex>   std::mutex m; struct X {     void foo(int i, const std::string& str) {         std::lock_guard<std::mutex> lk(m);         std::cout << str << ' ' << i << '\n';     }     void bar(const std::string& str) {         std::lock_guard<std::mutex> lk(m);         std::cout << str << '\n';     }     int operator()(int i) {         std::lock_guard<std::mutex> lk(m);         std::cout << i << '\n';         return i + 10;     } };   template <typename RandomIt> int parallel_sum(RandomIt beg, RandomIt end) {     auto len = end - beg;     if (len < 1000)         return std::accumulate(beg, end, 0);       RandomIt mid = beg + len/2;     auto handle = std::async(std::launch::async,                              parallel_sum<RandomIt>, mid, end);     int sum = parallel_sum(beg, mid);     return sum + handle.get(); }   int main() {     std::vector<int> v(10000, 1);     std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';       X x;     // Calls (&x)->foo(42, "Hello") with default policy:     // may print "Hello 42" concurrently or defer execution     auto a1 = std::async(&X::foo, &x, 42, "Hello");     // Calls x.bar("world!") with deferred policy     // prints "world!" when a2.get() or a2.wait() is called     auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");     // Calls X()(43); with async policy     // prints "43" concurrently     auto a3 = std::async(std::launch::async, X(), 43);     a2.wait();                     // prints "world!"     std::cout << a3.get() << '\n'; // prints "53" } // if a1 is not done at this point, destructor of a1 prints "Hello 42" here  /* Возможный вывод: The sum is 10000 43 world! 53 Hello 42 */ 

Ожидание результата в нескольких потоках с помощью std::shared_future

До сих пор во всех примерах использовался объект std::future. Но у него есть ограничения, в частности, результата может дожидаться только один поток. Если наступления одного и того же события нужно дожидаться сразу из нескольких потоков, следует воспользоваться std::shared_future.

Хотя std::future вполне справляется со всей синхронизацией, необходимой для переноса данных из одного потока в другой, вызовы методов std::future не синхронизированы друг с другом. Если обращаться к одному и тому же объекту std::future из нескольких потоков без дополнительной синхронизации, возникнет состояние гонки за данными и неопределенное поведение. std::future моделирует исключительное владение результатом асинхронных вычислений, а одноразовая природа функции get() лишает конкурентный доступ всякого смысла — значение можно извлечь только одним потоком, поскольку после первого же вызова get() значения для извлечения уже не останется.

Если же ваш проект требует, чтобы ожидать результата выполнения функции могли сразу несколько потоков, нужно использовать std::shared_future. Если std::future допускает только перемещение (чтобы право владения передавалось между экземплярами, но чтобы в конкретный момент только один экземпляр ссылался на конкретный результат асинхронного вычисления), экземпляры std::shared_future допускают копирование, поэтому могут существовать сразу несколько объектов, ссылающихся на одно и то же связанное состояние.

Однако работа с одним и тем же объектом std::shared_future из разных потоков по прежнему не синхронизирована, и во избежание проблем, следует передать каждому заинтересованному потоку собственную копию объекта std::shared_future, тогда все внутренние операции будут корректно синхронизированы средствами библиотеки. Таким образом, безопасность доступа к асинхронному состоянию из нескольких потоков обеспечивается, если каждый поток обращается к этому состоянию посредством собственного объекта std::shared_future.

Сконструировать объект std::shared_future можно либо передав право собственности его конструктору из std::future с помощью std::move:

std::shared_future sf(std::move(future));

Для r-value вызов std::move не требуется:

std::promise<int> p; std::shared_future<int> sf(p.get_future());
std::promise<int> p; auto sf = p.get_future().share();

Пример использования std::shared_future для реализации барьера:

#include <iostream> #include <future> #include <chrono>   int main() {        std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;     std::shared_future<void> ready_future(ready_promise.get_future());       std::chrono::time_point<std::chrono::high_resolution_clock> start;       auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli>      {         t1_ready_promise.set_value();         ready_future.wait(); // waits for the signal from main()         return std::chrono::high_resolution_clock::now() - start;     };         auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli>      {         t2_ready_promise.set_value();         ready_future.wait(); // waits for the signal from main()         return std::chrono::high_resolution_clock::now() - start;     };       auto fut1 = t1_ready_promise.get_future();     auto fut2 = t2_ready_promise.get_future();       auto result1 = std::async(std::launch::async, fun1);     auto result2 = std::async(std::launch::async, fun2);       // wait for the threads to become ready     fut1.wait();     fut2.wait();       // the threads are ready, start the clock     start = std::chrono::high_resolution_clock::now();       // signal the threads to go     ready_promise.set_value();       std::cout << "Thread 1 received the signal "               << result1.get().count() << " ms after start\n"               << "Thread 2 received the signal "               << result2.get().count() << " ms after start\n"; }  /* Возможный вывод: Thread 1 received the signal 0.072 ms after start Thread 2 received the signal 0.041 ms after start */

Что почитать?

Thread support library
Энтони Уильямс. C++. Практика многопоточного программирования
The Little Book of Semaphores
20 типичных ошибок многопоточности в C++
Добро пожаловать в параллельный мир. Часть 1: Мир многопоточный
Потоки, блокировки и условные переменные в C++11 [Часть 1]
Потоки, блокировки и условные переменные в C++11 [Часть 2]

Различные заметки по изучению устройства операционных систем
Процессы и потоки
Операционные системы. Процессы и потоки.
Процессы и потоки in-depth. Обзор различных потоковых моделей
В чем разница между потоком и процессом?
Лекция — Процессы и потоки (нити)
Процесс
Поток выполнения
Многозадачность
Многопоточность
Разница понятий
Пользовательский режим (user mode) и режим ядра (kernel mode)
Сравнение режима ядра и пользовательского режима
What is the difference between user and kernel modes in operating systems?
Планирование потоков

Многопоточность в Java: суть, «плюсы» и частые ловушки
Race condition Состояние гонки

Взаимная блокировка
C++ Deadlocks
C++11/C++14 9. Deadlocks — 2020


Узнать подробнее о курсе «C++ Developer. Professional«.

ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/549814/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *