Как написать Raft на чистом Python: основы

от автора

Привет, друзья! Сегодня рассмотрим, как реализовать алгоритм Raft на Python.

Raft — это алгоритм распределённого консенсуса, который делает три вещи:

  1. Выбирает лидера (тот, кто рулит кластером).

  2. Реплицирует данные по всем узлам (чтобы не потерять, если что‑то пойдет не так).

  3. Гарантирует согласованность данных (никакой битой записи в журнале).

Представьте себе группу людей, пытающихся решить, какую пиццу заказать. Если нет лидера — хаос. Если кто‑то заказал ананасовую, а другие решили, что это была шутка, — еще хуже. Вот Raft и помогает избежать этой катастрофы.

Структура проекта

Определимся, что мы собираемся написать:

  • Node — сердце системы, представляет один узел кластера.

  • Механизмы:

    • Выборы лидера.

    • Репликация журнала.

    • Управление состоянием.

  • Дополнительно:

    • Обработка отказов.

    • Оптимизация производительности.

    • Клиентская логика.

    • Тестирование и отказоустойчивость.

Узел кластера

Каждый узел в Raft знает:

  • Своё состояние (лидер, кандидат, или — чаще всего — просто унылый follower).

  • Текущий термин (порядковый номер цикла выборов).

  • Свой журнал (лог операций).

Начнём с базовой структуры:

import random import threading import time from enum import Enum  class State(Enum):     FOLLOWER = 1     CANDIDATE = 2     LEADER = 3  class Node:     def __init__(self, node_id, peers):         self.id = node_id         self.peers = peers  # Список других узлов         self.state = State.FOLLOWER         self.current_term = 0         self.voted_for = None         self.log = []         self.commit_index = -1         self.last_applied = -1         self.next_index = {}         self.match_index = {}         self.lock = threading.Lock()         self.election_timeout = self.reset_election_timeout()         self.disabled = False  # Инициализация флага отключения         self.timer = threading.Thread(target=self.run_election_timer, daemon=True)         self.timer.start()      def reset_election_timeout(self):         return time.time() + random.uniform(5, 10)

Здесь определяем класс Node с его основными атрибутами и инициализируем таймер выборов для каждого узла.

Таймер выборов

Узлы в Raft знают, что если лидер долго молчит, значит, пора искать нового. Здесь и поможет таймер выборов.

def run_election_timer(self):     while True:         time.sleep(0.1)         with self.lock:             if self.disabled:                 continue             if time.time() >= self.election_timeout:                 print(f"Узел {self.id}: лидер потерян, начинаю выборы.")                 self.state = State.CANDIDATE                 self.current_term += 1                 self.voted_for = self.id                 self.election_timeout = self.reset_election_timeout()                 threading.Thread(target=self.start_election, daemon=True).start()

Этот метод постоянно проверяет, не истёк ли таймаут, и при необходимости инициирует процесс выборов нового лидера.

Выборы

Когда узел становится кандидатом, он рассылает всем остальным запросы на голосование:

def start_election(self):     votes = 1  # Голосуем за себя     for peer in self.peers:         if self.disabled:             continue         if peer.request_vote(self.current_term, self.id):             votes += 1     if votes > len(self.peers) // 2:         print(f"Узел {self.id}: выбран лидером!")         self.become_leader()     else:         print(f"Узел {self.id}: выборы провалились.")

Здесь узел собирает голоса от своих собратьев. Если набирается большинство — он становится лидером.

Как голосуют узлы?

Каждый узел отвечает на запросы голосования:

def request_vote(self, term, candidate_id):     with self.lock:         if self.disabled:             return False         if term > self.current_term:             self.current_term = term             self.voted_for = None             self.state = State.FOLLOWER         if self.voted_for is None or self.voted_for == candidate_id:             print(f"Узел {self.id}: голосую за {candidate_id}.")             self.voted_for = candidate_id             self.election_timeout = self.reset_election_timeout()             return True         else:             print(f"Узел {self.id}: отказал в голосе {candidate_id}.")             return False

Этот метод решает, дать ли свой голос кандидату, основываясь на текущем терминe и предыдущих голосах.

Лидерство

Если кандидат получает большинство голосов, он становится лидером:

def become_leader(self):     self.state = State.LEADER     print(f"Узел {self.id}: я лидер!")     for peer in self.peers:         self.next_index[peer.id] = len(self.log)         self.match_index[peer.id] = -1     threading.Thread(target=self.send_heartbeats, daemon=True).start()

При становлении лидером узел инициализирует индексы для репликации журнала и начинает рассылку сердцебиений.

Сердцебиения

Лидер периодически шлёт всем узлам «сигналы жизни»:

def send_heartbeats(self):     while True:         with self.lock:             if self.state != State.LEADER or self.disabled:                 break         for peer in self.peers:             if self.disabled:                 continue             print(f"Лидер {self.id}: отправляю heartbeat узлу {peer.id}.")             threading.Thread(target=self.append_entries, args=(peer,), daemon=True).start()         time.sleep(1)

