Hazard pointers на пальцах

от автора

Привет, Хабр.

Я потратил кучу времени на прочтение статей и книжек про эти указатели, много комплексного текста и мало схемок и примеров, а как по мне, они упрощают в разы понимание.

Буду называть «Hazard pointers» как «Хазарды». Тут я не буду вдаваться в детали и теорию про memory ordering, а буду стараться сформировать «прикладное» понимание вопроса. Эта статья больше как поверхностное введение, например для параграфа про одноименные указатели в «C++ Concurrency in Action» от Энтони Уильямса, нежели что‑то на чем стоит останавливаться слишком надолго.

Перво‑наперво, Хазарды позволяют безопасно освобождать память. Заодно они решают и ABA‑проблему — пока поток держит указатель в своём HP, этот узел никто не удалит и его адрес не переиспользует.

Нужда в первом возникает, когда больше одного потока работают с структурой. Например поток X взял указатель на вершину стэка → X засыпает → поток Y удаляет это вершину → X просыпается и читает значение по удаленному адресу.

Ну а на счет ABA проблемы. Что‑же это такое и когда она возникает?

Сперва стоит понимать, что она появляется в алгоритмах, где используется CAS, например Lock‑free стэк.

Допустим у нас 2 потока X и Y.

  • X: заходит в функцию стэка

  • X: берет указатель на верхний элемент структуры и считывает значение

  • X: Засыпает по воле планировщика

  • Y: выталкивает текущую вершину

  • Y: кладет новый элемент в стэк, но оказалось, что этот новый узел имеет тот же адрес, потому что память от удалённого узла была переиспользована аллокатором

  • X: через CAS проверяет что текущий узел лежит по тому же адресу, успех и возвращает значение, которое уже было удалено

Это гадкая проблема, она приводит к инвалидации структуры, так и к её полному разрушению или даже завершению процесса.

Теперь перейдем к самим Хазардам, как бы нам решить эти проблемы? Перечислим, что нам надо:

  • Понимать, что эту вершину удалять сейчас нельзя

  • Как‑то откладывать удаление и производить его, когда никто не держит эту вершину

Из этого следует, что нам нужен общий менеджер, который курирует всеми хазардами, удаляет ненужные и запоминает те, что еще используются кем‑то.

Этим условиям соответствует данная схема.

Hazard Pointers Architecture

Hazard Pointers Architecture

В данной архитектуре каждый поток держит у себя hp_ptr(указатель), с которым он работает. Этот указатель могут читать другие потоки. Также имеется личный retired_list, в который он складывает то, что надо будет удалить. Вектор all_records держит указатели на эти хранилища, чтобы каждый поток мог прочитать их. Число потоков может быть любым.

Сформулируем необходимый перечень функционала для HazardManager:

  • Защитить указатель

  • Снять защиту

  • Запомнить указатель для последующего удаления

  • Сбор используемых указателей и удаление ненужных

  • Регистрация новых потоков для all_records

Если вам интересна реализация или вы хотите увидеть пример кода, то идем дальше. Но я бы советовал самостоятельно реализовать для понимания. Все таки решить эту задачу можно разными способами. Учтите, код далее должен быть аккуратно внедрен в алгоритм структуры при использовании.

Я решил использовать Meyers singleton для глобального и локального состояния. На namespace LFA не обращайте внимание, это моя локальная библиотека велосипедов.

LIMIT — число элементов в retired_list после которого поток пытается произвести очистку. TAG — для множественного инстанцирования в разных модулях и участках кода.

template<std::size_t LIMIT, std::size_t TAG>class HazardPointersManager {private:  struct ThreadRecord {      std::atomic<void*> hp_ptr{ nullptr };      std::vector<std::pair<void*, void(*)(void*)>> retired_list;      // Макрооптимизация, сюда мы складываем чужие указатели во время scan      std::vector<void*> active_hps_buffer;   };    struct GlobalState {      std::vector<ThreadRecord*> all_records;      LFA::shared_mutex<Backoff::pause> registration_mutex;  };    static GlobalState& get_global() {      static GlobalState state;      return state;  }    static ThreadRecord& get_local_record() {      static thread_local ThreadRecord record;      return record;  }public:    // Единжды вызываем    static void register_thread() {        auto& record = get_local_record();        auto& state = get_global();        LFA::unique_lock_guard lock(state.registration_mutex);        state.all_records.push_back(&record);    }  ...}

Вроде пока просто, всего лишь обертка для того чтобы выполнялись условия ТЗ и схемки. Тут используется трюк с thread_local, чтобы каждый поток сам для себя создавал ThreadRecord.

Ну и регистрация как видите также делается примитивным образом, под защитой мьютекса.

Далее пойдут функции, учитывайте, что они также внутри класса HazardPointersManager.

static void protect(void* ptr) {    auto& record = get_local_record();    record.hp_ptr.store(ptr, std::memory_order::release);}static void unprotect() {    auto& record = get_local_record();    record.hp_ptr.store(nullptr, std::memory_order::release);}

Как я и говорил, protect, просто атомарная запись, ничего сверхъестественного. УЧТИТЕ: вы должны сами проверять после protect, вы защищаете валидный указатель и он или сама структура не изменились так, что он более нам не подходит.

static void retire(void* ptr, void (*deleter)(void*)) {    auto& vec = get_local_record().retired_list;    vec.push_back({ ptr, deleter });    if (vec.size() >= LIMIT) {        auto& active_hps = get_local_record().active_hps_buffer;        scan(vec, active_hps);    }}

Функция retire Принимает сам указатель и функцию удаления и кладет их в свой внутренний вектор. Если мы превышаем LIMIT отправляемся сканировать и удалять ненужные.

static void scan(std::vector<std::pair<void*, void(*)(void*)>>& retired_list,                  std::vector<void*>& active_hps) {    active_hps.clear();    auto& global_hp_vec = get_global().all_records;      // 1. Собираем все указатели у каждого потока,     // защищая глобальный вектор через shared_mutex.    {        LFA::shared_lock_guard lock(get_global().registration_mutex);        for (auto* t_hp : global_hp_vec) {            void* p = t_hp->hp_ptr.load(std::memory_order_acquire);            if (p) active_hps.push_back(p);        }    }      // 2. Сортируем указатели по возрастанию для последующего бинарного поиска.    std::sort(active_hps.begin(), active_hps.end());      // 3. Если не находим указатель из retired_list в active_hps,     // значит удалять можно спокойно.     auto it = std::remove_if(retired_list.begin(), retired_list.end(), [&](const auto& item) {        if (!std::binary_search(active_hps.begin(), active_hps.end(), item.first)) {            item.second(item.first);            return true;         }        return false;         });      // 4. Очищаем retired_list от удаленных элементов.    retired_list.erase(it, retired_list.end());}

Ну и финальная функция. Извне получаем retired_list и active_hps — список претендентов на удаление и просто буферный вектор соответственно.

Полный код

Спасибо за внимание, буду раз вашим замечаниям и пожеланиям.

ссылка на оригинал статьи https://habr.com/ru/articles/1048074/