Как сделать livenessProbe для Kafka-консьюмеров и перестать перезапускать их вручную

от автора

Kafka-консьюмеры не всегда работают так стабильно, как хотелось бы. Иногда они просто зависают — без ошибок, без падений, но и без обработки сообщений. LivenessProbe в Kubernetes помогает автоматически перезапускать зависшие сервисы, но с Kafka-консьюмерами всё не так просто: стандартного решения для них нет. В этой статье разберём, как правильно реализовать livenessProbe для консьюмеров с помощью паттерна Heartbeat, чтобы не перезапускать их вручную.

Содержание

Зачем нужен livenessProbe

В Kubernetes livenessProbe периодически опрашивает под (приложение) на предмет работоcпособности и перезапускает его, если что-то не работает. Если бы это был контейнер с API на FastAPI, то мы бы удостоверились, что контейнер возвращает правильный HTTP код 200 с одного из эндпоинтов. Допустим, приложение зависло и перестало отдавать ответ клиенту – livenessProbe перезапустил бы контейнер.

Kubernetes с помощью livenessProbe периодически опрашивает контейнер и перезапускает его в случае необходимости

Kubernetes с помощью livenessProbe периодически опрашивает контейнер и перезапускает его в случае необходимости

В чем проблема livenessProbe для Kafka-консьюмера

Для Kafka Consumer нет единого описанного подхода к реализации такого механизма, из-за чего разработчики сходу не могут реализовать эту функциональность правильно. Зачастую можно наткнуться на статьи, где предлагают следить за Сonsumer lag, что делает такой подход некорректным, так-как наличие лага не говорит, есть ли проблема у конкретного консьюмера или он просто не справляется с нагрузкой.

В моей голове родилось несколько подходов к проверке (увы, не все из них жизнеспособны):

  • Через consumer lag (неточный);

  • Через unix сигналы (проверяет, не завис ли процесс ОС, неточный);

  • Через внутренние метрики кафки (сложно);

  • Через кастомный механизм heartbeat в консьюмере (оптимально и точно);

  • Через отправку сообщения в health топики и его вычитку в livenessProbe (точно, но громоздко)

Реализация livenessProbe через паттерн HeartBeat ❤️

Простым решением проверки работы консьюмера может стать периодическая отправка хартбитов в любое хранилище, а затем сверка текущего времени и времени последнего бита.

Допустим, мы примерно знаем, что если задача выполняется больше 10 секунд, то скорее всего консьюмер завис и нам стоит его перезапустить. Каждый цикл получения и обработки сообщений из топика сохраняем данные по времени в некое хранилище. Самый простой вариант — в файл. Kubernetes затем может посмотреть в файл, сравнить время последнего бита с текущим и перезапустить консьюмер, если разница больше 10 секунд.

Концептуальная работа livenessProbe для кафка консьюмера

Концептуальная работа livenessProbe для кафка консьюмера

Реализация на Python

Для использования этого подхода необходимы изменения на стороне консьюмера, а для Kubernetes необходим скрипт.

Абстрактный класс HeartBeat

Опишем абстрактный класс HeartBeat, наследуя и реализуя который мы сможем вызывать методы save для сохранения heartbeat консьюмера. Это позволит нам в дальнейшем подменять реализацию – сохранять данные в файл, в редис или БД.

import abc  class HeartBeat(abc.ABC):     @abc.abstractmethod     def save(self) -> None:         """Save timestamp data to mark that worker is alive."""              @abc.abstractmethod     def is_alive(self) -> bool:         """Check that worker is alive."""        

Структура данных

Опишем структуру данных о heartbeat. В нашем случае важно хранить идентификатор хоста и время. И должно это сохраняться в JSON.

{     "host_id": "00000000-0000-0000-0000-0242ac160007",      "last_beat": "2023-12-08T09:38:01.255909+00:00" }
class HeartBeatState(BaseModel):     host_id: str     last_beat: datetime

FileHeartBeat – сохранение в файл

Для простоты сохраним данные в файл – напишем реализацию FileHeartBeat. Придумаем файлу имя, соберем к нему путь, затем сериализуем HeartBeat в JSON и сохраним в файле.

HEART_BEAT_FILE_NAME = "heartbeat.json"  logger = logging.getLogger(__name__)   class FileHeartBeat(HeartBeat):     """     Save heartbeats to a file to check if consumer is alive by livenessProbe.     """      def save(self):         """Save heartbeat timestamp data to indicate that worker is alive."""         heartbeat_file_path = self._retrieve_heartbeat_file_path()         host_id = uuid.UUID(int=uuid.getnode())         last_beat = now()          state = HeartBeatState(             host_id=str(host_id),             last_beat=last_beat,         )         self._save_state_to_file(heartbeat_file_path, state)      def _save_state_to_file(             self,              heartbeat_file_path: Path,              state: HeartBeatState,     ) -> None:         """Save heartbeat timestamp and other information in a file."""         logger.info("Saving heart beat to file: %s, %s", heartbeat_file_path, str(state.json()))          with open(heartbeat_file_path, "w+") as worker_heartbeat_file:             worker_heartbeat_file.write(state.json())       def _retrieve_heartbeat_file_path(self) -> Path:         """Construct file path where heartbeat info is located."""          base_dir = Path(settings.BASE_DIR)          heartbeat_file_name = HEART_BEAT_FILE_NAME         heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)          return heartbeat_file_path 

FileHeartBeat – проверка, жив ли консьюмер

Для проверки, что консьюмер жив, опишем метод is_alive. Прочтем файл, десериализуем данные из JSON в нашу модель и сравним, сколько прошло секунд с момент последнего бита. Если больше, чем мы ожидаем, то считаем консьюмер мертвым.

class FileHeartBeat(HeartBeat):     """     Saves heartbeats to a file to check if consumer is alive by livenessProbe.     """      def is_alive(self, max_time_after_worker_dead: int = 15) -> bool:         """ Main livenessProbe method to check if consumer is healthy. """         heartbeat_file_path = self._retrieve_heartbeat_file_path()          try:             logger.info("Trying to check heartbeat file: %s", heartbeat_file_path)             heart_beat_stats = self._read_heartbeat_file(heartbeat_file_path)         except FileNotFoundError:             logger.error("Heartbeat file not found: %s", heartbeat_file_path)              return False          logger.info("Got last heartbeat info: %s", str(heart_beat_stats.json()))          time_after_last_beat = now() - heart_beat_stats.last_beat         if time_after_last_beat.seconds > max_time_after_worker_dead:             logger.error(                 "Worker is dead. Time since last beat %s > %s",                 time_after_last_beat.seconds,                 time_offset,             )             return False          logger.info(             "Worker is alive. Time since last beat %s < %s",             time_after_last_beat.seconds,             time_offset,         )         return True      def _read_heartbeat_file(self, heartbeat_file_path: Path) -> HeartBeatState:         """Read heartbeat timestamp and other information from a file."""         with open(heartbeat_file_path) as worker_heartbeat_file:             heart_beat_stats_json: dict = json.load(worker_heartbeat_file)             heart_beat_state = HeartBeatState(**heart_beat_stats_json)              return heart_beat_state      def _retrieve_heartbeat_file_path(self) -> Path:         """Construct file path where heartbeat info is located."""          base_dir = Path(settings.BASE_DIR)          heartbeat_file_name = HEART_BEAT_FILE_NAME         heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)          return heartbeat_file_path 

FileHeartBeat – интегрируем в консьюмер

Интегрируем этот код в код консьюмера. В данном случае используется датакласс для возможности подмены реализации. В начале каждого цикла сохраняем данные, что живы.

@dataclasses.dataclass class ConfluentKafkaConsumer:     heart_beat: HeartBeat      def run(self):         consumer = ...         ....                  while True:             message: ConfluentKafkaMessage = consumer.poll(2.0)             self.heart_beat.save()              if not message:                 # No message received                 continue              self._process_message(message)   confluent_kafka_consumer = ConfluentKafkaConsumer(     heart_beat=FileHeartBeat(), )

FileHeartBeat — интегрируем в CLI

Дальше необходимо написать CLI-скрипт, который сообщит Kubernetes о том, что воркер жив или умер. Для этого необходима подобная функция, в зависимости от вашего фреймворка:

ERROR_CODE = 1  ... # Your CLI  def health_check(max_time: Optional[int]):     heart_beat = FileHeartBeat()      is_alive = heart_beat.is_alive(max_time_after_worker_dead=max_time)      if not is_alive:         sys.exit(ERROR_CODE)

FileHeartBeat – интегрируем в livenessProbe

В helm пропишем livenessProbe проверять состояние консьюмера каждые 30 секунд. У нас используется Django, поэтому запускаем через него.

    livenessProbe:       exec:         command:         - ./manage.py         - consumer         - is_alive         - --max_time         - "30"

На этом всё. Мы готовы проверять консьюмеры. Код приведен концептуальный.

Что дальше?

Вы можете реализовать сохранение Heartbeat в БД или Redis, а затем в админке смотреть состояние ваших консьюмеров.

Вы можете пойти ещё дальше и написать команды для приостановки вашего консьюмера прямо в коде через подобную концепцию проверки состояния в цикле перед обработкой сообщения. Это может быть удобно, когда нужно сбросить оффсеты в критических случаях (консьюмер группа должна быть остановлена)

Помимо вышеописанного, можно попробовать выделить код в отдельную библиотеку и переиспользовать.

Выводы ✍️

Из опыта использования этого подхода могу выделить следующее:

  • После реализации механизма Heartbeat мы перестали перезапускать консьюмеры руками и полностью автоматизировали этот процесс.

  • Среди возможных реализаций данный способ оказался наиболее простым и точным.

  • Данный подход позволяет расширять функциональность по отслеживанию и управлению консьюмерами – если использовать Redis, то можно отследить в админке консьюмеры, а также реализовать функциональность по управлению без перезапуска и девопсов прямо из админки.

Дополнительные материалы 📚

Вот, что ещё можно почитать на эту тему:


ссылка на оригинал статьи https://habr.com/ru/articles/880684/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *