Дедлоки — распространенная проблема в многопоточном программировании. В больших приложениях вручную отслеживать порядок блокировок может быть достаточно сложно, причем эта проблема может не всплыть на этапе тестирования и случиться только в каких-то сложновоспроизводимых кейсах при реальном использовании. Существует много способов их избегания, но здесь мы рассмотрим только один — автоматическое выявление дедлоков на основе графа ожидания.
Эта статья — продолжение серии про многопоточное программирование на Python. Предыдущая была про хранение настроек в многопоточном приложении, но читать её для понимания этой не обязательно. Вся серия предназначена для программистов, знакомых с базовыми концепциями многопоточного программирования.
Примеры кода, которые вы здесь увидите, взяты из библиотеки locklib.
Как можно обнаружить дедлок?
Вообще говоря, существует два основных подхода к обнаружению дедлоков в программе:
-
На этапе статического анализа исходного кода или компиляции.
-
В рантайме.
К сожалению, статический анализ работает только в каких-то строго определённых случаях. Общего решения здесь не существует, поскольку оно было бы эквивалентно решению проблемы останова, а это считается невозможным. Кроме того, даже существующие способы статического обнаружения гораздо лучше работают в статически типизированных языках (например, в Go или C++). Для Python ни одного вменяемого решения при подготовке статьи я не нашёл.
Все способы обнаружения дедлоков в рантайме так или иначе усложняют или замедляют программу. Чем точнее способ, тем больше создаваемый им оверхед. К примеру, в том же Go используется очень неточный, но потому и малозатратный способ обнаружения дедлоков. Там дедлок будет обнаружен только если в программе не останется вообще ни одного «живого» потока. С другой стороны, способы на основе графа ожидания гораздо дороже, но могут обнаруживать любые дедлоки.
Что такое граф ожидания?
Обычно под капотом у мьютекса есть очередь, в которую за ним выстраиваются потоки. Когда один поток освобождает мьютекс, следующий в очереди разблокируется и делает свои дела. Таким образом, каждый из ожидающих потоков всегда либо ожидает какой-то конкретный другой поток, либо (если он первый в очереди) приступает к своим делам сразу. Получается, что мы можем построить граф ожидающих друг друга потоков, где нодами будут потоки, а рёбрами — объекты мьютексов. Дедлок будет выглядеть как цикл на таком графе.
Если мы хотим избежать дедлоков, нам нужно перед каждой операцией взятия лока проверять граф на наличие циклов обходом в глубину и в случае обнаружения — блокировку не давать. Альтернативный вариант (но здесь мы на нём подробно останавливаться не будем) заключается в том, чтобы блокировку дать сразу, но с таймаутом — и идти в граф только в случае его превышения.
Графы ожидания сегодня широко используются, к примеру, в СУБД (PostgreSQL таким образом ищет дедлоки в транзакциях, а в MySQL так делалось в старых версиях, сейчас же применяется похожий, но несколько модифицированный алгоритм), однако этот метод не лишён недостатков. Главный из них состоит в том, что получение каждой блокировки становится не просто дороже (поскольку включает в себя дополнительные действия), но и требует, по крайней мере на некоторое время, взять глобальную блокировку, которой, в свою очередь, защищён сам граф ожидания. Это приводит к увеличению доли последовательного кода в программе и, согласно закону Амдала, бьёт по выгоде от параллелизации вычислений вообще.
Приступим
Наша задача — написать нечто, что для пользователя будет работать примерно как обычный Lock из стандартной библиотеки (правда, без поддержки опциональных аргументов в данной конкретной реализации, то есть, например, без возможности указать продолжительность таймаута), но чтобы при попытке взять блокировку, приводящую к дедлоку, поднималось исключение. Внутри этой штуки, очевидно, где-то должен быть граф — и за него мы возьмёмся в первую очередь.
«Верхушка» класса, объектом которого является граф, выглядит так:
from threading import Lock from collections import defaultdict from locklib.errors import DeadLockError # Исключение, которое должно подниматься при обнаружении дедлока. class LocksGraph: def __init__(self): self.links = defaultdict(set) self.lock = Lock()
Как видно, для защиты доступа к графу используется обычный Lock
. Это необходимо, поскольку иначе разные потоки смогут редактировать его параллельно — и в отдельные моменты времени «посередине» он может оказаться неконсистентным. Кроме того, для хранения связей в графе мы используем словарь, в котором ключи — идентификаторы потоков, а значения — множества, заполненные теми же самыми идентификаторами.
По сути, наш граф должен поддерживать только две базовые операции:
-
Создать связь между двумя потоками. При этом нужно проверить, не образуется ли от этого цикл, — и, если да, поднять
DeadLockError
. -
Удалить её.
Добавление связи выглядит так:
def add_link(self, _from, to): cycle_with = self.search_cycles(to, _from) if cycle_with is not None: cycle_with = ', '.join([str(x) for x in cycle_with]) raise DeadLockError(f'A cycle between {_from}th and {to}th threads has been detected. The full path of the cycle: {cycle_with}.') self.links[_from].add(to)
А удаление — так:
def delete_link(self, _from, to): if _from in self.links: if to in self.links[_from]: self.links[_from].remove(to) if not self.links[_from]: del self.links[_from]
Мы могли бы здесь не делать лишних проверок — тогда методы выглядели бы ещё проще.
Ну и центральная часть — обход графа для обнаружения циклов — это всего лишь три коротких метода:
def get_links_from(self, _from): return self.links[_from] def dfs(self, path, current_node, target): path.append(current_node) neighbors = self.get_links_from(current_node) if neighbors: for link in neighbors: if link == target: path.append(target) return path result_of_next_search = self.dfs(path, link, target) if result_of_next_search is not None: return result_of_next_search path.pop() def search_cycles(self, _from, to): return self.dfs([], _from, to)
Входной точкой здесь служит метод search_cycles()
, который по итогу возвращает None
, если цикл не найден, либо список идентификаторов потоков, его образующих, если таки да. Список потом используется для составления человекочитаемого описания в поднимаемом исключении.
Итак, как видим, в графе нет ничего сложного. Из важного хотелось бы отметить, что внутри методов графа нигде не используется прикреплённая к нему блокировка. На самом деле она предназначена для использования в вызывающем коде, а не в самом графе. Собственно, к вызывающему коду мы и переходим.
«Верхушка» класса лока выглядит так:
from threading import Lock, get_native_id from collections import deque from locklib.locks.smart_lock.graph import LocksGraph graph = LocksGraph() class SmartLock: def __init__(self, local_graph=graph): self.graph = local_graph self.lock = Lock() self.deque = deque() self.local_locks = {}
Как видим, здесь используется глобальная переменная — экземпляр графа, который мы описали выше. Также мы видим экземпляр обычного лока — ведь наш является, по сути, обёрткой вокруг него.
self.deque
и self.local_locks
используются, как мы увидим дальше, для воспроизведения семантики обычного лока, внутри которого находится очередь ожидающих тредов. В словарь self.local_locks
каждый поток кладёт экземпляр лока под ключом в виде собственного идентификатора. В очереди self.deque
последовательно лежат идентификаторы потоков, которые попытались взять данную блокировку. Каждый новый поток добавляется в очередь, кладёт в словарь свой лок и берёт лок предыдущего потока. Получается что-то вроде этого:
Объект лока предлагает, по сути, два возможных действия: взять лок и отпустить его. В данном случае они представлены методами acquire()
и release()
.
Начнём с acquire()
. Алгоритм действий здесь следующий:
1. Взять лок внутри нашего объекта лока. 2. Взять лок на граф. 3. Если данный поток берёт эту блокировку первым (то есть он первый в очереди): 3.1. Положить id текущего потока в очередь. 3.2. Используя id потока в качестве ключа, положить в словарь self.local_locks объект обычного лока и сразу взять его. 4. Если очередь ожидающих потоков не пуста (то есть данный поток не первый, кто взял этот лок): 4.1. Создать связь в графе между идентификатором текущего потока и идентификатором последнего потока в очереди. 4.2. Положить id текущего потока в очередь. 4.3. Используя id потока в качестве ключа, положить в словарь self.local_locks объект обычного лока и сразу взять его. 4.4. Взять из очереди id предыдущего потока и по нему извлечь из словаря объект блокировки, который тот туда положил. 5. Отпустить лок на граф. 6. Отпустить лок внутри нашего лока. 7. Если очередь потоков была не пуста: 7.1. Взять блокировку, оставленную предыдущим потоком, которую мы успели извлечь из словаря на шаге 4.4.
В коде же это выглядит так:
def acquire(self): id = get_native_id() previous_element_lock = None with self.lock: with self.graph.lock: if not self.deque: self.deque.appendleft(id) self.local_locks[id] = Lock() self.local_locks[id].acquire() else: previous_element = self.deque[0] self.graph.add_link(id, previous_element) self.deque.appendleft(id) self.local_locks[id] = Lock() self.local_locks[id].acquire() previous_element_lock = self.local_locks[previous_element] if previous_element_lock is not None: previous_element_lock.acquire()
Теперь release()
. Начнём снова с алгоритма:
1, 2. Взять локи внутри нашего объекта лока и внутри графа. 3. Удалить id текущего потока из очереди. 4. Достать из словаря self.local_locks объект лока по id текущего потока и сохранить его в переменную. 5. Удалить из словаря self.local_locks ключ, соответствующий id текущего потока. 6. Если за текущим потоком в очереди есть другие потоки: 6.1. Получить из очереди id потока, который попытался взять блокировку следующим. 6.2. Удалить из графа связь между текущим потоком и следующим. 7. Отпустить лок, извлечённый из словаря на шаге 4. 8, 9. Отпустить лок на граф и лок на лок.
И снова код:
def release(self): id = get_native_id() with self.lock: with self.graph.lock: if id not in self.local_locks: raise RuntimeError('Release unlocked lock.') self.deque.pop() lock = self.local_locks[id] del self.local_locks[id] if len(self.deque) != 0: next_element = self.deque[-1] self.graph.delete_link(next_element, id) lock.release()
Важным отличием от стандартного Lock()
здесь является то, что отпустить наш лок может только взявший его поток, в то время как стандартный Lock()
может отпустить и любой другой. Иначе пришлось бы сильно усложнить логику, связанную с перестройкой очереди и словаря с локами, для поддержания консистентности внутреннего состояния.
Что с производительностью?
Очевидно, что, поскольку наш лок является обёрткой над примитивом из стандартной библиотеки, добавляющей обязательные действия к каждой операции взятия/отпускания, он гарантированно будет медленнее. Вопрос только в том, насколько.
За основу возьмём скорость работы стандартного Lock
, которую замерим с помощью следующего кода:
from time import perf_counter from threading import Lock lock = Lock() t1 = perf_counter() for _ in range(100000000): lock.acquire() lock.release() print(perf_counter() - t1)
На моём компьютере (MacBook Air на M1) при первом запуске этого кода вышло 18,812158417 секунды на 100 000 000 итераций, то есть примерно 5 315 711 итераций в секунду.
Теперь протестируем наш лок (код отличается только во второй строчке):
from time import perf_counter from locklib import SmartLock as Lock lock = Lock() t1 = perf_counter() for _ in range(100000000): lock.acquire() lock.release() print(perf_counter() - t1)
У меня при первом запуске вышло 216,22566979200002 секунды, то есть примерно 462 479 итераций в секунду. Это чуть более, чем в десять раз медленнее.
Я протестировал время работы в вырожденном случае, когда один поток постоянно берёт и отпускает блокировку, однако в реальности оно может варьироваться в широких пределах, в зависимости от размера и конкретной топологии графа ожидания.
Стоит оно того или нет — решать вам. Если время взятия/отпускания лока ничтожно по сравнению с прочими операциями, выполняемыми внутри потока, это может быть весьма выгодной сделкой, ведь взамен вы получаете безопасность вашего кода.
А это можно скачать бесплатно из интернета?
Библиотека доступна в PyPI:
$ pip install locklib
Пробуем импортировать из нее лок и спровоцировать состояние гонки:
from threading import Thread from locklib import SmartLock lock_1 = SmartLock() lock_2 = SmartLock() def function_1(): while True: with lock_1: with lock_2: pass def function_2(): while True: with lock_2: with lock_1: pass thread_1 = Thread(target=function_1) thread_2 = Thread(target=function_2) thread_1.start() thread_2.start()
Получаем исключение!
... locklib.errors.DeadLockError: A cycle between 1970256th and 1970257th threads has been detected.
Итак, мы разобрались:
-
Какие вообще существуют способы обнаружения дедлоков и что такое граф ожидания.
-
Как реализовать граф ожидания на Python.
-
Как он влияет на производительность в сравнении с обычной блокировкой.
-
Где можно скачать библиотеку с приведённым в статье кодом.
Теперь, когда у вас появилось новое знание, помните, что знание — сила, а большая сила — это большая ответственность.
ссылка на оригинал статьи https://habr.com/ru/company/ozontech/blog/671620/
Добавить комментарий