Привет.
Я потратил кучу времени на прочтение статей и книжек про эти указатели, много комплексного текста и мало схемок и примеров, а как по мне, они упрощают в разы понимание.
Буду называть «Hazard pointers» как «Хазарды». Тут я не буду вдаваться в детали и теорию про memory ordering, а буду стараться сформировать «прикладное» понимание вопроса. Эта статья больше как поверхностное введение, например для параграфа про одноименные указатели в «C++ Concurrency in Action» от Энтони Уильямса, нежели что‑то на чем стоит останавливаться слишком надолго.
Перво‑наперво, Хазарды позволяют безопасно освобождать память. Заодно они решают и ABA‑проблему — пока поток держит указатель в своём HP, этот узел никто не удалит и его адрес не переиспользует.
Нужда в первом возникает, когда больше одного потока работают с структурой. Например поток X взял указатель на вершину стэка → X засыпает → поток Y удаляет это вершину → X просыпается и читает значение по удаленному адресу.
Ну а на счет ABA проблемы. Что‑же это такое и когда она возникает?
Сперва стоит понимать, что она появляется в алгоритмах, где используется CAS, например Lock‑free стэк.
Допустим у нас 2 потока X и Y.
-
X: заходит в функцию стэка
-
X: берет указатель на верхний элемент структуры и считывает значение
-
X: Засыпает по воле планировщика
-
Y: выталкивает текущую вершину
-
Y: кладет новый элемент в стэк, но оказалось, что этот новый узел имеет тот же адрес, потому что память от удалённого узла была переиспользована аллокатором
-
X: через CAS проверяет что текущий узел лежит по тому же адресу, успех и возвращает значение, которое уже было удалено
Это гадкая проблема, она приводит к инвалидации структуры, так и к её полному разрушению или даже завершению процесса.
Теперь перейдем к самим Хазардам, как бы нам решить эти проблемы? Перечислим, что нам надо:
-
Понимать, что эту вершину удалять сейчас нельзя
-
Как‑то откладывать удаление и производить его, когда никто не держит эту вершину
Из этого следует, что нам нужен общий менеджер, который курирует всеми хазардами, удаляет ненужные и запоминает те, что еще используются кем‑то.
Этим условиям соответствует данная схема.
В данной архитектуре каждый поток держит у себя hp_ptr(указатель), с которым он работает. Этот указатель могут читать другие потоки. Также имеется личный retired_list, в который он складывает то, что надо будет удалить. Вектор all_records держит указатели на эти хранилища, чтобы каждый поток мог прочитать их. Число потоков может быть любым.
Сформулируем необходимый перечень функционала для HazardManager:
-
Защитить указатель
-
Снять защиту
-
Запомнить указатель для последующего удаления
-
Сбор используемых указателей и удаление ненужных
-
Регистрация новых потоков для all_records
Если вам интересна реализация или вы хотите увидеть пример кода, то идем дальше. Но я бы советовал самостоятельно реализовать для понимания. Все таки решить эту задачу можно разными способами. Учтите, код далее должен быть аккуратно внедрен в алгоритм структуры при использовании.
Я решил использовать Meyers singleton для глобального и локального состояния. На namespace LFA не обращайте внимание, это моя локальная библиотека велосипедов.
LIMIT — число элементов в retired_list после которого поток пытается произвести очистку. TAG — для множественного инстанцирования в разных модулях и участках кода.
template<std::size_t LIMIT, std::size_t TAG>
class HazardPointersManager {
private:
struct ThreadRecord {
std::atomic<void*> hp_ptr{ nullptr };
std::vector<std::pair<void*, void(*)(void*)>> retired_list;
// Макрооптимизация, сюда мы складываем чужие указатели во время scan
std::vector<void*> active_hps_buffer;
};
struct GlobalState {
std::vector<ThreadRecord*> all_records;
LFA::shared_mutex<Backoff::pause> registration_mutex;
};
static GlobalState& get_global() {
static GlobalState state;
return state;
}
static ThreadRecord& get_local_record() {
static thread_local ThreadRecord record;
return record;
}
public:
// Единжды вызываем
static void register_thread() {
auto& record = get_local_record();
auto& state = get_global();
LFA::unique_lock_guard lock(state.registration_mutex);
state.all_records.push_back(&record);
}
...
}
Вроде пока просто, всего лишь обертка для того чтобы выполнялись условия ТЗ и схемки. Тут используется трюк с thread_local, чтобы каждый поток сам для себя создавал ThreadRecord.
Ну и регистрация как видите также делается примитивным образом, под защитой мьютекса.
Далее пойдут функции, учитывайте, что они также внутри класса HazardPointersManager.
static void protect(void* ptr) {
auto& record = get_local_record();
record.hp_ptr.store(ptr, std::memory_order::release);
}
static void unprotect() {
auto& record = get_local_record();
record.hp_ptr.store(nullptr, std::memory_order::release);
}
Как я и говорил, protect, просто атомарная запись, ничего сверхъестественного. УЧТИТЕ: вы должны сами проверять после protect, вы защищаете валидный указатель и он или сама структура не изменились так, что он более нам не подходит.
static void retire(void* ptr, void (*deleter)(void*)) {
auto& vec = get_local_record().retired_list;
vec.push_back({ ptr, deleter });
if (vec.size() >= LIMIT) {
auto& active_hps = get_local_record().active_hps_buffer;
scan(vec, active_hps);
}
}
Функция retire Принимает сам указатель и функцию удаления и кладет их в свой внутренний вектор. Если мы превышаем LIMIT отправляемся сканировать и удалять ненужные.
static void scan(std::vector<std::pair<void*, void(*)(void*)>>& retired_list,
std::vector<void*>& active_hps) {
active_hps.clear();
auto& global_hp_vec = get_global().all_records;
// 1. Собираем все указатели у каждого потока,
// защищая глобальный вектор через shared_mutex.
{
LFA::shared_lock_guard lock(get_global().registration_mutex);
for (auto* t_hp : global_hp_vec) {
void* p = t_hp->hp_ptr.load(std::memory_order_acquire);
if (p) active_hps.push_back(p);
}
}
// 2. Сортируем указатели по возрастанию для последующего бинарного поиска.
std::sort(active_hps.begin(), active_hps.end());
// 3. Если не находим указатель из retired_list в active_hps,
// значит удалять можно спокойно.
auto it = std::remove_if(retired_list.begin(), retired_list.end(), [&](const auto& item) {
if (!std::binary_search(active_hps.begin(), active_hps.end(), item.first)) {
item.second(item.first);
return true;
}
return false;
});
// 4. Очищаем retired_list от удаленных элементов.
retired_list.erase(it, retired_list.end());
}
Ну и финальная функция. Извне получаем retired_list и active_hps — список претендентов на удаление и просто буферный вектор соответственно.
Спасибо за внимание, буду раз вашим замечаниям и пожеланиям.
Автор: cppmage
