Всё началось с того, что однажды мы решили развернуть Airflow для управления нашими ETL-процессами. И не просто развернули, а сделали это в K8s. Но зачем это понадобилось?
Наша задача была довольно проста: взять данные из одной системы и переложить их в другую. Главным хранилищем данных у нас был Greenplum. Эта система имеет интересное расширение под названием PXF (Pivotal Extensible Framework). С его помощью можно подключаться к различным источникам и забирать оттуда данные, а также отправлять их из Greenplum во внешние системы.
Но что такое PXF и почему он так крут?
-
Гибкость: PXF поддерживает разные типы данных и форматы. Текстовые файлы, последовательности, паркет, ORC, Avro — он справится со всем.
-
Производительность: PXF оптимизирован для распределённых систем. Он умеет эффективно выполнять параллельные запросы к внешним данным.
-
Интеграция с Greenplum: PXF предоставляет SQL-интерфейс для работы с внешними данными. Просто и знакомо!
-
Безопасность: PXF поддерживает механизмы аутентификации и авторизации.
Как это работает на практике? Представьте, что вам нужно забрать данные из MSSQL и сохранить их в Greenplum. Всё, что вам нужно сделать — это выполнить операцию:
insert into target_table select * from pxf_source_table
А если нужно отправить данные из Greenplum в другую систему?
Так же просто:
insert into pxf_target_table select * from source_view
Звучит здорово, правда? Но тут возникает вопрос: как управлять всеми этими процессами Вот тут-то и появляется Airflow! Мы стали создавать задачи в DAG’ах, используя PostgresOperator:
insert_salles= PostgresOperator( task_id='insert_salles', postgres_conn_id='PSQL-Greenplum-Connection', sql="""insert_salles.sql""" )
Всё бы гладко, но время шло, и количество таких задач росло. Через два года у нас уже было 156 задач! К ним добавились и обновления материализованных представлений. Поддерживать всё это стало сложно. Много кода, не все знают Python, не многие хотят с ним работать. И тут появилась задача: переделать систему так, чтобы её могли поддерживать все — DevOps, системные администраторы, SQL-специалисты, даже 1С-ники.
И тут мы задумались…
Вспомнили про старого друга
Внезапно нас осенило. Ansible! Помните эту штуку? Удобную, понятную практически всем IT-шникам благодаря использованию YAML. И тут мы подумали: а что если адаптировать идею Ansible под Airflow?
Идея была простой: создать YAML-файлы для описания DAG’ов и задач. При этом в задачах можно использовать любые операторы, как уже имеющиеся, так и разработанные заранее. Формат YAML-файла должен был быть похож на Ansible.
Вот как это выглядело:
config: dag_id: Название_DAG description: Описание schedule: Расписание catchup: False/True default_args: retries: кол-во повторений retry_delay_seconds: время ожидания между повторениями start_date: дата с которого работает DAG tags: - Тэг steps: - refresh_materialized_view: task_id: ID задачи conn_id: ID подключения view: название представления которое нужно обновить
Но как это работает?
-
config — это блок с настройками DAG. Все параметры, которые есть у
класса DAG в Airflow, можно добавлять здесь. По сути, этот блок будет
передаваться в класс DAG:DAG(config)
-
steps — это блок с информацией о всех задачах в DAG’е. Он формируется как список модулей, где передаются параметры модуля и их значения.
Например, refresh_materialized_view — это модуль, который мы заранее разработали. У него есть параметры: task_id , conn_id , view . Мы просто подгружаем этот класс и передаем в него нужные параметры. Но как это все организовать технически?
Структура каталогов
Для начала нам нужно было разобраться со структурой каталогов. Это позволило бы нам динамически подгружать кастомные модули. Вот что у нас получилось:
modules/ <название_модуля>/ module_operator.py
Например, для модуля refresh_materialized_view это выглядело так:
modules/ refresh_materialized_view/ module_operator.py
Теперь при разборе YAML-файла нам нужно просто подставить название
директории в строку импорта класса.
Хранение YAML-файлов
А как же хранить сами YAML-файлы? Учитывая, что их может быть много, мы решили разделять их по директориям внутри общей папки pipelines:
pipelines/ refresh_views/ pipeline1.yaml pipeline2.yaml transfer_data/ pipeline_transfer.yaml
Это позволяет объединять YAML-файлы в директории по логике работы. Так проще поддерживать систему и быстрее находить нужные DAG и задачи.
Реализация. Разбор YAML-файлов
Итак, у нас есть структура каталогов и YAML-файлы. Что дальше? Нужно научиться их читать и превращать в рабочие DAG’и. Звучит сложно? Давайте разберем по шагам.
Первым делом нам нужно найти все YAML-файлы. Как это сделать?
def _get_files_recursively(self, directory): for root, _, files in os.walk(directory): for file in files: full_path = os.path.join(root, file) self._files_dict[file] = full_path
Этот метод проходит по всем файлам в указанной директории и ее поддиректориях. Каждый найденный файл добавляется в словарь self._files_dict.
Но что если в директории окажутся не только YAML-файлы? Как отфильтровать только нужные?
def filter_dict_by_key_suffix(d, suffix): return {k: v for k, v in d.items() if k.endswith(suffix)} pipelines_dict = filter_dict_by_key_suffix(self._files_dict,'yaml')
Теперь у нас есть только YAML-файлы. Что дальше? Нужно превратить их в DAG’и! Но как это сделать? Посмотрим на код:
for pipeline, yaml_file in pipelines_dict.items(): data_dict = self._parse_yaml_file(yaml_file) creator = DAGCreator(data_dict['config'], data_dict.get('steps_config')) for step in data_dict['steps']: top_level_key = list(step.keys())[0] module_params = step[top_level_key] module_params['vars'] = {"files": self._files_dict} module_class = self._create_cls(top_level_key) creator.add_task(module_class, module_params) globals()[data_dict['config']['dag_id']] = creator.dag
_parse_yaml_file — это метод, который разбирает YAML-файл и преобразует его в словарь:
def _parse_yaml_file(self, file_path): try: with open(file_path, 'r') as f: data = yaml.safe_load(f) return data except FileNotFoundError: print(f"Файл не найден: {file_path}") except yaml.YAMLError as e: print(f"Ошибка при разборе YAML: {e}") return None
top_level_key — это название модуля, например refresh_materialized_view (как из примера YAML-файла выше).
module_params — это все параметры модуля из YAML-файла, например task_id, conn_id, view (как из примера YAML-файла выше).
Теперь, зная название модуля (наименование директории, в которой лежит модуль, согласно разработанной ранее структуре), мы можем импортировать класс модуля.
Для этого реализуем метод _create_cls.
def _create_cls(self, module_name): module = f"{self._pipeline_dir}.{self._modules_dir}.{module_name}.module_operator" module = importlib.import_module(module) return getattr(module, "ModuleOperator")
Здесь всё просто: мы динамически подгружаем класс ModuleOperator из файла module_operator.py, который лежит в директории с вызываемым модулем. В нашем случае это директория refresh_materialized_view.
creator = DAGCreator(data_dict[‘config’], data_dict.get(‘steps_config’)) — эта строка кода инициализирует класс, который создаёт DAG и добавляет в него новые задачи через метод add_task.
DAGCreator.add_task принимает класс модуля (Airflow Operator) и параметры этого модуля, и создаёт задачу, которую добавляет к созданному DAG’у.
globals()[data_dict[‘config’][‘dag_id’]] = creator.dag — добавляет новый DAG в Airflow. Без этой строки динамически созданные DAG’и не появятся в Airflow.
CreatePipelines
class CreatePipelines: def __init__(self, pipeline_dir): self._pipeline_dir = pipeline_dir self._base_pipeline_dir = 'pipelines' self._modules_dir = 'modules' self._files_dict = {} current_file_path = os.path.abspath(__file__) current_dir = os.path.dirname(current_file_path) self._get_files_recursively(os.path.join(current_dir, pipeline_dir, self._base_pipeline_dir)) def _get_files_recursively(self, directory): for root, _, files in os.walk(directory): for file in files: full_path = os.path.join(root, file) self._files_dict[file] = full_path def create(self): def filter_dict_by_key_suffix(d, suffix): return {k: v for k, v in d.items() if k.endswith(suffix)} pipelines_dict = filter_dict_by_key_suffix(self._files_dict, 'yaml') for pipeline, yaml_file in pipelines_dict.items(): data_dict = self._parse_yaml_file(yaml_file) print(data_dict['config']) creator = DAGCreator(data_dict['config'], data_dict.get('steps_config')) for step in data_dict['steps']: top_level_key = list(step.keys())[0] module_params = step[top_level_key] module_params['vars'] = {"files": self._files_dict} module_class = self._create_cls(top_level_key) creator.add_task(module_class, module_params) globals()[data_dict['config']['dag_id']] = creator.dag def _create_cls(self, module_name): module = f"{self._pipeline_dir}.{self._modules_dir}.{module_name}.module_operator" module = importlib.import_module(module) return getattr(module, "ModuleOperator") def _parse_yaml_file(self, file_path): try: with open(file_path, 'r') as f: data = yaml.safe_load(f) return data except FileNotFoundError: print(f"Файл не найден: {file_path}") except yaml.YAMLError as e: print(f"Ошибка при разборе YAML: {e}") return None
Реализация. Создание DAG’ов
Сперва реализуем класс DAGCreator, который принимает конфигурацию DAG’а и задач. А что делать дальше?
Посмотрим на конструктор класса DAGCreator:
def __init__(self, config, steps_config): self.config = config self.steps_config = steps_config self.is_step_parallels = steps_config.get('is_step_parallels') if steps_config else False self.dag = DAG(**self.config) self.previous_task = None
Здесь заполняются параметры и создается DAG:
self.dag = DAG(**self.config)
Также есть self.previous_task, который будет использоваться для построения цепочки задач, если они должны идти последовательно.
Но как добавить задачу к DAG’у? Для этого есть метод add_task:
def add_task(self, task_cls, task_params): task_params['dag'] = self.dag task = task_cls(**task_params) if not self.is_step_parallels: if self.previous_task: self.previous_task >> task self.previous_task = task
Что происходит при добавлении задачи?
-
В параметры задачи добавляется task_params[‘dag’] = self.dag , чтобы привязать ее к DAG’у.
-
Инициализируется новый оператор: task = task_cls(**task_params) .
-
Проверяется, как должны выполняться задачи — последовательно или параллельно:
-
Если последовательно, то задается цепочка self.previous_task >> task и обновляется значение self.previous_task = task .
-
Если параллельно, то ничего дополнительно делать не нужно.
-
DAGCreator
class DAGCreator: def __init__(self, config, steps_config): self.config = config self.steps_config = steps_config self.is_step_parallels = steps_config.get('is_step_parallels') if steps_config else False self.dag = DAG(**self.config) self.previous_task = None def add_task(self, task_cls, task_params): task_params['dag'] = self.dag task = task_cls(**task_params) print(task) if not self.is_step_parallels: if self.previous_task: self.previous_task >> task self.previous_task = task
Идем дальше. Теперь у вас есть все необходимое для создания DAG’ов по конфигурации.
Но как запустить этот процесс?
Создадим файл ENTRYPOINT.py со следующим содержимым:
CreatePipelines('dag_yaml').create()
Этот код создаст все DAG’и из YAML-файлов.
Следующее что делаем — реализуем модуль refresh_materialized_view. Помните, как устроены модули? Каждый лежит в отдельной папке с файлом module_operator.py:
modules/ refresh_materialized_view/ module_operator.py
И ModuleOperator наследуется от BaseOperator:
class ModuleOperator(BaseOperator): ...
Теперь нужно разобраться с конструктором. Как мы помним из YAML-файла, модуль refresh_materialized_view должен принимать следующие параметры:
-
task_id — это обязательный параметр для Airflow Operator;
-
conn_id — это ID соединения, который задается в UI Airflow;
-
view — наименование материализованного представления, которое нужно обновлять.
Как добавить подключение в Airflow
Теперь можно реализовать конструктор класса ModuleOperator.
from airflow.hooks.base_hook import BaseHook class ModuleOperator(BaseOperator): def __init__(self,view,conn_id, vars, *args, **kwargs): super().__init__(*args, **kwargs) self.view = view self.conn = BaseHook.get_connection(conn_id) self.vars = vars
BaseHook.get_connection(conn_id) — получает подключение (которое мы завели
ранее) по его идентификатору. Теперь нам нужно реализовать метод execute (он должен быть во всех операторах Airflow, так как именно он вызывается для запуска задачи).
def execute(self, context): ConnectionFabric[self.conn.conn_type]( conn_id=self.conn_id, view = self.view ).execute()
Мы применяем паттерн «фабрика», чтобы работать с разными типами БД. Класс для работы с СУБД выбирается по self.conn.conn_type. Так мы предусматриваем возможность подключения по PostgreSQL, но еще и других БД.
Класс ModuleOperator полностью:
class ModuleOperator(BaseOperator): def __init__(self,view,conn_id, vars, *args, **kwargs): super().__init__(*args, **kwargs) self.view = view self.conn = BaseHook.get_connection(conn_id) self.vars = vars def execute(self, context): ConnectionFabric[self.conn.conn_type]( conn_id=self.conn_id, view = self.view ).execute()
Рассмотрим теперь класс ModuleOperator. Здесь все достаточно просто:
class ConnectionFabric(metaclass=ConnectionFabricMeta): postgres = PostgresqlWorker
У нас есть список поддерживаемых СУБД и классов, которые работают с этим типом. Но для того, чтобы к классу ConnectionFabric можно было обращаться как к словарю
cls = ConnectionFabric['postgres'] #Вернет PostgresqlWorker cls() #PostgresqlWorker()
Нам нужно задать метакласс. У нас это ConnectionFabricMeta.
class ConnectionFabricMeta(type): def __getitem__(cls, name): return getattr(cls, name)
Как видно, в этом классе реализован метод getitem, который и будет возвращать нам атрибут класса по его наименованию. С классом ConnectionFabric и ConnectionFabricMeta все понятно. Идем далее. Теперь посмотрим на реализацию PostgresqlWorker, который и обновляет нам материализованные представления:
from airflow.providers.postgres.hooks.postgres import PostgresHook class PostgresqlWorker(BaseConnection): def execute(self): sql_template = f""" refresh materialized view "{ view_name }" """ PostgresHook(postgres_conn_id=self.conn.conn_id).run(sql_template )
Здесь все достаточно просто. Мы инициализируем класс PostgresHook (он работает с PostgreSQL), после чего вызываем метод run, в который передаем запрос на обновление материализованного представления. BaseConnection — это базовый класс для всех классов фабрики, чтобы разработчики в дальнейшем понимали, какой набор методов должен содержать данный класс. Также там определен конструктор, чтобы не писать его в каждом новом классе коннектора:
class BaseConnection: def __init__(self, conn, view): self.conn = conn self.view = view def execute(self, sql): ... def get_records(self, sql): ...
Что в итоге?
Мы реализовали систему, которая динамически создаёт DAG’и в Airflow по созданному YAML-файлу.
Что это дает на практике? Во-первых, теперь поддерживать и расширять нашу систему смогут даже те, кто не знаком с ее исходным кодом. Достаточно правильно описать задачи в YAML — и вуаля, новый DAG готов! Так что прощай, «бас фактор», здравствуй, легкая передача проекта новой команде!
Во-вторых, мы сэкономили время и усилия на добавлении новых задач. Никакой возни с Python-кодом, никаких ошибок и опечаток. Просто правим Как мы укротили 156 ETL-процессов в Airflow за один день: метод, который изменил работу DevOps 15 YAML — и Airflow сам позаботится об остальном. А высвободившееся время можно потратить на что-то по-настоящему важное.
Но не будем останавливаться на достигнутом! У нашего проекта большие планы на будущее:
-
Научиться использовать стандартные операторы Airflow (PythonOperator, SparkOperator и др.)
-
Задавать порядок выполнения задач в DAG’е
-
Передавать данные между задачами через XCom прямо из YAML
А как вы разрабатываете piplin’ы в Airflow? Делитесь опытом в комментариях
ссылка на оригинал статьи https://habr.com/ru/articles/854408/
Добавить комментарий