Alerting Apache Airflow, уведомления в телеграм

от автора

Небольшое отступление
Работая работу, вдруг, появилась необходимость познакомиться с таким инструментом, как Apache Airflow. Задачу дали простую — нет никаких уведомлений в DAG’ах, при сбое необходимо уведомлять. Так как про этот сервис я только «слышал», уверенных знаний я показать даже сейчас, боюсь, не смогу. Зато смогу поделиться с вами простым кодом оповещения, который поможет вам не придумывать велосипед и воспользоваться (а то и улучшить) текущим. За основу я взял статью на Хабре, само собой официальная документация и другие открытые источники. Так же отдельное спасибо моему наставнику, который ревьювил всю работу.

Данный alerting предназначен для, скорее всего, для любых версий, но если вы хотите использовать преимущества Apache Airflow на полную, то при версии >= 2.6.0 рекомендуется читать статью выше.

Для аннотаций типов используется пакет typing (as tp). Для отправки уведомлений в telegram вам понадобится установить пакет python-telegram-bot .

Если вы здесь, скорее всего, вы уже примерно представляете что такое Apache Airflow, таски (джобы) и даги, поэтому не буду вдаваться в подробности и сразу перейду к сути.
Для начала я советую вам повторить (а в будущем и превзойти, если нужно!) текущую конфигурацию.

Для того чтобы понять работают ли уведомления напишем простейший dag со сломанной таской:

