Кастомные email-оповещения в Apache Airflow

от автора

Пролог

 Как ИИ представляет себе "Этакое желание"

 Как ИИ представляет себе «Этакое желание»

Каждый разработчик сталкивался, или непременно столкнется, с ситуацией, когда в бизнес-требованиях видишь “этакое желание”.

Этакое желание (каламбурное определение) – достичь чего-то невозможного или близкого к невозможному с помощью программирования.

Дорогой читатель, наверное, задаст риторический вопрос:
— Как это?! Разве чего-то нельзя достичь с помощью программирования?!
Вопрос, конечно, к месту, и ответ в большинстве случаев очевиден:
— Нет ничего невозможного… главное грамотно спроектировать техническое решение.
Но сейчас немного не об этом, а о целесообразности расхода ресурсов: трудозатрат, количество привлеченных специалистов и т.п.

Стоит ли овчинка выделки?

Увидев “этакое желание” хочется сразу пресечь его на корню и больше не возвращаться к обсуждению безумной идеи.
Но что если все-таки дать шанс и извлечь из этого выгоду в будущем?

И тут без ИИ не обошлось

И тут без ИИ не обошлось

Конкретный случай

Спойлер: стоит отметить, что дальше не будет rocket science и метапрограммирования, а будет описан подход к решению задачи.

Мне, как разработчику команды “Разработка и автоматизация загрузок данных” Газпромбанка, посчастливилось встретить в одном из технических заданий “этакое желание” — реализовать в загрузочном DAG кастомизированные email-оповещения.
Они должны собирать необходимую информацию на всех стадиях пайплайна, то есть в каждой из задач DAG’a, а в зависимости от успешного выполнения или при отклонении алгоритма, формировать тело email-сообщения. Например: в задаче происходит ошибка валидации данных и в этом случае должна сообщаться конкретная причина падения DAG. Информация должна быть полезной и понятной для бизнеса.

Для загрузок в нашей кодовой базе уже был реализован инструмент оповещения,
но оповещения выполнялись только при удачном сценарии отработки DAG, например,
с количеством обработанных файлов и записанных строк в базу данных.
Формат был стандартным и не конфигурировался.

Первое впечатление от увиденного требования у меня было крайне скептическое,
но на одной из ежедневных встреч с командой, обсудив все за и против, было принято решение реализовать данную задачу. Один из основных аргументов – создание универсального инструмента нотификации, который можно будет переиспользовать в других наших DAG’ах. Это переиспользование даст возможность получить другим Заказчикам более четкое представление о загрузках.
Чем подробнее алерты, тем меньше вопросов к нам.

Ближе к коду

Собрав команду на встрече “побрейнштормить” задачку, мы наметили итеративный подход к реализации: в первой итерации – основной механизм, во второй – доработка и исправление багов.

Умный и находчивый читатель, наверное, сразу догадался, что можно использовать встроенные в библиотеку Airflow Callbacks. Но это не наш случай, так как нужно обрабатывать ошибки по месту, конкретные случаи. Наиболее предпочтительный вариант — использовать  отдельный Task Airflow (далее dag_email_notification) с определенным TriggerRule = NONE_SKIPPED, так как необходимо обработать все задачи DAG’а со статусами success и failed.

С обработчиком определились, а как передавать информацию между задачами? – XCom, скажете вы, и будете правы. Но что и как передавать? Хотелось бы использовать лаконичный и понятный для эксплуатации объект, который сам бы отправился в XCom.
Мы решили, что для этого хорошо подходит — dataclass (далее EmailNotification) с несколькими методами. Dataclass хороший вариант, так как в XCom следует отправлять сериализуемые в JSON данные.

Рассмотрим, как выглядит наш объект:

@dataclass class EmailNotification:     task_description: str | None = None     lines: list[dict[str, t.Any] | str | BaseException] = field(default_factory=list)       XCOM_KEY: str = "email_notification"       @classmethod     def set_task_description(cls, task_description: str) -> None:         """         Добавление описания таски в тело сообщения.            :param task_description: Описание таски - дополнительно           раскрывает смысл/задачу.           :return: None           """         instance = cls.get_from_xcom()           instance.task_description = task_description           cls.push_into_xcom(instance)       @classmethod     def add_line(         cls,         message: str | BaseException | dict[str, t.Any],     ) -> None:         """         Добавление дополнительной строки в инстанс EmailNotification          и пуш в XCom.           :param message: Содержание строки с сообщением         :return: None           """         instance = cls.get_from_xcom()           instance.lines.append(message)           cls.push_into_xcom(instance)       @classmethod     def push_into_xcom(cls, instance: EmailNotification) -> None:         # util,который из context возвращает текущий TaskInstance         ti: TaskInstance = get_current_ti()           ti.xcom_push(key=cls.XCOM_KEY, value=instance)           @classmethod     def get_from_xcom(cls) -> EmailNotification:         """         Возвращает инстанс EmailNotification из XCOM или создает новый.           :return: EmailNotification           """         # util, который из context возвращает текущий TaskInstance         ti: TaskInstance = get_current_ti()           instance = ti.xcom_pull(key=cls.XCOM_KEY, task_ids=ti.task_id)           return instance or cls()

Итак, на борту имеется — основной метод add_line, который будет добавлять необходимую строку в тело сообщения, как мы и хотели. Classmethod выбран не случайным образом, а для лаконичного использования, исключается необходимость дополнительно создавать инстанс, инициализируя класс.
Основная работа скрыта в методе get_from_xcom: при добавлении строки в тело сообщения инстанс класса будет создан в XCom, а если объект уже есть, то добавится строка в существующий инстанс.

Как это выглядит на практике:

@task def first_task() -> None:     EmailNotification.set_task_description("Описание для task 1")       EmailNotification.add_line("Первая строка в task 1")       EmailNotification.add_line("Вторая строка в task 1")

По результатам выполнения задачи first_task, в XCom получаем следующий объект:

EmailNotification(     task_description="Описание для task 1",     lines=["Первая строка в task 1", "Вторая строка в task 1"],     XCOM_KEY="email_notification" )

Остается только получить все эти объекты из XCom задач со статусами success/failed с помощью обработчика dag_email_notification, собрать все это в одно тело сообщения, разделяя на блоки для наглядности, и разослать всем заинтересованным лицам.

Ну мёд, медятина!

Вот как выглядит наше сообщение:

Конечно, для красоты добавлены всякие “рюшечки” и “бантики”, например, если статус failed, фон блока задачи в HTML-разметке меняется на красный.
Автоматически определяются адресаты и тема сообщения в зависимости от статуса выполнения DAG.
Параметр message в методе add_line неспроста может быть: строкой, exception или словарем — обработчик dag_email_notification учитывает это и, в зависимости от типа данных, по-разному формируется содержимое блока с описанием процесса выполнения задачи в теле сообщения.

И что, всё?

 Учим ИИ шутить

 Учим ИИ шутить

Как сказал Тимлид:

“Попробуй протестить с Dynamic Task Mapping (далее Mapped Tasks), там могут быть нюансы…”.

И действительно, нюансы есть…

Первое, что идет не по плану — в EmailNotification.get_from_xcom после успешного выполнения первой Mapped Task остальные падают, и ничего непонятно.
В ходе деббагинга выявился фигурант дела — LazyXComAccess.

Кратко говоря, это последовательность XCom’ов с Mapped Tasks.
В get_from_xcom, чтобы получить инстанс класса из XCom, одного параметра task_id становится мало – у всех Mapped Tasks один и тот же task_id.
К счастью, XCom’ы внутри LazyXComAccess хранятся с использованием индексов.
Чтобы решить вопрос, добавим map_indexes=ti.map_index в вызов xcom_pull.
Также теперь стоит учитывать индексы при получении данных из XCom в обработчике dag_email_notification.

С Mapped Tasks сообщение может выглядеть так:

Напоследок

Хотелось бы показать больше кода, особенно обработчик dag_email_notification,
но цель статьи – не перегружать вас техническими деталями, а главное – продемонстрировать, как можно совместно с командой предоставить бизнесу эффективный инструмент для мониторинга и анализа процессов.


ссылка на оригинал статьи https://habr.com/ru/articles/832278/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *