Потоки, блокировки и условные переменные в C++11 [Часть 1]

от автора

В первой части этой статьи основное внимание будет уделено потокам и блокировкам в С++11, условные переменные во всей своей красе будут подробно рассмотрены во второй части…

Потоки

В C++11, работа с потокам осуществляется по средствам класса std::thread (доступного из заголовочного файла <thread>), который может работать с регулярными функциями, лямбдами и функторами. Кроме того, он позволяет вам передавать любое число параметров в функцию потока.

#include <thread>   void threadFunction() {      // do smth }   int main() {      std::thread thr(threadFunction);      thr.join();      return 0; } 

В этом примере, thr — это объект, представляющий поток, в котором будет выполняться функция threadFunction(). Вызов join блокирует вызывающий поток (в нашем случае — поток main) до тех пор, пока thr (а точнее threadFunction()) не выполнит свою работу. Если функция потока возвращает значение — оно будет проигнорировано. Однако принять функция может любое количество параметров.

void threadFunction(int i, double d, const std::string &s) {      std::cout << i << ", " << d << ", " << s << std::endl; }   int main() {      std::thread thr(threadFunction, 1, 2.34, "example");      thr.join();      return 0; } 

Несмотря на то, что передавать можно любое число параметров, все они были переданы по значению Если в функцию необходимо передать параметры по ссылке, они должны быть обернуты в std::ref или std::cref, как в примере:

void threadFunction(int &a) {      a++; }   int main() {      int a = 1;      std::thread thr(threadFunction, std::ref(a));      thr.join();      std::cout << a << std::endl;       return 0; } 

Программа напечатает в консоль 2. Если не использовать std::ref, то результатом работы программы будет 1.

Помимо метода join, следует рассмотреть еще один, похожий метод — detach.
detach позволяет отсоединить поток от объекта, иными словами, сделать его фоновым. К отсоединенным потокам больше нельзя применять join.

int main() {      std::thread thr(threadFunction);      thr.detach();      return 0; } 

Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):

try {      std::thread thr1(threadFunction);      std::thread thr2(threadFunction);        thr1.join();      thr2.join(); }  catch (const std::exception &ex) {      std::cout << ex.what() << std::endl; } 

Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.

std::mutex                       g_mutex; std::vector<std::exception_ptr>  g_exceptions;  void throw_function() {      throw std::exception("something wrong happened"); }  void threadFunction() {      try      {           throw_function();      }      catch (...)      {           std::lock_guard<std::mutex> lock(g_mutex);           g_exceptions.push_back(std::current_exception());      } }  int main() {      g_exceptions.clear();      std::thread thr(threadFunction);      thr.join();       for(auto &e: g_exceptions)      {           try            {                if(e != nullptr)                     std::rethrow_exception(e);           }           catch (const std::exception &e)           {                std::cout << e.what() << std::endl;           }      }      return 0; } 

Прежде, чем двигаться дальше, хочу отметить некоторые полезные функции, предоставляемые <thread>, в пространстве имен std::this_thread:

  • get_id: возвращает id текущего потока
  • yield: говорит планировщику выполнять другие потоки, может использоваться при активном ожидании
  • sleep_for: блокирует выполнение текущего потока в течение установленного периода
  • sleep_until: блокирует выполнение текущего потока, пока не будет достигнут указанный момент времени

Блокировки

В последнем примере, я должен был синхронизировать доступ к вектору g_exceptions, чтобы быть уверенным, что только один поток одновременно может вставить новый элемент. Для этого я использовал мьютекс и блокировку на мьютекс. Мьютекс — базовый элемент синхронизации и в С++11 представлен в 4 формах в заголовочном файле <mutex>:

Приведу пример использования std::mutex с упомянутыми ранее функциями-помощниками get_id() и sleep_for():

#include <iostream> #include <chrono> #include <thread> #include <mutex>   std::mutex g_lock;   void threadFunction() {      g_lock.lock();        std::cout << "entered thread " << std::this_thread::get_id() << std::endl;      std::this_thread::sleep_for(std::chrono::seconds(rand()%10));      std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;        g_lock.unlock(); }   int main() {      srand((unsigned int)time(0));      std::thread t1(threadFunction);      std::thread t2(threadFunction);      std::thread t3(threadFunction);      t1.join();      t2.join();      t3.join();      return 0; } 

Программа должна выдавать примерно следующее:

entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424 

Перед обращением к общим данным, мьютекс должен быть заблокирован методом lock, а после окончания работы с общими данными — разблокирован методом unlock.

Следующий пример показывает простой потокобезопасный контейнер (реализованный на базе std::vector), имеющий методы add() для добавления одного элемента и addrange() для добавления нескольких элементов.
Примечание: и всё же этот контейнер не является полностью потокобезопасным по нескольким причинам, включая использование va_args. Также, метод dump() не должен принадлежать контейнеру, а должен быть автономной функцией. Цель этого примера в том, что показать основные концепции использования мьютексов, а не не сделать полноценный, безошибочный, потокобезопасный контейнер.

template <typename T> class container  {      std::mutex _lock;      std::vector<T> _elements; public:      void add(T element)       {           _lock.lock();           _elements.push_back(element);           _lock.unlock();      }       void addrange(int num, ...)      {           va_list arguments;           va_start(arguments, num);           for (int i = 0; i < num; i++)           {                _lock.lock();                add(va_arg(arguments, T));                _lock.unlock();           }           va_end(arguments);       }      void dump()      {           _lock.lock();           for(auto e: _elements)           std::cout << e << std::endl;           _lock.unlock();      } };   void threadFunction(container<int> &c) {      c.addrange(3, rand(), rand(), rand()); }   int main() {      srand((unsigned int)time(0));      container<int> cntr;      std::thread t1(threadFunction, std::ref(cntr));      std::thread t2(threadFunction, std::ref(cntr));      std::thread t3(threadFunction, std::ref(cntr));      t1.join();      t2.join();      t3.join();      cntr.dump();      return 0; } 

При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения (вызова unlock), что невозможно. Здесь и выходит на сцену std::recursive_mutex, который позволяет получать тот же мьютекс несколько раз. Максимальное количество получения мьютекса не определено, но если это количество будет достигно, то lock бросит исключение std::system_error. Поэтому, решение проблемы в коде выше (кроме изменения реализации addrange(), чтобы не вызывались lock и unlock), заключается в замене мьютекса на std::recursive_mutex.

template <typename T> class container  {      std::recursive_mutex _lock;      // ... }; 

Теперь, результат работы программы будет следующего вида:

6334 18467 41 6334 18467 41 6334 18467 41 

Вы, наверное, заметили, что при вызове threadFunction(), генерируются одни и те же числа. Это происходит потому, что функция void srand (unsigned int seed); инициализирует seed только для потока main. В других потоках, генератор псевдо-случайных чисел не инициализируется и получаются каждый раз одни и те же числа.
Явная блокировка и разблокировка могут привести к ошибкам, например, если вы забудете разблокировать поток или, наоборот, будет неправильный порядок блокировок — все это вызовет deadlock. Std предоставляет несколько классов и функций для решения этой проблемы.
Классы «обертки» позволяют непротиворечиво использовать мьютекс в RAII-стиле с автоматической блокировкой и разблокировкой в рамках одного блока. Эти классы:

  • lock_guard: когда объект создан, он пытается получить мьютекс (вызывая lock()), а когда объект уничтожен, он автоматически освобождает мьютекс (вызывая unlock())
  • unique_lock: в отличие от lock_guard, также поддерживает отложенную блокировку, временную блокировку, рекурсивную блокировку и использование условных переменных

С учетом этого, мы можем переписать класс контейнер следующим образом:

template <typename T> class container  {      std::recursive_mutex _lock;      std::vector<T> _elements; public:      void add(T element)       {           std::lock_guard<std::recursive_mutex> locker(_lock);           _elements.push_back(element);      }      void addrange(int num, ...)      {           va_list arguments;           va_start(arguments, num);           for (int i = 0; i < num; i++)           {                std::lock_guard<std::recursive_mutex> locker(_lock);                add(va_arg(arguments, T));           }           va_end(arguments);       }      void dump()      {           std::lock_guard<std::recursive_mutex> locker(_lock);           for(auto e: _elements)                std::cout << e << std::endl;      } }; 

Можно поспорить насчет того, что метод dump() должен быть константным, ибо не изменяет состояние контейнера. Попробуйте сделать его таковым и получите ошибку при компиляции:

‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex'                                                    to ‘std::recursive_mutex &' 

Мьютекс (не зависимо от формы реализации), должен быть получен и освобожден, а это подразумевает использование не константных методов lock() и unlock(). Таким образом, аргумент lock_guard не может быть константой. Решение этой проблемы заключается в том, чтобы сделать мьютекс mutable, тогда спецификатор const будет игнорироваться и это позволит изменять состояние из константных функций.

template <typename T> class container  {      mutable std::recursive_mutex _lock;      std::vector<T> _elements; public:      void dump() const      {           std::lock_guard<std::recursive_mutex> locker(_lock);           for(auto e: _elements)                std::cout << e << std::endl;      } }; 

Конструкторы классов «оберток» могут принимать параметр, определяющий политику блокировки:

  • defer_lock типа defer_lock_t: не получать мьютекс
  • try_to_lock типа try_to_lock_t: попытаться получить мьютекс без блокировки
  • adopt_lock типа adopt_lock_t: предполагается, что у вызывающего потока уже есть мьютекс

Объявлены они следующим образом:

struct defer_lock_t { }; struct try_to_lock_t { }; struct adopt_lock_t { };   constexpr std::defer_lock_t defer_lock = std::defer_lock_t(); constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t(); constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t(); 

Помимо «оберток» для мьютексов, std также предоставляет несколько методов для блокировки одного или нескольких мьютексов:

  • lock: блокирует мьютекс, используя алгоритм избегания deadlock’ов (используя lock(), try_lock() и unlock())
  • try_lock: пытается блокировать мьютексы в порядке, в котором они были указаны

Вот типичный пример возникновения взаимоблокировки (deadlock): у нас есть некий контейнер с элементами и функция exchange(), которая меняет местами два элемента разных контейнеров. Для потокобезопасности, функция синхронизирует доступ к этим контейнерам, получая мьютекс, связанный с каждым контейнером.

template <typename T> class container  { public:      std::mutex _lock;      std::set<T> _elements;      void add(T element)       {           _elements.insert(element);      }      void remove(T element)       {           _elements.erase(element);      } };   void exchange(container<int> &c1, container<int> &c2, int value) {      c1._lock.lock();      std::this_thread::sleep_for(std::chrono::seconds(1)); // симулируем deadlock      c2._lock.lock();          c1.remove(value);      c2.add(value);      c1._lock.unlock();      c2._lock.unlock(); } 

Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).

int main() {      srand((unsigned int)time(NULL));      container<int> cntr1;       cntr1.add(1);      cntr1.add(2);      cntr1.add(3);      container<int> cntr2;       cntr2.add(4);      cntr2.add(5);      cntr2.add(6);      std::thread t1(exchange, std::ref(cntr1), std::ref(cntr2), 3);      std::thread t2(exchange, std::ref(cntr2), std::ref(cntr1), 6);      t1.join();      t2.join();      return 0; } 

Для решения этой проблемы можно использовать std::lock, который гарантирует блокировку безопасным (с точки зрения взаимоблокировки) способом:

void exchange(container<int> &c1, container<int> &c2, int value) {      std::lock(c1._lock, c2._lock);       c1.remove(value);      c2.add(value);      c1._lock.unlock();      c2._lock.unlock(); } 

На этом завершается первая часть данной статьи. Как только будет готова вторая часть (условные переменные) — ссылка на нее будет добавлена.

ссылка на оригинал статьи http://habrahabr.ru/post/182610/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *