Оглавление:
Проблематика
Типовая задача для дата-инженера – это перенести данные из реплики/боевой OLTP DB в аналитическое хранилище.
В данной задаче обычно нужно переносить несколько таблиц и принцип их переноса является одинаковым. Ввиду чего необходимо создавать DAG с небольшими изменениями в коде.
Чаще всего это происходит так: дата-инженер заходит в типовой DAG и выполняет следующие действия:
-
Cmd+A
-
Cmd+C
-
Cmd+V
-
Поменял пару строчек в DAG,
совершил опечатку/неверно скопировал/что-то еще
Решение
Генерация DAG по типовому DAG (шаблону).
Создаем шаблон, изменяем автоматически все необходимые поля и радуемся пускам в прод.
Пример решения проблематики описанной выше
Ниже будет поэтапно расписано как можно просто сделать фабрику DAG, благодаря которой можно смело пускать в прод полученные DAG.
Используемые технологии:
Все операции выполнялись на:
-
MacBook Air (M1, 2020)
-
Оперативная память 16 Gb
-
macOS Monterey 12.6
-
-
Docker 4.17.0
-
Docker resources:
-
CPUs: 2
-
Memory: 4 Gb
-
Swap: 2 Gb
-
-
Установка Airflow
Весь проект вы сможете найти в git-репозитории. Все команды указаны в хронологическом порядке в описании проекта.
Кратко:
Поднимаем проект командой
docker-compose up -d
Пишем шаблонный DAG
Я не буду писать сложный DAG, так как в данной статье я бы хотел показать скорее суть и возможности создания фабрики DAG.
Код DAG
from datetime import datetime, timedelta import pendulum from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.empty import EmptyOperator args = { 'owner': 'k0rsakov', 'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')), 'catchup': True, 'retries': 3, 'retry_delay': timedelta(hours=1), 'max_active_runs': 1 } def print_something() -> None: """ Печатает текст. :return: `None` """ print('something') with DAG( dag_id='template', schedule_interval='10 0 * * *', default_args=args, tags=['template', 'test'], description='', concurrency=1 ) as dag: start = EmptyOperator( task_id='start', ) print_something = PythonOperator( task_id='print_something', python_callable=print_something, ) end = EmptyOperator( task_id='end', ) start >> print_something >> end
Реализован самый простой DAG, который может что-то печатать в консоль.
Если его запустить, то получим ожидаемый результат:

Изменение его под шаблон
Давайте для начала организуем хранение нашей фабрики.
Так как все DAG попадают в веб-интерфейс из папки dags, то мы реализуем всю логику формирования новых DAG вне этой папки.
У меня будет такая структура:

В папке dag_config_json_print_something будет находиться файл с конфигами для наших будущих DAG (об этом расскажу ниже).
В папке templates_dags как раз будет храниться наш шаблонный DAG.
Далее в нашем шаблоне изменяем все параметры, которые поддаются шаблонизации или те, которые должны будут меняться в зависимости от его конфигурации.
Примеры:
Исходный код:
args = { 'owner': 'k0rsakov', 'start_date': datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow')), 'catchup': True, 'retries': 3, 'retry_delay': timedelta(hours=1), 'max_active_runs': 1 }
Шаблонный код:
args = { 'owner': '$$OWNER', 'start_date': $$START_DATE, 'catchup': True, 'retries': 3, 'retry_delay': timedelta(hours=1), 'max_active_runs': 1 }
Код DAG под шаблон
from datetime import datetime, timedelta import pendulum from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.empty import EmptyOperator args = { 'owner': '$$OWNER', 'start_date': $$START_DATE, 'catchup': True, 'retries': 3, 'retry_delay': timedelta(hours=1), 'max_active_runs': 1 } def print_something() -> None: """ Печатает текст. :return: `None` """ print('$$PRINT') with DAG( dag_id='$$DAG_ID', schedule_interval='$$SCHEDULE_INTERVAL', default_args=args, tags=$$TAGS, concurrency=1 ) as dag: start = EmptyOperator( task_id='start', ) print_something = PythonOperator( task_id='print_something', python_callable=print_something, ) end = EmptyOperator( task_id='end', ) start >> print_something >> end
Рекомендую изменить наименование файла в .txt чтобы IDE не ругалась на «ошибки».
Генерация DAG
Ранее мы изменили наш исходный DAG под шаблон, теперь необходимо указать все ключи, которые будут изменяться в одном файле. Я выбрал самый привычный и, как по мне, удобный вариант – JSON.
В заголовке я указываю название DAG, далее указываю все ключи, которые мы ранее создали в нашем шаблоне.
Если мы заходим шаблонизировать какое-то поле, то добавляем его в шаблон и добавляем затем в наш config.
{ "template": { "OWNER": "k0rsakov", "START_DATE": "datetime(2023, 3, 10, tzinfo=pendulum.timezone('Europe/Moscow'))", "PRINT": "something", "DAG_ID": "template", "SCHEDULE_INTERVAL": "10 0 * * *", "TAGS": "['template', 'test']" } }
Создание «генератора» DAG
На данный момент мы имеем: шаблон типового DAG, имеем config файл с информацией о будущих DAG.
Теперь мы создадим саму фабрику, которая нам будет «генерировать» DAG.
Первая функция, которая будет изменять все ключи в шаблоне, которые есть в нашем config-файле:
Функция замены ключей в шаблоне
def replace_template_variables(template_dag: str = None, dict_variables: dict = None) -> str: """ Функция, которая итерируется по всем ключам ключа основного словаря берет оттуда значение и подставляет в шаблон. Пример: ```json { "test": { "OWNER": "airflow", "DAG_ID": "test", "PK": "id" ... } } ``` На вход поступает значения по ключу `test` и функция итерируется по ключам: `OWNER`, `DAG_ID`, `PK`, ... Берет значения по ключу и заменяет шаблон в указанном `template_dag`. Соответственно, по всем ключам, которые есть в словаре DAG будет произведена замена по шаблону. @param template_dag: Шаблон DAG; default 'None'. @param dict_variables: Словарь со значениями, которые необходимо поменять в шаблоне; default 'None'. @return: Измененный шаблон на основании значений по ключам. """ for variables_ in dict_variables: template_dag = template_dag.replace(f'$${variables_}', f'{dict_variables[variables_]}') return template_dag
Далее создадим функцию, которая будет итерироваться по всем ключам config-файла указанного типа шаблона:
Функция для итерации по ключам выбранного типа шаблона
def dag_factory(type_dag: str = None) -> None: """ Функция, которая генерирует DAG на основании выбранного `type_dag` и выбранных config на основании `type_dag`. Пример: Если указан `print_something` в `type_dag`, то функция для генерации DAG будет использовать config: "config_json_print_something.json" и сохранит сгенерированные DAG в папку: //dags/json_dags/print_something/<print_something_dag_name.py> @param type_dag: Указывается тип DAG для генерации print_something|etc; default 'None'. @return: Ничего не возвращает, а сохраняет сгенерированный DAG по определенному пути. """ with open(f'dag_config_json_{type_dag}/config_{type_dag}_dag.json') as j: json_config = j.read() json_config = json.loads(json_config) with open(f'templates_dags/{type_dag}.txt', mode='r') as f: template = f.read() for dag in json_config: modified_template = replace_template_variables(template_dag=template, dict_variables=json_config[dag]) with open(f'../dags/json_dags/{type_dag}/{dag}.py', mode='w') as dag_pyfile: dag_pyfile.write(modified_template)
Объеденим обе функции в один файл и запустим полученный скрипт.
Получим сгенерированный DAG, который будет содержать все наши значения из config-файла.

И DAG появился в нашем веб-интерфейсе:

Масштабирование
Давайте теперь сгенерируем новые DAG по заданному шаблону.


Таким образом можно быстро создавать однотипные DAG.
Рекомендации
-
Для шаблона использовать хорошо проверенные DAG.
-
Продумать заранее все поля, которые будут шаблонизироваться.
-
Продумать поля, которые не нужны во всех DAG и сделать для них обработку исключений, чтобы можно было не указывать какой-то ключ.
-
Держать в голове ту мысль, что если изменить шаблон и запустить фабрику DAG, то изменения подтянутся во все генерируемые DAG.
-
Продумать вариант удаления DAG, которые исключены из config-файлов, сейчас такое не реализовано (я просто руками удаляю).
-
Использовать отдельную папку для генерируемых DAG, чтобы не смешивать рукописные и шаблонные DAG.
-
Не забывайте про идемпотентность.
-
Правильно используйте переменные из контекста DAG.
Итог
Фабрика DAG очень полезна, потому что позволяет не копировать писать руками однотипные DAG. Можно создать универсальные шаблоны для разных типов DAG:
-
Перезаписывают каждый раз табличку.
-
Обновляют значения по ключу.
-
Добавляют в конец.
-
Прочие однотипные DAG.
Я использую данное решение в проде и данный вариант проверен временем.
ссылка на оригинал статьи https://habr.com/ru/post/722688/
Добавить комментарий