Потоки
В C++11, работа с потокам осуществляется по средствам класса std::thread
(доступного из заголовочного файла <thread>
), который может работать с регулярными функциями, лямбдами и функторами. Кроме того, он позволяет вам передавать любое число параметров в функцию потока.
#include <thread> void threadFunction() { // do smth } int main() { std::thread thr(threadFunction); thr.join(); return 0; }
В этом примере, thr
— это объект, представляющий поток, в котором будет выполняться функция threadFunction()
. Вызов join
блокирует вызывающий поток (в нашем случае — поток main) до тех пор, пока thr
(а точнее threadFunction()
) не выполнит свою работу. Если функция потока возвращает значение — оно будет проигнорировано. Однако принять функция может любое количество параметров.
void threadFunction(int i, double d, const std::string &s) { std::cout << i << ", " << d << ", " << s << std::endl; } int main() { std::thread thr(threadFunction, 1, 2.34, "example"); thr.join(); return 0; }
Несмотря на то, что передавать можно любое число параметров, все они были переданы по значению Если в функцию необходимо передать параметры по ссылке, они должны быть обернуты в std::ref
или std::cref
, как в примере:
void threadFunction(int &a) { a++; } int main() { int a = 1; std::thread thr(threadFunction, std::ref(a)); thr.join(); std::cout << a << std::endl; return 0; }
Программа напечатает в консоль 2. Если не использовать std::ref
, то результатом работы программы будет 1.
Помимо метода join
, следует рассмотреть еще один, похожий метод — detach
.
detach
позволяет отсоединить поток от объекта, иными словами, сделать его фоновым. К отсоединенным потокам больше нельзя применять join
.
int main() { std::thread thr(threadFunction); thr.detach(); return 0; }
Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):
try { std::thread thr1(threadFunction); std::thread thr2(threadFunction); thr1.join(); thr2.join(); } catch (const std::exception &ex) { std::cout << ex.what() << std::endl; }
Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.
std::mutex g_mutex; std::vector<std::exception_ptr> g_exceptions; void throw_function() { throw std::exception("something wrong happened"); } void threadFunction() { try { throw_function(); } catch (...) { std::lock_guard<std::mutex> lock(g_mutex); g_exceptions.push_back(std::current_exception()); } } int main() { g_exceptions.clear(); std::thread thr(threadFunction); thr.join(); for(auto &e: g_exceptions) { try { if(e != nullptr) std::rethrow_exception(e); } catch (const std::exception &e) { std::cout << e.what() << std::endl; } } return 0; }
Прежде, чем двигаться дальше, хочу отметить некоторые полезные функции, предоставляемые <thread>
, в пространстве имен std::this_thread
:
- get_id: возвращает id текущего потока
- yield: говорит планировщику выполнять другие потоки, может использоваться при активном ожидании
- sleep_for: блокирует выполнение текущего потока в течение установленного периода
- sleep_until: блокирует выполнение текущего потока, пока не будет достигнут указанный момент времени
Блокировки
В последнем примере, я должен был синхронизировать доступ к вектору g_exceptions
, чтобы быть уверенным, что только один поток одновременно может вставить новый элемент. Для этого я использовал мьютекс и блокировку на мьютекс. Мьютекс — базовый элемент синхронизации и в С++11 представлен в 4 формах в заголовочном файле <mutex>
:
- mutex: обеспечивает базовые функции lock() и unlock() и не блокируемый метод try_lock()
- recursive_mutex: может войти «сам в себя»
- timed_mutex: в отличие от обычного мьютекса, имеет еще два метода: try_lock_for() и try_lock_until()
- recursive_timed_mutex: это комбинация timed_mutex и recursive_mutex
Приведу пример использования std::mutex
с упомянутыми ранее функциями-помощниками get_id()
и sleep_for()
:
#include <iostream> #include <chrono> #include <thread> #include <mutex> std::mutex g_lock; void threadFunction() { g_lock.lock(); std::cout << "entered thread " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(rand()%10)); std::cout << "leaving thread " << std::this_thread::get_id() << std::endl; g_lock.unlock(); } int main() { srand((unsigned int)time(0)); std::thread t1(threadFunction); std::thread t2(threadFunction); std::thread t3(threadFunction); t1.join(); t2.join(); t3.join(); return 0; }
Программа должна выдавать примерно следующее:
entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424
Перед обращением к общим данным, мьютекс должен быть заблокирован методом lock
, а после окончания работы с общими данными — разблокирован методом unlock
.
Следующий пример показывает простой потокобезопасный контейнер (реализованный на базе std::vector
), имеющий методы add()
для добавления одного элемента и addrange()
для добавления нескольких элементов.
Примечание: и всё же этот контейнер не является полностью потокобезопасным по нескольким причинам, включая использование va_args
. Также, метод dump()
не должен принадлежать контейнеру, а должен быть автономной функцией. Цель этого примера в том, что показать основные концепции использования мьютексов, а не не сделать полноценный, безошибочный, потокобезопасный контейнер.
template <typename T> class container { std::mutex _lock; std::vector<T> _elements; public: void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { _lock.lock(); add(va_arg(arguments, T)); _lock.unlock(); } va_end(arguments); } void dump() { _lock.lock(); for(auto e: _elements) std::cout << e << std::endl; _lock.unlock(); } }; void threadFunction(container<int> &c) { c.addrange(3, rand(), rand(), rand()); } int main() { srand((unsigned int)time(0)); container<int> cntr; std::thread t1(threadFunction, std::ref(cntr)); std::thread t2(threadFunction, std::ref(cntr)); std::thread t3(threadFunction, std::ref(cntr)); t1.join(); t2.join(); t3.join(); cntr.dump(); return 0; }
При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения (вызова unlock
), что невозможно. Здесь и выходит на сцену std::recursive_mutex
, который позволяет получать тот же мьютекс несколько раз. Максимальное количество получения мьютекса не определено, но если это количество будет достигно, то lock
бросит исключение std::system_error. Поэтому, решение проблемы в коде выше (кроме изменения реализации addrange()
, чтобы не вызывались lock
и unlock
), заключается в замене мьютекса на std::recursive_mutex
.
template <typename T> class container { std::recursive_mutex _lock; // ... };
Теперь, результат работы программы будет следующего вида:
6334 18467 41 6334 18467 41 6334 18467 41
Вы, наверное, заметили, что при вызове threadFunction()
, генерируются одни и те же числа. Это происходит потому, что функция void srand (unsigned int seed);
инициализирует seed
только для потока main. В других потоках, генератор псевдо-случайных чисел не инициализируется и получаются каждый раз одни и те же числа.
Явная блокировка и разблокировка могут привести к ошибкам, например, если вы забудете разблокировать поток или, наоборот, будет неправильный порядок блокировок — все это вызовет deadlock. Std предоставляет несколько классов и функций для решения этой проблемы.
Классы «обертки» позволяют непротиворечиво использовать мьютекс в RAII-стиле с автоматической блокировкой и разблокировкой в рамках одного блока. Эти классы:
- lock_guard: когда объект создан, он пытается получить мьютекс (вызывая
lock()
), а когда объект уничтожен, он автоматически освобождает мьютекс (вызываяunlock()
) - unique_lock: в отличие от
lock_guard
, также поддерживает отложенную блокировку, временную блокировку, рекурсивную блокировку и использование условных переменных
С учетом этого, мы можем переписать класс контейнер следующим образом:
template <typename T> class container { std::recursive_mutex _lock; std::vector<T> _elements; public: void add(T element) { std::lock_guard<std::recursive_mutex> locker(_lock); _elements.push_back(element); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { std::lock_guard<std::recursive_mutex> locker(_lock); add(va_arg(arguments, T)); } va_end(arguments); } void dump() { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e: _elements) std::cout << e << std::endl; } };
Можно поспорить насчет того, что метод dump()
должен быть константным, ибо не изменяет состояние контейнера. Попробуйте сделать его таковым и получите ошибку при компиляции:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'
Мьютекс (не зависимо от формы реализации), должен быть получен и освобожден, а это подразумевает использование не константных методов lock()
и unlock()
. Таким образом, аргумент lock_guard
не может быть константой. Решение этой проблемы заключается в том, чтобы сделать мьютекс mutable
, тогда спецификатор const будет игнорироваться и это позволит изменять состояние из константных функций.
template <typename T> class container { mutable std::recursive_mutex _lock; std::vector<T> _elements; public: void dump() const { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e: _elements) std::cout << e << std::endl; } };
Конструкторы классов «оберток» могут принимать параметр, определяющий политику блокировки:
defer_lock
типаdefer_lock_t
: не получать мьютексtry_to_lock
типаtry_to_lock_t
: попытаться получить мьютекс без блокировкиadopt_lock
типаadopt_lock_t
: предполагается, что у вызывающего потока уже есть мьютекс
Объявлены они следующим образом:
struct defer_lock_t { }; struct try_to_lock_t { }; struct adopt_lock_t { }; constexpr std::defer_lock_t defer_lock = std::defer_lock_t(); constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t(); constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
Помимо «оберток» для мьютексов, std
также предоставляет несколько методов для блокировки одного или нескольких мьютексов:
- lock: блокирует мьютекс, используя алгоритм избегания deadlock’ов (используя
lock()
,try_lock()
иunlock()
) - try_lock: пытается блокировать мьютексы в порядке, в котором они были указаны
Вот типичный пример возникновения взаимоблокировки (deadlock): у нас есть некий контейнер с элементами и функция exchange()
, которая меняет местами два элемента разных контейнеров. Для потокобезопасности, функция синхронизирует доступ к этим контейнерам, получая мьютекс, связанный с каждым контейнером.
template <typename T> class container { public: std::mutex _lock; std::set<T> _elements; void add(T element) { _elements.insert(element); } void remove(T element) { _elements.erase(element); } }; void exchange(container<int> &c1, container<int> &c2, int value) { c1._lock.lock(); std::this_thread::sleep_for(std::chrono::seconds(1)); // симулируем deadlock c2._lock.lock(); c1.remove(value); c2.add(value); c1._lock.unlock(); c2._lock.unlock(); }
Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).
int main() { srand((unsigned int)time(NULL)); container<int> cntr1; cntr1.add(1); cntr1.add(2); cntr1.add(3); container<int> cntr2; cntr2.add(4); cntr2.add(5); cntr2.add(6); std::thread t1(exchange, std::ref(cntr1), std::ref(cntr2), 3); std::thread t2(exchange, std::ref(cntr2), std::ref(cntr1), 6); t1.join(); t2.join(); return 0; }
Для решения этой проблемы можно использовать std::lock
, который гарантирует блокировку безопасным (с точки зрения взаимоблокировки) способом:
void exchange(container<int> &c1, container<int> &c2, int value) { std::lock(c1._lock, c2._lock); c1.remove(value); c2.add(value); c1._lock.unlock(); c2._lock.unlock(); }
На этом завершается первая часть данной статьи. Как только будет готова вторая часть (условные переменные) — ссылка на нее будет добавлена.
ссылка на оригинал статьи http://habrahabr.ru/post/182610/
Добавить комментарий