def failing_task():     raise Exception("Пример ошибки")   with DAG(     "telegram_notification_dag",     default_args={         "on_failure_callback": #что-то должно вызываться при сломанной таске     },     description="Отправка уведомлений через Telegram бот",     schedule_interval="@daily",     start_date=datetime(2022, 1, 1),     catchup=False, ) as dag:      failing_task = PythonOperator(         task_id="failing_task", python_callable=failing_task, dag=dag     )      (failing_task)

Для людей, которым нужно только уведомление, но никто не хочет в этом разбираться поясню: DAG — просто структура, которая определяет порядок выполнения задач, их взаимодействие и последовательности. Task (job) — это задание или шаг в рамках DAG, которое выполняет какое-либо действие.

В наш DAG передали его идентификатор, словарь аргументов (все аргументы можно посмотреть а официальной доке), интервал запуска и т.д.

Далее определим нужные нам таски, нам нужен только один (который будет сломан).

Существует несколько операторов для выполнения задач, в нашем случае используется PythonOperator, который будет вызывать (какой-то) python код.

Далее не буду описывать всё по шагам и просто скопирую и объясню. Класс TelegramNotification отвечает за отправку сообщения пользователю.

class TelegramNotification:     """intervals - интервалы переотправки оповещения,      если не получается отправить оповещение"""      def __init__(         self,         chat_id: str,         token: str,         message_template: str,         responsible_users: tp.List[str] = [],         intervals: tp.List[int] = [1, 60, 600],     ):         self._chat_id = chat_id         self._messageTemplate = MessageTemplate(message_template, responsible_users)         self._intervals = intervals         self._token = token      def send_telegram_notification(self, context: tp.Dict[str, tp.Any]) -> None:          message = self._messageTemplate.create_message_template(context)          for interval in self._intervals:             try:                 bot = Bot(token=self._token)                 bot.send_message(chat_id=self._chat_id, text=message)                 break             except Exception as e:                 logger.info(f"Error sending message to Telegram: {e}")                 time.sleep(interval)

Отправка сообщения боту происходит в цикле конструкции try except, из всего, что связано с телеграмом, тут только создание бота и далее строчка с вызовом метода send_message у нашего созданного бота. Почему всё это обёрнуто непонятно как будет объяснено в конце статьи. Наше сообщение должно формироваться в зависимости от того, что нужно человеку, т.к. это противоречит цели этого класса, а у каждого класса своя зона ответственности! То сам модуль состоит из ещё одного класса.

class MessageTemplate:     def __init__(self, message_template: str, responsible_users: tp.List[str]):         self._message_template = message_template         self._responsible_users = responsible_users      def create_message_template(self, context: tp.Dict[str, tp.Any]) -> str:         args = self._parse_context(context)         message = self._message_template.format(**args)         return message      def _parse_context(self, context: tp.Dict[str, tp.Any]) -> tp.Dict[str, tp.Any]:         """Доступные аргументы для message template         DAG_NAME: название DAG         TASK_ID: название задачи         DATE: дата и время выполнения задачи         TASK_LOG_URL: ссылка на лог выполнения задачи         PARAMS: параметры, переданные в задачу         CONF: глобальные параметры, переданные в DAG при его запуске         PREV_EXEC_DATE: дата и время выполнения предыдущей задачи         USERS: список пользоватлей, ответственных за выполнение задачи         """         return {             "DAG_NAME": context.get("dag").dag_id,             "TASK_ID": context.get("task_instance").task_id,             "DATE": self._create_formatted_date(context.get("execution_date")),             "TASK_LOG_URL": context.get("ti").log_url,             "PARAMS": context.get("params"),             "CONF": context.get("conf"),             "PREV_EXEC_DATE": self._create_formatted_date(                 context.get("prev_execution_date")             ),             "USERS": self._create_users_string(),         }      def _create_formatted_date(self, date: datetime) -> str:         return date.strftime("%Y-%m-%d %H:%M:%S") if date else ""      def _create_users_string(self) -> str:         return ", ".join([f"@{user_name}" for user_name in self._responsible_users])

Из всего, что здесь есть, думаю, стоит упомянуть объект context, который передаётся в callback функцию при успешном (можете сами дописать в класс функцию, которая отправляет что-то при success) или не успешном выполнении задачи. Этот объект — словарь, из которого можно вытянуть практически всю информацию, которая вам нужна. Аргументы в данном примере — не все. Их как минимум в два раза больше, если вам нужно что-то дополнительно, вы с лёгкостью сможете внедрить это сюда.

Теперь посмотрим, как же с этим работать. Далее весь код для нашего тестового DAG’а:

import time  from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime from loguru import logger  from dependencies.tg_notification import TelegramNotification, read_bot_secrets  PATH_TO_SECRETS = "путь_к_секретам" SECRETS = read_bot_secrets(PATH_TO_SECRETS)  CHAT_ID_KEY = "chat_id" TOKEN_KEY = "token"  NOTIFY_MESSAGE = """ Идентификатор DAG: {DAG_NAME}.  Идентификатор задачи: ❌{TASK_ID}❌.  Дата и время выполнения задачи: {DATE}.  Ответственные лица: {USERS} """   telegram_notification = TelegramNotification(     chat_id=SECRETS.get(CHAT_ID_KEY),     token=SECRETS.get(TOKEN_KEY),     message_template=NOTIFY_MESSAGE,     responsible_users=[         "your_username",      ],     intervals=[1, 100, 1000], )   def failing_task():     raise Exception("Пример ошибки")   with DAG(     "telegram_notification_dag",     default_args={         "on_failure_callback": telegram_notification.send_telegram_notification     },     description="Отправка уведомлений через Telegram бот",     schedule_interval="@daily",     start_date=datetime(2022, 1, 1),     catchup=False, ) as dag:      failing_task = PythonOperator(         task_id="failing_task", python_callable=failing_task, dag=dag     )      (failing_task)

Конечно же, последуют объяснения. Для начала прочитаем наши секреты. Функция для чтения (read_bot_secrets) будет показана чуть ниже, она играет довольно важную роль для нас, чтобы всё не сломалось.

Так же создадим шаблон сообщения. Код с доступными аргументами (MessageTemplate) находится выше и вы можете составить какой только захотите.

Создаём объект класса TelegramNotification и передаём туда токен нашего бота, наш id чата, шаблон сообщения, а так же ответственных лиц для нужного вам DAG’а (без @).

Вся магия происходит вот тут:

"on_failure_callback": telegram_notification.send_telegram_notification

При сбое в таске вызывается функция send_telegram_notification, куда передаётся Airflow объект context, в котором уже содержится вся нужная нам информация.

Что же произойдёт, если у нас не получилось прочитать данные токена или id чата, или вообще не получилось прочитать наши секреты? Вернётся пустой словарь или словарь с нужными (правильными или нет) значениями. Так как в данном коде используется метод get в получении значений (token и chat_id), то если какого-либо ключа не будет, то нам просто вернётся None. В конце функции send_telegram_notification класса TelegramNotification так же используется конструкция try except, это гарантирует, что если у нас будут проблемы с сетью, или у нас не будет каких либо значений, или они будут неправильные — в логи выведется ошибка и мы сможем уже работать с ней дальше.

В моём случае я использую рабочий Apache Airflow, из всей доступной мне информации могу сказать, что стоит версия 2.5.3.

Результаты:

На этом у меня всё. Задавайте свои вопросы, пишите комментарии, оставляйте свои замечания. Надеюсь, моя статья хоть как-то помогла вам.


ссылка на оригинал статьи https://habr.com/ru/articles/827136/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *