Привет, друзья! Сегодня рассмотрим, как реализовать алгоритм Raft на Python.
Raft — это алгоритм распределённого консенсуса, который делает три вещи:
-
Выбирает лидера (тот, кто рулит кластером).
-
Реплицирует данные по всем узлам (чтобы не потерять, если что‑то пойдет не так).
-
Гарантирует согласованность данных (никакой битой записи в журнале).
Представьте себе группу людей, пытающихся решить, какую пиццу заказать. Если нет лидера — хаос. Если кто‑то заказал ананасовую, а другие решили, что это была шутка, — еще хуже. Вот Raft и помогает избежать этой катастрофы.
Структура проекта
Определимся, что мы собираемся написать:
-
Node
— сердце системы, представляет один узел кластера. -
Механизмы:
-
Выборы лидера.
-
Репликация журнала.
-
Управление состоянием.
-
-
Дополнительно:
-
Обработка отказов.
-
Оптимизация производительности.
-
Клиентская логика.
-
Тестирование и отказоустойчивость.
-
Узел кластера
Каждый узел в Raft знает:
-
Своё состояние (лидер, кандидат, или — чаще всего — просто унылый
follower
). -
Текущий термин (порядковый номер цикла выборов).
-
Свой журнал (лог операций).
Начнём с базовой структуры:
import random import threading import time from enum import Enum class State(Enum): FOLLOWER = 1 CANDIDATE = 2 LEADER = 3 class Node: def __init__(self, node_id, peers): self.id = node_id self.peers = peers # Список других узлов self.state = State.FOLLOWER self.current_term = 0 self.voted_for = None self.log = [] self.commit_index = -1 self.last_applied = -1 self.next_index = {} self.match_index = {} self.lock = threading.Lock() self.election_timeout = self.reset_election_timeout() self.disabled = False # Инициализация флага отключения self.timer = threading.Thread(target=self.run_election_timer, daemon=True) self.timer.start() def reset_election_timeout(self): return time.time() + random.uniform(5, 10)
Здесь определяем класс Node
с его основными атрибутами и инициализируем таймер выборов для каждого узла.
Таймер выборов
Узлы в Raft знают, что если лидер долго молчит, значит, пора искать нового. Здесь и поможет таймер выборов.
def run_election_timer(self): while True: time.sleep(0.1) with self.lock: if self.disabled: continue if time.time() >= self.election_timeout: print(f"Узел {self.id}: лидер потерян, начинаю выборы.") self.state = State.CANDIDATE self.current_term += 1 self.voted_for = self.id self.election_timeout = self.reset_election_timeout() threading.Thread(target=self.start_election, daemon=True).start()
Этот метод постоянно проверяет, не истёк ли таймаут, и при необходимости инициирует процесс выборов нового лидера.
Выборы
Когда узел становится кандидатом, он рассылает всем остальным запросы на голосование:
def start_election(self): votes = 1 # Голосуем за себя for peer in self.peers: if self.disabled: continue if peer.request_vote(self.current_term, self.id): votes += 1 if votes > len(self.peers) // 2: print(f"Узел {self.id}: выбран лидером!") self.become_leader() else: print(f"Узел {self.id}: выборы провалились.")
Здесь узел собирает голоса от своих собратьев. Если набирается большинство — он становится лидером.
Как голосуют узлы?
Каждый узел отвечает на запросы голосования:
def request_vote(self, term, candidate_id): with self.lock: if self.disabled: return False if term > self.current_term: self.current_term = term self.voted_for = None self.state = State.FOLLOWER if self.voted_for is None or self.voted_for == candidate_id: print(f"Узел {self.id}: голосую за {candidate_id}.") self.voted_for = candidate_id self.election_timeout = self.reset_election_timeout() return True else: print(f"Узел {self.id}: отказал в голосе {candidate_id}.") return False
Этот метод решает, дать ли свой голос кандидату, основываясь на текущем терминe и предыдущих голосах.
Лидерство
Если кандидат получает большинство голосов, он становится лидером:
def become_leader(self): self.state = State.LEADER print(f"Узел {self.id}: я лидер!") for peer in self.peers: self.next_index[peer.id] = len(self.log) self.match_index[peer.id] = -1 threading.Thread(target=self.send_heartbeats, daemon=True).start()
При становлении лидером узел инициализирует индексы для репликации журнала и начинает рассылку сердцебиений.
Сердцебиения
Лидер периодически шлёт всем узлам «сигналы жизни»:
def send_heartbeats(self): while True: with self.lock: if self.state != State.LEADER or self.disabled: break for peer in self.peers: if self.disabled: continue print(f"Лидер {self.id}: отправляю heartbeat узлу {peer.id}.") threading.Thread(target=self.append_entries, args=(peer,), daemon=True).start() time.sleep(1)
Этот цикл обеспечивает поддержание лидерства и синхронизацию журнала с другими узлами.
Репликация журнала
Когда клиент отправляет команду, лидер добавляет её в журнал и синхронизирует с другими узлами:
def client_command(self, command): with self.lock: if self.state != State.LEADER or self.disabled: print(f"Узел {self.id}: я не лидер, перенаправляю запрос.") return False entry = {'term': self.current_term, 'command': command} self.log.append(entry) print(f"Лидер {self.id}: добавляю команду {command} в журнал.") threading.Thread(target=self.replicate_log, daemon=True).start() return True
Здесь лидер обрабатывает команду клиента, добавляя её в свой журнал и инициируя процесс репликации на других узлах.
Репликация на другие узлы
def replicate_log(self): while True: with self.lock: if self.disabled: return replicated = 1 # Лидер уже имеет запись for peer in self.peers: if self.match_index.get(peer.id, -1) >= len(self.log) - 1: replicated += 1 if replicated > len(self.peers) // 2: self.commit_index = len(self.log) - 1 self.apply_log() break time.sleep(0.1)
Этот метод гарантирует, что запись будет реплицирована на большинстве узлов перед её применением.
Применение журнала к состоянию
Метод apply_log
применяет подтверждённые записи к состоянию узла:
def apply_log(self): with self.lock: while self.last_applied < self.commit_index: self.last_applied += 1 entry = self.log[self.last_applied] # Здесь мы применяем команду к состоянию print(f"Узел {self.id} применил команду: {entry['command']}")
Последовательно применяем команды из журнала к состоянию узла.
Обработка AppendEntries
Узлы должны уметь принимать записи от лидера:
def append_entries(self, peer): with self.lock: if self.disabled: return prev_log_index = self.next_index.get(peer.id, 0) - 1 prev_log_term = self.log[prev_log_index]['term'] if prev_log_index >= 0 and prev_log_index < len(self.log) else -1 entries = self.log[self.next_index.get(peer.id, 0):] term = self.current_term success = peer.receive_append_entries( term=term, leader_id=self.id, prev_log_index=prev_log_index, prev_log_term=prev_log_term, entries=entries, leader_commit=self.commit_index ) with self.lock: if self.disabled: return if success: self.match_index[peer.id] = self.next_index.get(peer.id, 0) + len(entries) - 1 self.next_index[peer.id] = self.match_index[peer.id] + 1 else: self.next_index[peer.id] = max(0, self.next_index.get(peer.id, 0) - 1)
Этот метод отправляет записи журнала другим узлам и обновляет индексы в зависимости от ответа.
А теперь реализуем обработку входящих AppendEntries
def receive_append_entries(self, term, leader_id, prev_log_index, prev_log_term, entries, leader_commit): with self.lock: if self.disabled: return False if term < self.current_term: return False self.state = State.FOLLOWER self.current_term = term self.election_timeout = self.reset_election_timeout() if prev_log_index >= 0: if len(self.log) <= prev_log_index or self.log[prev_log_index]['term'] != prev_log_term: return False # Добавляем новые записи, если их ещё нет for entry in entries: if len(self.log) > prev_log_index + 1: if self.log[prev_log_index + 1]['term'] != entry['term']: self.log = self.log[:prev_log_index + 1] self.log.append(entry) else: self.log.append(entry) if leader_commit > self.commit_index: self.commit_index = min(leader_commit, len(self.log) - 1) threading.Thread(target=self.apply_log, daemon=True).start() return True
Этот метод будет проверять согласованность журнала и обновлять его, если все в порядке.
Симуляция отказов
Чтобы протестировать отказоустойчивость, добавим возможность отключать узлы:
def disable(self): with self.lock: self.disabled = True print(f"Узел {self.id} отключен.") def enable(self): with self.lock: self.disabled = False self.election_timeout = self.reset_election_timeout() print(f"Узел {self.id} включен.")
И изменим методы отправки сообщений, чтобы учитывать состояние узла:
def request_vote(self, term, candidate_id): if getattr(self, 'disabled', False): return False # Остальной код... def receive_append_entries(self, *args, **kwargs): if getattr(self, 'disabled', False): return False # Остальной код...
Теперь можно симулировать отключение узлов и проверять, как кластер реагирует на это.
Протестируем кластер
Создадим несколько узлов и запустим их:
# main.py from node import Node, State import time if __name__ == "__main__": nodes = [] # Создаем все узлы сначала с пустыми peers for i in range(5): node = Node(node_id=i, peers=[]) nodes.append(node) # Теперь устанавливаем peers для каждого узла for node in nodes: node.peers = [peer for peer in nodes if peer.id != node.id] leader = None # Ждем, пока лидер не будет выбран while not leader: for node in nodes: if node.state == State.LEADER: leader = node break time.sleep(0.5) print(f"Лидер выбран: Узел {leader.id}") leader.client_command("Сохранить данные") # Отключаем лидера leader.disable() print(f"Узел {leader.id} отключен") time.sleep(15) # Проверяем, выбран ли новый лидер new_leader = None for node in nodes: if node.state == State.LEADER and not getattr(node, 'disabled', False): new_leader = node break if new_leader: print(f"Новый лидер: Узел {new_leader.id}") new_leader.client_command("Новая команда") else: print("Не удалось выбрать нового лидера.")
Скрипт создаст пять узлов, инициирует выборы лидера, отправит команды и симулирует отказ лидера. Вот что можно ожидать в консоли:
# main.py from node import Node, State import time if __name__ == "__main__": nodes = [] # Создаем все узлы сначала с пустыми peers for i in range(5): node = Node(node_id=i, peers=[]) nodes.append(node) # Теперь устанавливаем peers для каждого узла for node in nodes: node.peers = [peer for peer in nodes if peer.id != node.id] leader = None # Ждем, пока лидер не будет выбран while not leader: for node in nodes: if node.state == State.LEADER: leader = node break time.sleep(0.5) print(f"Лидер выбран: Узел {leader.id}") leader.client_command("Сохранить данные") # Отключаем лидера leader.disable() print(f"Узел {leader.id} отключен") time.sleep(15) # Проверяем, выбран ли новый лидер new_leader = None for node in nodes: if node.state == State.LEADER and not getattr(node, 'disabled', False): new_leader = node break if new_leader: print(f"Новый лидер: Узел {new_leader.id}") new_leader.client_command("Новая команда") else: print("Не удалось выбрать нового лидера.")
Вывод получится такой:
Узел 0: лидер потерян, начинаю выборы. Узел 0: голосую за 0. Узел 1: голосую за 0. Узел 2: голосую за 0. Узел 3: голосую за 0. Узел 4: голосую за 0. Узел 0: выбран лидером! Узел 0: я лидер! Лидер 0: отправляю heartbeat узлу 1. Лидер 0: отправляю heartbeat узлу 2. Лидер 0: отправляю heartbeat узлу 3. Лидер 0: отправляю heartbeat узлу 4. Узел 0: добавляю команду Сохранить данные в журнал. Лидер 0: отправляю heartbeat узлу 1. Лидер 0: отправляю heartbeat узлу 2. Лидер 0: отправляю heartbeat узлу 3. Лидер 0: отправляю heartbeat узлу 4. Узел 0 отключен Узел 1: лидер потерян, начинаю выборы. Узел 1: голосую за 1. Узел 2: голосую за 1. Узел 3: голосую за 1. Узел 4: голосую за 1. Узел 1: выбран лидером! Узел 1: я лидер! Лидер 1: отправляю heartbeat узлу 0. Лидер 1: отправляю heartbeat узлу 2. Лидер 1: отправляю heartbeat узлу 3. Лидер 1: отправляю heartbeat узлу 4. Новый лидер: Узел 1 Узел 1: добавляю команду Новая команда в журнал. Лидер 1: отправляю heartbeat узлу 0. Лидер 1: отправляю heartbeat узлу 2. Лидер 1: отправляю heartbeat узлу 3. Лидер 1: отправляю heartbeat узлу 4. Узел 1: применил команду: Новая команда
В начале узлы начинают выборы лидера и узел 0 становится первым лидером после получения большинства голосов. Затем лидер 0 начинает отправлять сердцебиения и реплицировать команду «Сохранить данные». Когда мы отключаем лидера 0, узел 1 обнаруживает его отсутствие, инициирует новые выборы и становится новым лидером, после чего успешно обрабатывает и реплицирует команду «Новая команда».
Но тут сделаю замечание, что из‑за асинхронности и случайных таймаутов порядок событий может меняться при каждом запуске.
Если захотите прикрутить Raft в реальный проект, то нужно будет добавить обработку сетевых сбоев, механизм «догонки» отставших узлов, и проверки корректности журналов. Улучшите производительность, перейдя на асинхронное программирование, минимизируйте блокировки и оптимизируйте передачу данных. Реализуйте обратную связь для клиентов, чтобы они получали подтверждения, и протестируйте систему в сложных сценариях отказов.
Всех желающих приглашаю на открытые уроки по архитектуре высоких нагрузок:
9 декабря: «Обеспечение отказоустойчивости хранилищ» — Вы узнаете, как правильно проектировать и настраивать хранилища, чтобы минимизировать простои и предотвратить потерю данных при сбоях. Записаться
16 декабря: «Распределённые транзакции — как добиться согласования данных в распределённой сети». Записаться
ссылка на оригинал статьи https://habr.com/ru/articles/862550/
Добавить комментарий