Этот цикл обеспечивает поддержание лидерства и синхронизацию журнала с другими узлами.

Репликация журнала

Когда клиент отправляет команду, лидер добавляет её в журнал и синхронизирует с другими узлами:

def client_command(self, command):     with self.lock:         if self.state != State.LEADER or self.disabled:             print(f"Узел {self.id}: я не лидер, перенаправляю запрос.")             return False         entry = {'term': self.current_term, 'command': command}         self.log.append(entry)         print(f"Лидер {self.id}: добавляю команду {command} в журнал.")         threading.Thread(target=self.replicate_log, daemon=True).start()         return True

Здесь лидер обрабатывает команду клиента, добавляя её в свой журнал и инициируя процесс репликации на других узлах.

Репликация на другие узлы

def replicate_log(self):     while True:         with self.lock:             if self.disabled:                 return             replicated = 1  # Лидер уже имеет запись             for peer in self.peers:                 if self.match_index.get(peer.id, -1) >= len(self.log) - 1:                     replicated += 1             if replicated > len(self.peers) // 2:                 self.commit_index = len(self.log) - 1                 self.apply_log()                 break         time.sleep(0.1)

Этот метод гарантирует, что запись будет реплицирована на большинстве узлов перед её применением.

Применение журнала к состоянию

Метод apply_log применяет подтверждённые записи к состоянию узла:

def apply_log(self):     with self.lock:         while self.last_applied < self.commit_index:             self.last_applied += 1             entry = self.log[self.last_applied]             # Здесь мы применяем команду к состоянию             print(f"Узел {self.id} применил команду: {entry['command']}")

Последовательно применяем команды из журнала к состоянию узла.

Обработка AppendEntries

Узлы должны уметь принимать записи от лидера:

def append_entries(self, peer):     with self.lock:         if self.disabled:             return         prev_log_index = self.next_index.get(peer.id, 0) - 1         prev_log_term = self.log[prev_log_index]['term'] if prev_log_index >= 0 and prev_log_index < len(self.log) else -1         entries = self.log[self.next_index.get(peer.id, 0):]         term = self.current_term     success = peer.receive_append_entries(         term=term,         leader_id=self.id,         prev_log_index=prev_log_index,         prev_log_term=prev_log_term,         entries=entries,         leader_commit=self.commit_index     )     with self.lock:         if self.disabled:             return         if success:             self.match_index[peer.id] = self.next_index.get(peer.id, 0) + len(entries) - 1             self.next_index[peer.id] = self.match_index[peer.id] + 1         else:             self.next_index[peer.id] = max(0, self.next_index.get(peer.id, 0) - 1)

Этот метод отправляет записи журнала другим узлам и обновляет индексы в зависимости от ответа.

А теперь реализуем обработку входящих AppendEntries

def receive_append_entries(self, term, leader_id, prev_log_index, prev_log_term, entries, leader_commit):     with self.lock:         if self.disabled:             return False         if term < self.current_term:             return False         self.state = State.FOLLOWER         self.current_term = term         self.election_timeout = self.reset_election_timeout()         if prev_log_index >= 0:             if len(self.log) <= prev_log_index or self.log[prev_log_index]['term'] != prev_log_term:                 return False         # Добавляем новые записи, если их ещё нет         for entry in entries:             if len(self.log) > prev_log_index + 1:                 if self.log[prev_log_index + 1]['term'] != entry['term']:                     self.log = self.log[:prev_log_index + 1]                     self.log.append(entry)             else:                 self.log.append(entry)         if leader_commit > self.commit_index:             self.commit_index = min(leader_commit, len(self.log) - 1)             threading.Thread(target=self.apply_log, daemon=True).start()         return True

Этот метод будет проверять согласованность журнала и обновлять его, если все в порядке.

Симуляция отказов

Чтобы протестировать отказоустойчивость, добавим возможность отключать узлы:

def disable(self):     with self.lock:         self.disabled = True         print(f"Узел {self.id} отключен.")  def enable(self):     with self.lock:         self.disabled = False         self.election_timeout = self.reset_election_timeout()         print(f"Узел {self.id} включен.")

И изменим методы отправки сообщений, чтобы учитывать состояние узла:

def request_vote(self, term, candidate_id):     if getattr(self, 'disabled', False):         return False     # Остальной код...  def receive_append_entries(self, *args, **kwargs):     if getattr(self, 'disabled', False):         return False     # Остальной код...

Теперь можно симулировать отключение узлов и проверять, как кластер реагирует на это.

Протестируем кластер

Создадим несколько узлов и запустим их:

# main.py  from node import Node, State import time  if __name__ == "__main__":     nodes = []     # Создаем все узлы сначала с пустыми peers     for i in range(5):         node = Node(node_id=i, peers=[])         nodes.append(node)     # Теперь устанавливаем peers для каждого узла     for node in nodes:         node.peers = [peer for peer in nodes if peer.id != node.id]     leader = None     # Ждем, пока лидер не будет выбран     while not leader:         for node in nodes:             if node.state == State.LEADER:                 leader = node                 break         time.sleep(0.5)     print(f"Лидер выбран: Узел {leader.id}")     leader.client_command("Сохранить данные")     # Отключаем лидера     leader.disable()     print(f"Узел {leader.id} отключен")     time.sleep(15)     # Проверяем, выбран ли новый лидер     new_leader = None     for node in nodes:         if node.state == State.LEADER and not getattr(node, 'disabled', False):             new_leader = node             break     if new_leader:         print(f"Новый лидер: Узел {new_leader.id}")         new_leader.client_command("Новая команда")     else:         print("Не удалось выбрать нового лидера.")

Скрипт создаст пять узлов, инициирует выборы лидера, отправит команды и симулирует отказ лидера. Вот что можно ожидать в консоли:

# main.py  from node import Node, State import time  if __name__ == "__main__":     nodes = []     # Создаем все узлы сначала с пустыми peers     for i in range(5):         node = Node(node_id=i, peers=[])         nodes.append(node)     # Теперь устанавливаем peers для каждого узла     for node in nodes:         node.peers = [peer for peer in nodes if peer.id != node.id]     leader = None     # Ждем, пока лидер не будет выбран     while not leader:         for node in nodes:             if node.state == State.LEADER:                 leader = node                 break         time.sleep(0.5)     print(f"Лидер выбран: Узел {leader.id}")     leader.client_command("Сохранить данные")     # Отключаем лидера     leader.disable()     print(f"Узел {leader.id} отключен")     time.sleep(15)     # Проверяем, выбран ли новый лидер     new_leader = None     for node in nodes:         if node.state == State.LEADER and not getattr(node, 'disabled', False):             new_leader = node             break     if new_leader:         print(f"Новый лидер: Узел {new_leader.id}")         new_leader.client_command("Новая команда")     else:         print("Не удалось выбрать нового лидера.")

Вывод получится такой:

Узел 0: лидер потерян, начинаю выборы. Узел 0: голосую за 0. Узел 1: голосую за 0. Узел 2: голосую за 0. Узел 3: голосую за 0. Узел 4: голосую за 0. Узел 0: выбран лидером! Узел 0: я лидер! Лидер 0: отправляю heartbeat узлу 1. Лидер 0: отправляю heartbeat узлу 2. Лидер 0: отправляю heartbeat узлу 3. Лидер 0: отправляю heartbeat узлу 4. Узел 0: добавляю команду Сохранить данные в журнал. Лидер 0: отправляю heartbeat узлу 1. Лидер 0: отправляю heartbeat узлу 2. Лидер 0: отправляю heartbeat узлу 3. Лидер 0: отправляю heartbeat узлу 4. Узел 0 отключен Узел 1: лидер потерян, начинаю выборы. Узел 1: голосую за 1. Узел 2: голосую за 1. Узел 3: голосую за 1. Узел 4: голосую за 1. Узел 1: выбран лидером! Узел 1: я лидер! Лидер 1: отправляю heartbeat узлу 0. Лидер 1: отправляю heartbeat узлу 2. Лидер 1: отправляю heartbeat узлу 3. Лидер 1: отправляю heartbeat узлу 4. Новый лидер: Узел 1 Узел 1: добавляю команду Новая команда в журнал. Лидер 1: отправляю heartbeat узлу 0. Лидер 1: отправляю heartbeat узлу 2. Лидер 1: отправляю heartbeat узлу 3. Лидер 1: отправляю heartbeat узлу 4. Узел 1: применил команду: Новая команда

В начале узлы начинают выборы лидера и узел 0 становится первым лидером после получения большинства голосов. Затем лидер 0 начинает отправлять сердцебиения и реплицировать команду «Сохранить данные». Когда мы отключаем лидера 0, узел 1 обнаруживает его отсутствие, инициирует новые выборы и становится новым лидером, после чего успешно обрабатывает и реплицирует команду «Новая команда».

Но тут сделаю замечание, что из‑за асинхронности и случайных таймаутов порядок событий может меняться при каждом запуске.


Если захотите прикрутить Raft в реальный проект, то нужно будет добавить обработку сетевых сбоев, механизм «догонки» отставших узлов, и проверки корректности журналов. Улучшите производительность, перейдя на асинхронное программирование, минимизируйте блокировки и оптимизируйте передачу данных. Реализуйте обратную связь для клиентов, чтобы они получали подтверждения, и протестируйте систему в сложных сценариях отказов.

Всех желающих приглашаю на открытые уроки по архитектуре высоких нагрузок:

9 декабря: «Обеспечение отказоустойчивости хранилищ» — Вы узнаете, как правильно проектировать и настраивать хранилища, чтобы минимизировать простои и предотвратить потерю данных при сбоях. Записаться

16 декабря: «Распределённые транзакции — как добиться согласования данных в распределённой сети». Записаться


ссылка на оригинал статьи https://habr.com/ru/articles/862550/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *