Привет, Хабр.
Я потратил кучу времени на прочтение статей и книжек про эти указатели, много комплексного текста и мало схемок и примеров, а как по мне, они упрощают в разы понимание.
Буду называть «Hazard pointers» как «Хазарды». Тут я не буду вдаваться в детали и теорию про memory ordering, а буду стараться сформировать «прикладное» понимание вопроса. Эта статья больше как поверхностное введение, например для параграфа про одноименные указатели в «C++ Concurrency in Action» от Энтони Уильямса, нежели что‑то на чем стоит останавливаться слишком надолго.
Перво‑наперво, Хазарды позволяют безопасно освобождать память. Заодно они решают и ABA‑проблему — пока поток держит указатель в своём HP, этот узел никто не удалит и его адрес не переиспользует.
Нужда в первом возникает, когда больше одного потока работают с структурой. Например поток X взял указатель на вершину стэка → X засыпает → поток Y удаляет это вершину → X просыпается и читает значение по удаленному адресу.
Ну а на счет ABA проблемы. Что‑же это такое и когда она возникает?
Сперва стоит понимать, что она появляется в алгоритмах, где используется CAS, например Lock‑free стэк.
Допустим у нас 2 потока X и Y.
-
X: заходит в функцию стэка
-
X: берет указатель на верхний элемент структуры и считывает значение
-
X: Засыпает по воле планировщика
-
Y: выталкивает текущую вершину
-
Y: кладет новый элемент в стэк, но оказалось, что этот новый узел имеет тот же адрес, потому что память от удалённого узла была переиспользована аллокатором
-
X: через CAS проверяет что текущий узел лежит по тому же адресу, успех и возвращает значение, которое уже было удалено
Это гадкая проблема, она приводит к инвалидации структуры, так и к её полному разрушению или даже завершению процесса.
Теперь перейдем к самим Хазардам, как бы нам решить эти проблемы? Перечислим, что нам надо:
-
Понимать, что эту вершину удалять сейчас нельзя
-
Как‑то откладывать удаление и производить его, когда никто не держит эту вершину
Из этого следует, что нам нужен общий менеджер, который курирует всеми хазардами, удаляет ненужные и запоминает те, что еще используются кем‑то.
Этим условиям соответствует данная схема.
В данной архитектуре каждый поток держит у себя hp_ptr(указатель), с которым он работает. Этот указатель могут читать другие потоки. Также имеется личный retired_list, в который он складывает то, что надо будет удалить. Вектор all_records держит указатели на эти хранилища, чтобы каждый поток мог прочитать их. Число потоков может быть любым.
Сформулируем необходимый перечень функционала для HazardManager:
-
Защитить указатель
-
Снять защиту
-
Запомнить указатель для последующего удаления
-
Сбор используемых указателей и удаление ненужных
-
Регистрация новых потоков для all_records
Если вам интересна реализация или вы хотите увидеть пример кода, то идем дальше. Но я бы советовал самостоятельно реализовать для понимания. Все таки решить эту задачу можно разными способами. Учтите, код далее должен быть аккуратно внедрен в алгоритм структуры при использовании.
Я решил использовать Meyers singleton для глобального и локального состояния. На namespace LFA не обращайте внимание, это моя локальная библиотека велосипедов.
LIMIT — число элементов в retired_list после которого поток пытается произвести очистку. TAG — для множественного инстанцирования в разных модулях и участках кода.
template<std::size_t LIMIT, std::size_t TAG>class HazardPointersManager {private: struct ThreadRecord { std::atomic<void*> hp_ptr{ nullptr }; std::vector<std::pair<void*, void(*)(void*)>> retired_list; // Макрооптимизация, сюда мы складываем чужие указатели во время scan std::vector<void*> active_hps_buffer; }; struct GlobalState { std::vector<ThreadRecord*> all_records; LFA::shared_mutex<Backoff::pause> registration_mutex; }; static GlobalState& get_global() { static GlobalState state; return state; } static ThreadRecord& get_local_record() { static thread_local ThreadRecord record; return record; }public: // Единжды вызываем static void register_thread() { auto& record = get_local_record(); auto& state = get_global(); LFA::unique_lock_guard lock(state.registration_mutex); state.all_records.push_back(&record); } ...}
Вроде пока просто, всего лишь обертка для того чтобы выполнялись условия ТЗ и схемки. Тут используется трюк с thread_local, чтобы каждый поток сам для себя создавал ThreadRecord.
Ну и регистрация как видите также делается примитивным образом, под защитой мьютекса.
Далее пойдут функции, учитывайте, что они также внутри класса HazardPointersManager.
static void protect(void* ptr) { auto& record = get_local_record(); record.hp_ptr.store(ptr, std::memory_order::release);}static void unprotect() { auto& record = get_local_record(); record.hp_ptr.store(nullptr, std::memory_order::release);}
Как я и говорил, protect, просто атомарная запись, ничего сверхъестественного. УЧТИТЕ: вы должны сами проверять после protect, вы защищаете валидный указатель и он или сама структура не изменились так, что он более нам не подходит.
static void retire(void* ptr, void (*deleter)(void*)) { auto& vec = get_local_record().retired_list; vec.push_back({ ptr, deleter }); if (vec.size() >= LIMIT) { auto& active_hps = get_local_record().active_hps_buffer; scan(vec, active_hps); }}
Функция retire Принимает сам указатель и функцию удаления и кладет их в свой внутренний вектор. Если мы превышаем LIMIT отправляемся сканировать и удалять ненужные.
static void scan(std::vector<std::pair<void*, void(*)(void*)>>& retired_list, std::vector<void*>& active_hps) { active_hps.clear(); auto& global_hp_vec = get_global().all_records; // 1. Собираем все указатели у каждого потока, // защищая глобальный вектор через shared_mutex. { LFA::shared_lock_guard lock(get_global().registration_mutex); for (auto* t_hp : global_hp_vec) { void* p = t_hp->hp_ptr.load(std::memory_order_acquire); if (p) active_hps.push_back(p); } } // 2. Сортируем указатели по возрастанию для последующего бинарного поиска. std::sort(active_hps.begin(), active_hps.end()); // 3. Если не находим указатель из retired_list в active_hps, // значит удалять можно спокойно. auto it = std::remove_if(retired_list.begin(), retired_list.end(), [&](const auto& item) { if (!std::binary_search(active_hps.begin(), active_hps.end(), item.first)) { item.second(item.first); return true; } return false; }); // 4. Очищаем retired_list от удаленных элементов. retired_list.erase(it, retired_list.end());}
Ну и финальная функция. Извне получаем retired_list и active_hps — список претендентов на удаление и просто буферный вектор соответственно.
Спасибо за внимание, буду раз вашим замечаниям и пожеланиям.
ссылка на оригинал статьи https://habr.com/ru/articles/1048074/