Генерация DAG в Apache Airflow

от автора

Оглавление:

Проблематика

Типовая задача для дата-инженера – это перенести данные из реплики/боевой OLTP DB в аналитическое хранилище.

В данной задаче обычно нужно переносить несколько таблиц и принцип их переноса является одинаковым. Ввиду чего необходимо создавать DAG с небольшими изменениями в коде.

Чаще всего это происходит так: дата-инженер заходит в типовой DAG и выполняет следующие действия:

  1. Cmd+A

  2. Cmd+C

  3. Cmd+V

  4. Поменял пару строчек в DAG, совершил опечатку/неверно скопировал/что-то еще

Решение

Генерация DAG по типовому DAG (шаблону).

Создаем шаблон, изменяем автоматически все необходимые поля и радуемся пускам в прод.

Пример решения проблематики описанной выше

Ниже будет поэтапно расписано как можно просто сделать фабрику DAG, благодаря которой можно смело пускать в прод полученные DAG.

Используемые технологии:

Все операции выполнялись на:

  • MacBook Air (M1, 2020)

    • Оперативная память 16 Gb

    • macOS Monterey 12.6

  • Docker 4.17.0

    • Docker resources:

      • CPUs: 2

      • Memory: 4 Gb

      • Swap: 2 Gb

Установка Airflow

Весь проект вы сможете найти в git-репозитории. Все команды указаны в хронологическом порядке в описании проекта.

Кратко:

Поднимаем проект командой

docker-compose up -d 

Пишем шаблонный DAG

Я не буду писать сложный DAG, так как в данной статье я бы хотел показать скорее суть и возможности создания фабрики DAG.

Код DAG
from datetime import datetime, timedelta  import pendulum  from airflow import DAG  from airflow.operators.python import PythonOperator from airflow.operators.empty import EmptyOperator   args = {     'owner': 'k0rsakov',     'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')),     'catchup': True,     'retries': 3,     'retry_delay': timedelta(hours=1),     'max_active_runs': 1 }   def print_something() -> None:     """     Печатает текст.      :return: `None`     """     print('something')  with DAG(         dag_id='template',         schedule_interval='10 0 * * *',         default_args=args,         tags=['template', 'test'],         description='',         concurrency=1 ) as dag:      start = EmptyOperator(         task_id='start',     )      print_something = PythonOperator(         task_id='print_something',         python_callable=print_something,     )      end = EmptyOperator(         task_id='end',     )      start >> print_something >> end 

Реализован самый простой DAG, который может что-то печатать в консоль.

Если его запустить, то получим ожидаемый результат:

Изменение его под шаблон

Давайте для начала организуем хранение нашей фабрики.

Так как все DAG попадают в веб-интерфейс из папки dags, то мы реализуем всю логику формирования новых DAG вне этой папки.

У меня будет такая структура:

В папке dag_config_json_print_something будет находиться файл с конфигами для наших будущих DAG (об этом расскажу ниже).

В папке templates_dags как раз будет храниться наш шаблонный DAG.

Далее в нашем шаблоне изменяем все параметры, которые поддаются шаблонизации или те, которые должны будут меняться в зависимости от его конфигурации.

Примеры:

Исходный код:

args = {     'owner': 'k0rsakov',     'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')),     'catchup': True,     'retries': 3,     'retry_delay': timedelta(hours=1),     'max_active_runs': 1 }

Шаблонный код:

args = {     'owner': '$$OWNER',     'start_date': $$START_DATE,     'catchup': True,     'retries': 3,     'retry_delay': timedelta(hours=1),     'max_active_runs': 1 }
Код DAG под шаблон
from datetime import datetime, timedelta  import pendulum  from airflow import DAG  from airflow.operators.python import PythonOperator from airflow.operators.empty import EmptyOperator   args = {     'owner': '$$OWNER',     'start_date': $$START_DATE,     'catchup': True,     'retries': 3,     'retry_delay': timedelta(hours=1),     'max_active_runs': 1 }   def print_something() -> None:     """     Печатает текст.      :return: `None`     """     print('$$PRINT')  with DAG(         dag_id='$$DAG_ID',         schedule_interval='$$SCHEDULE_INTERVAL',         default_args=args,         tags=$$TAGS,         concurrency=1 ) as dag:      start = EmptyOperator(         task_id='start',     )      print_something = PythonOperator(         task_id='print_something',         python_callable=print_something,     )      end = EmptyOperator(         task_id='end',     )      start >> print_something >> end 

Рекомендую изменить наименование файла в .txt чтобы IDE не ругалась на «ошибки».

Генерация DAG

Ранее мы изменили наш исходный DAG под шаблон, теперь необходимо указать все ключи, которые будут изменяться в одном файле. Я выбрал самый привычный и, как по мне, удобный вариант – JSON.

В заголовке я указываю название DAG, далее указываю все ключи, которые мы ранее создали в нашем шаблоне.

Если мы заходим шаблонизировать какое-то поле, то добавляем его в шаблон и добавляем затем в наш config.

{   "template": {     "OWNER": "k0rsakov",     "START_DATE": "datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow'))",     "PRINT": "something",     "DAG_ID": "template",     "SCHEDULE_INTERVAL": "10 0 * * *",     "TAGS": "['template', 'test']"   } }

Создание «генератора» DAG

На данный момент мы имеем: шаблон типового DAG, имеем config файл с информацией о будущих DAG.

Теперь мы создадим саму фабрику, которая нам будет «генерировать» DAG.

Первая функция, которая будет изменять все ключи в шаблоне, которые есть в нашем config-файле:

Функция замены ключей в шаблоне
def replace_template_variables(template_dag: str = None, dict_variables: dict = None) -> str:     """     Функция, которая итерируется по всем ключам ключа основного словаря берет оттуда значение и подставляет в шаблон.      Пример:     ```json     {       "test":       {         "OWNER": "airflow",         "DAG_ID": "test",         "PK": "id"         ...       }     }     ```     На вход поступает значения по ключу `test` и функция итерируется по ключам: `OWNER`, `DAG_ID`, `PK`, ...     Берет значения по ключу и заменяет шаблон в указанном `template_dag`.      Соответственно, по всем ключам, которые есть в словаре DAG будет произведена замена по шаблону.      @param template_dag: Шаблон DAG; default 'None'.     @param dict_variables: Словарь со значениями, которые необходимо поменять в шаблоне; default 'None'.     @return: Измененный шаблон на основании значений по ключам.     """     for variables_ in dict_variables:         template_dag = template_dag.replace(f'$${variables_}', f'{dict_variables[variables_]}')      return template_dag

Далее создадим функцию, которая будет итерироваться по всем ключам config-файла указанного типа шаблона:

Функция для итерации по ключам выбранного типа шаблона
def dag_factory(type_dag: str = None) -> None:     """     Функция, которая генерирует DAG на основании выбранного `type_dag` и выбранных config на основании `type_dag`.      Пример: Если указан `print_something` в `type_dag`, то функция для генерации DAG будет использовать config:     "config_json_print_something.json" и сохранит сгенерированные DAG в папку:     //dags/json_dags/print_something/<print_something_dag_name.py>      @param type_dag: Указывается тип DAG для генерации print_something|etc; default 'None'.     @return: Ничего не возвращает, а сохраняет сгенерированный DAG по определенному пути.     """     with open(f'dag_config_json_{type_dag}/config_{type_dag}_dag.json') as j:         json_config = j.read()         json_config = json.loads(json_config)      with open(f'templates_dags/{type_dag}.txt', mode='r') as f:         template = f.read()      for dag in json_config:         modified_template = replace_template_variables(template_dag=template, dict_variables=json_config[dag])         with open(f'../dags/json_dags/{type_dag}/{dag}.py', mode='w') as dag_pyfile:             dag_pyfile.write(modified_template)

Объеденим обе функции в один файл и запустим полученный скрипт.

Получим сгенерированный DAG, который будет содержать все наши значения из config-файла.

И DAG появился в нашем веб-интерфейсе:

Масштабирование

Давайте теперь сгенерируем новые DAG по заданному шаблону.

Таким образом можно быстро создавать однотипные DAG.

Рекомендации

  • Для шаблона использовать хорошо проверенные DAG.

  • Продумать заранее все поля, которые будут шаблонизироваться.

  • Продумать поля, которые не нужны во всех DAG и сделать для них обработку исключений, чтобы можно было не указывать какой-то ключ.

  • Держать в голове ту мысль, что если изменить шаблон и запустить фабрику DAG, то изменения подтянутся во все генерируемые DAG.

  • Продумать вариант удаления DAG, которые исключены из config-файлов, сейчас такое не реализовано (я просто руками удаляю).

  • Использовать отдельную папку для генерируемых DAG, чтобы не смешивать рукописные и шаблонные DAG.

  • Не забывайте про идемпотентность.

  • Правильно используйте переменные из контекста DAG.

Итог

Фабрика DAG очень полезна, потому что позволяет не копировать писать руками однотипные DAG. Можно создать универсальные шаблоны для разных типов DAG:

  • Перезаписывают каждый раз табличку.

  • Обновляют значения по ключу.

  • Добавляют в конец.

  • Прочие однотипные DAG.

Я использую данное решение в проде и данный вариант проверен временем.


ссылка на оригинал статьи https://habr.com/ru/post/722688/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *