Apache Airflow: нестандартное применение

от автора

Привет, Хабр! Я Маша Башан, Python разработчик в команде dBrain. Сегодня расскажу, как в dBrain мы внедрили собственную стратегию тестирования, которая в разы сокращает количество действий работающих с платформой инженеров. Мы уже озвучивали свой подход в разработке: если процесс можно автоматизировать — это нужно сделать.

Мы применили альтернативный вариант тестирования — с помощью сервиса Apache Airflow. Это open‑source инструмент для разработки, выполнения, планирования и мониторинга процессов по обработке данных.

Наиболее популярные способы применения Apache Airflow:

  • создание и управление ETL‑процессами (Extract, Transform, Load), где данные извлекаются из различных источников, обрабатываются и загружаются в хранилище;

  • планирование выполнения задач по расписанию, что удобно для регулярных процессов, таких как обновление отчетов или выполнение резервного копирования;

  • документирование рабочих процессов и отслеживание изменений в версиях DAG (Directed Acyclic Graphs);

  • веб‑интерфейс для мониторинга выполнения задач и отправки оповещений в случае ошибок.

Обозначим основные термины, чтобы лучше понимать друг друга 🙂

  • task — это базовая единица работы в Airflow, которая преследует конкретную задачу: обработка данных, выполнение вычислений или взаимодействие с внешними системами;

  • DAG — это файл с тестом, ключевая сущность Airflow, которая состоит из набора tasks.

Раздел DAGS

Раздел DAGS

Почему Airflow лучше для тестирования? Сравним с Python: система поддерживает модуль PyTest, отчет приходит в виде текста. В Airflow пользователь получает графический отчет, где с помощью цветов расставлены все акценты.

В веб-интерфейсе Airflow размещаются все подготовленные разработчиком тестовые сценарии. Для тестов можно задать расписание, настроить их в формате кода или в UI. В Airflow отображаются все параметры:

  • владелец,

  • активный или неактивный тест,

  • стоит DAG с тестированием на паузе или нет,

  • количество запусков: сколько успешных, сколько провальных, последний запуск, когда запланирован следующий, какие задачи были перезапущены и так далее.

Кликнув по тесту, пользователь получает детальную информацию:

  • о дате и времени проведения испытания,

  • ходе и результате выполнения задач.

В графическом отчете Airflow видно количество итераций и их успешность (колонка слева) для каждой из тасок внутри DAG’а и время их выполнения. В строке сверху указаны обозначения всех цветов. На центральной части — графическое изображение DAG’а: связь между задачами, порядок их выполнения и зависимости.

DAG внутри

DAG внутри

В разделе Graph отображаются выполненные для каждой из операций задачи. Выбрав определенную задачу, можно в логах изучить ход ее выполнения. Более детально углубиться в полученные результаты тестирования можно с помощью кода.

В исходном коде видны нюансы происходящего, например, что должно было отразиться в DAG’e, если в логах это не прописано и т.д. В меню есть флажки разных цветов, чтобы пользователю было проще ориентироваться в задачах. Предусмотрено автообновление DAG’а: все внесенные в код обновления подтянутся и в него. Таким образом, тестирование сводится к нажатию нескольких кнопок, что значительно облегчает жизнь девопса. Достаточно просто нажать Play, начнется работа DAG’а с несколькими задачами. За процессом можно наблюдать буквально в прямом эфире.

Реализация

Airflow — open source объект. Разворачивание Apache Airflow на кластере может варьироваться в зависимости от используемой инфраструктуры.

Чтобы развернуть его на нашем кластере, было необходимо подготовить окружение, настроить базу данных, а после установки — конфигурацию airflow.cfg. Есть только один нюанс — должен быть отдельно развернут PostgreSQL (или другая совместимая база), в которой будет создана БД для Airflow. После выполнения предыдущих этапов создаем директорию для хранения всех DAG’ов, которые и будут тестами (являются тестами) — DAG. Внутри этой директории мы разместим наши DAG’и (тесты).

При написании кода на Python внутри DAG в Apache Airflow есть несколько нюансов, которые помогут избежать ошибок.

  • Импорт

Все модули (библиотеки) DAG’ов должны быть установлены в среде, где работает Airflow. Однако некоторые модули необходимо устанавливать непосредственно внутри task (DAG состоит из task).

def test_k8s():     @task.virtualenv(task_id="check_pods_status", requirements=["kubernetes"])     def check_pods_status():         from kubernetes import client, config 

С помощью декоратора @task.virtualenv и тега requirements указываем необходимые зависимости, а в теле функции прописываем импорт модулей. Таким образом создаем виртуальное окружение для отдельной функции.

  • Структура

В версию Airflow 2.9.3, которую мы установили в dBrain, используется структура DAG с помощью декораторов. В параметрах указываем необходимые теги и настраиваем их: например, расписание выполнения теста. Потом создаем функцию, внутри которой будем размещать task, а также настраивать порядок их выполнения и зависимости между ними.

from airflow.decorators import dag, task from airflow.utils.trigger_rule import TriggerRule import pendulum @dag(     schedule=None,     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),     catchup=False,     tags=["k8s"], ) def test_k8s():     @task.virtualenv(task_id="check_pods_status", requirements=["kubernetes"])     def check_pods_status():         from kubernetes import client, config 

Зависимости можно установить двумя способами:

  • двойной угловой скобкой (функция delete_bucket будет выполнена только после выполнения функции create_bucket);

  • методами set_upstream() and set_downstream(), причем этим методам эквивалентны bitshift operators >> and << соответственно.

storage_classes = get_list_storage_classes() pods = get_list_pods() pvcs = get_list_pvc()  task_clear_cache = clear_cache(storage_classes, pvcs, pods)  task_create_pvc = create_pvc(storage_classes, pvcs) task_create_pods = create_pods(storage_classes, pods)
  • Настройка выполнения tasks внутри DAG

В стандартных настройках Airflow при прерывании одной из задач работа DAG полностью останавливается, но эту последовательность можно перенастроить с помощью тега Trigger Rule.

Как это выглядит в коде:

@task.virtualenv(task_id='delete_rbd', trigger_rule=TriggerRule.ALWAYS, requirements=["requests==2.32.3"])

Примеры других значений Trigger Rule для определения последовательности выполнения task:

  • all_success : (default) all parents have succeeded

  • all_failed : all parents are in a failed or upstream_failed state

  • all_done : all parents are done with their execution

  • one_failed : fires as soon as at least one parent has failed, it does not wait for all parents to be done

  • one_success : fires as soon as at least one parent succeeds, it does not wait for all parents to be done

  • none_failed : all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped

  • none_skipped : no parent is in a skipped state, i.e. all parents are in a success , failed , or upstream_failed state

  • dummy : dependencies are just for show, trigger at will

Здесь можно почитать про них.

Три вида тестирования

На dBrain реализованы три варианта тестирования: функциональное, нагрузочное и Status Check.

Нагрузочное тестирование помогает определить, выдержит ли размещенное на dBrain приложение новый виток нагрузки. Например, наплыв пользователей планируется после запуска рекламной кампании. Владельцы приложения определяют предполагаемую нагрузку и задают параметры для тестирования в dBrain. Результаты покажут, нужно ли расширять базу данных или искать какие-то альтернативные варианты развития событий.

Функциональное тестирование. При развертывании приложения на платформе создается новый кластер. Функциональное тестирование покажет работоспособность всех сервисов и баз данных платформы на этом кластере. Такой вариант тестирования незаменим и перед выходом обновлений как приложения, так и самой платформы в продакшен.

Status Check делится на:

  • Healthcheck — проверка состояния системы или ее компонентов, доступны ли они и работают корректно. Частота проведения: выполняется по расписанию или вручную при необходимости.

  • Clustercheck — проверка состояния кластера, обеспечения отказоустойчивости системы. Инженеры видят подсвеченные ошибки по каждому из кластеров. Частота проведения: перед релизом, после внесения изменений, при обновлении платформы, а также по настроенному расписанию.

Тестирование в dBrain — это своеобразное руководство по работе платформы. Например, в ClusterCheck проверка работы Kubernetes превращается не просто в исследование подов, нодов, Load Balancer и др. Пользователи видят и могут изучить, с помощью чего и как идет проверка. Тестирование помогает имитировать тот или иной инцидент в работе системы, что позволяет разработать план действий для устранения неполадок.

Функциональное тестирование покажет, как работают сервисы и какие компоненты в них есть. Тестирование может служить обучением программистов по использованию нашей платформы. Например, тестирование Ceph покажет, что основная функция сервиса — создание RBD и бакетов, а среди других возможностей — файловая система CephFS и управление репликацией данных.

Airflow разработан для управления сложными потоками данных и задачами. Он позволяет визуализировать зависимости между задачами, что упрощает понимание и тестирование сложных процессов. Airflow предоставляет возможности для планирования задач, что позволяет автоматически запускать тесты в определенное время или по триггерам. Он предлагает встроенные инструменты мониторинга и визуализации выполнения задач. Можно легко отследить статус задач, что помогает быстро выявлять проблемы и проводить диагностику. Airflow интегрируется с различными системами и инструментами обработки данных, такими как базы данных, облачные сервисы и API. Это позволяет тестировать рабочие процессы в реальных условиях и проверять взаимодействие между компонентами. Airflow имеет механизмы для обработки ошибок и повторных попыток выполнения задач. Это позволяет лучше управлять неудачными тестами и предоставляет возможность автоматического восстановления. Airflow хорошо подходит для масштабируемых решений, где требуется тестирование большого количества задач и потоков данных. Он может обрабатывать множество DAG»ов одновременно, что делает его эффективным для крупных проектов. Airflow поддерживает параллельное выполнение задач, что позволяет ускорить процесс тестирования и обработки данных, особенно в больших проектах.

С таким инструментом для тестирования все, что инженерам dBrain необходимо было прописывать руками перед каждым обновлением платформы или находящихся на ней приложений, свелось к нажатию нескольких клавиш и сокращению времени на проверку сервисов в несколько раз.

Мы поделились своим лайфхаком, как упростить жизнь девопсу, и ждем вашего отклика. Делитесь в комментариях своими рабочими инструментами для упрощения задач девопса, а также рассказывайте, какие темы в разработке вам интересны.


ссылка на оригинал статьи https://habr.com/ru/articles/861842/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *