Airflow TaskFlow API: внутреннее устройство современного способа писать DAG-и

от автора

Apache Airflow долгое время ассоциировался с таким стилем описания workflow:

# объявляем задачи-таскиtask1 = PythonOperator(...)task2 = BashOperator(...)# проставляем зависимости между нимиtask1 >> task2

Это рабочий и до сих пор актуальный подход, но с Airflow 2.0.0 появился TaskFlow API — способ описывать DAG-и через обычные Python функции и декораторы:

@dag(dag_id="linear_demo")def tutorial_dag()    @task    def extract():       return 42        @task    def transform(x):       return x * 2        # описываем зависимости и строим Flow    y = transform(extract())# создаем даг    tutorial_dag()

TaskFlow в Airflow позволяет описывать DAG как обычный Python-код: @dag задает сам workflow/DAG, а @task превращает Python-функции в задачи Airflow. При вызове декорированных функций, например transform(extract()), выполняется не сам расчет, а создаются объекты задач, связи между ними и ссылки на их будущие результаты (через объект XComArg).

То есть TaskFlow — это декларативный DSL(domain-specific language) для построения DAG, где вызовы функций не выполняют вычисления, а описывают граф зависимостей.

Задачи статьи

В этой статье попробуем:

  1. Заглянуть внутрь Airflow и понять, как работает TaskFlow API (для версии 3.2.1)

  2. На основе этих идей написать собственный микро-фреймворк для закрепления понимания.

  3. Сохранить названия и общую логику внутренних объектов Airflow.

  4. Понять главный архитектурный принцип: описание DAG ≠ выполнение DAG.

Итак, давайте еще раз рассмотрим основные фазы TaskFlow:

  1. Создается DAG через @dag и вызов функции DAG-а.
    В этот момент Airflow создает объект DAG, входит в его контекст и начинает исполнять тело функции DAG-а для сборки графа.

  2. Внутри тела DAG-а @task декорирует Python-функции.
    То есть extract, transform становятся не обычными функциями, а объектами-декораторами (_TaskDecorator), которые умеют создавать Airflow-задачи.

  3. При вызове декорированных функций, например

    y = transform(extract())

    создаются объекты задач (операторы) и зависимости между задачами.
    Именно здесь фактически собирается граф DAG.

  4. Позже scheduler и worker исполняют уже собранный DAG.

Что делает @task

Объект task в Airflow является специальным вызываемым объектом (TaskDecoratorCollection). При декорировании функции он создаёт другой объект _TaskDecorator. см. исходники в [task-sdk\src\airflow\sdk\definitions\decorators\__init__.py]

Упрощённо:

class TaskDecoratorCollection:   def __call__(self, function):       return _TaskDecorator(function)task = TaskDecoratorCollection()

Напомню:

@taskdef extract():

эквивалентно: extract = task(extract)

А значит будет вызван:

TaskDecoratorCollection.__call__()

который вернёт _TaskDecorator, объявлен в [task-sdk\src\airflow\sdk\bases\decorator.py]

Переход от TaskDecoratorCollection к TaskDecorator немного запутан, если проследить — через  _getattr__(«python»), достает Python task decorator из provider registry

def python_task(...):
[providers\standard\src\airflow\providers\standard\decorators\python.py]

Далее вызывает task_decorator_factory(…), который и возвращает _TaskDecorator

Что такое _TaskDecorator

Это внутренний объект, который стоит за @task. Он хранит исходную функцию и умеет превращать её вызов в задачу DAG. 

class _TaskDecorator:    # храним функцию    def __init__(self, function):        self.function = function  # создается оператор Airflow и оборачивается в XComArg  def __call__(self, ...):      op = BaseOperator(          python_callable = self.function,          ...      )      return XComArg(op)

Результат

  • создаёт operator

  • регистрирует его в DAG

  • возвращает ссылку на результат

Что такое XComArg

XComArg — это ленивая ссылка на будущий результат задачи, т.е. это не само значение, а декларативная ссылка на результат upstream-задачи, которая будет разрешена только во время выполнения DAG.

В нашем простом случае мы просто обернем оператор в блок init:

class XComArg:   def __init__(self, operator):       self.operator = operator

Смысл такой: _TaskDecorator создаёт operator, наружу возвращается XComArg(operator) и в логике проверяя аргументы, если видим XComArg, то понимаем что это задача.

То есть:

BaseOperator = задача в DAG

XComArg = ссылка на output этой задачи

Что такое оператор в Airflow

Оператор — это объект задачи в DAG. Идея:

operator = узел графа + правила его выполнения

Оператор описывает:

  • что запускать

  • от чего зависит задача

  • как её выполнять

  • параметры retries / pools / queue / timeout

В нашем учебном примере оператор хранит Python-функцию, а в реальном Airflow это может быть любая логика выполнения, не только Python-функция. Также нужен даг (чтобы зарегистрировать там задачу). В нашем примере это будет просто глобальная переменная _CURRENTDAG.

При создании объекта оператора мы также проставляем зависимости между задачами в текущем даге:

  • upstream_task_ids — Идентификаторы upstream-задач, от которых зависит текущая задача

  • downstream_task_ids — Идентификаторы downstream-задач, которые зависят от текущей задачи

  • dag.add_task(self) — добавляем текущий оператор в текущий даг

# храним текущий даг здесь_CURRENT_DAG = Noneclass BaseOperator:    def __init__(self, python_callable, ...):        # Идентификатор задачи.        # В этом учебном примере это просто имя Python-функции.        self.task_id = python_callable.__name__        # Python-функция, которую будет выполнять эта задача.        self.python_callable = python_callable        # Для простоты берем текущий DAG из глобального контекста.        self.dag = _CURRENT_DAG        # Идентификаторы upstream-задач, от которых зависит текущая задача.        self.upstream_task_ids = set()        # Идентификаторы downstream-задач, которые зависят от текущей задачи.        self.downstream_task_ids = set()        # Ищем XComArg в аргументах задачи и по ним строим зависимости        # upstream_task_ids и downstream_task_ids        self._set_xcomarg_dependencies()        # Добавляем задачу в DAG.        self.dag.add_task(self)    # Выполнение задачи.    def execute(self, context):        return self.python_callable()

Реальные наследники BaseOperator

В Airflow это, например:

  • PythonOperator

  • BashOperator

  • Sensor operators

  • SQL operators

Что такое DAG и @dag

DAG — объект, который хранит описание workflow как граф задач. Он отвечает за:

  • dag_id, идентификатор

  • список задач

  • зависимости между задачами

  • контекст with DAG(…) и т.д.

В TaskFlow задается через декоратор @dag, но по сути @dag это удобная обертка над with DAG(...), см. [task-sdk\src\airflow\sdk\definitions\dag.py] в реализации декоратора def dag.

То есть конструкция вида:

@dag(...) def tutorial_dag():       ...

по смыслу близка к:

def tutorial_dag():      with DAG(...) as dag_obj:    ...         return dag_obj

Для примера я сделал лишь версию с контекстным менеджером, чтобы не усложнять. И напомню, что у нас это будет глобальная переменная, в которую будем писать название текущего дага при входе/выходе из контекста.:

_CURRENTDAG = Noneclass DAG:    def __init__(self, dag_id):        self.dag_id = dag_id        # список задач для этого дага        self.task_dict = {}        # вызывается при создании оператора    def add_task(self, task):        self.task_dict[task.task_id] = task    @property    def tasks(self):        return list(self.task_dict.values())        # протокол контекстного менеджера    def __enter__(self):        global CURRENTDAG        _CURRENTDAG = self        return self    def __exit__(self, exc_type, exc, tb):        global CURRENTDAG        _CURRENTDAG = None

Почему нужен with DAG(…)

Когда создаётся задача внутри блока:

with DAG("demo"):   x = extract()

новая задача автоматически привязывается к текущему DAG.

Выполнение и что такое TaskInstance

Когда DAG уже описан, наступает runtime. Scheduler анализирует DAG и планирует выполнение задач, а worker исполняет конкретные TaskInstance — то есть конкретные запуски конкретных задач.

Идея:

  • BaseOperator — описание задачи

  • TaskInstance — конкретное исполнение этой задачи

Например, TaskInstance можно представить как:

  • task_id = «extract»

  • run_id или логическая дата запуска DAG-а

  • try_number = 2

При выполнении задачи ее результат может автоматически сохраняться через механизм XCom (механизм передачи данных между задачами).

  • в учебном примере результат сохраняется в XComStore

  • в настоящем Airflow результат сохраняется в XCom backend / metadata database

# хранилище сохраненных значенийclass XComStore:    def __init__(self) -> None:        self.values: dict[tuple[str, str, str], Any] = {}    def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None:        self.values[(dag_id, task_id, key)] = value    def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any:        return self.values[(dag_id, task_id, key)]class TaskInstance:    def __init__(self, task: BaseOperator, xcom_store: XComStore):        self.task = task        self.xcom_store = xcom_store    def xcom_push(self, key: str, value: Any) -> None:        self.xcom_store.push(...)            def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any:        return self.xcom_store.pull(...)    def run(self):        # запускаем на исполнение        result = self.task.execute(context={})        # сохраняем результат в хранилище        self.xcom_store.push(self.task.task_id, result)

Что такое XCom

XCom — механизм передачи данных между задачами. В примере это просто словарь c ключем (dag_id, task_id, key). В настоящем Airflow XCom хранится как записи в metadata database в модели XComModel, на практике это часто PostgreSQL.

Определяется в [airflow-core\src\airflow\models\xcom.py]

То есть по имени дага, таски, ключу можно получить что там сохранили. Например:

@taskdef extract():   return 42

Возвращаемое значение TaskFlow-задачи автоматически сериализуется и сохраняется как XCom под специальным ключом return_value. Следующая задача может получить его:

@taskdef transform(x):   return x * 2

TaskFlow API делает это автоматически. В классическом стиле Airflow можно делать вручную: ti.xcom_pull(task_ids="extract"),здесь ti — это экземпляр TaskInstance.

В реальном Airflow обычно недостаточно только (dag_id, task_id, key) — еще важны run_id и map_index

  • run_id  — идентификатор конкретного запуска DAG-а

  • map_index нужен для dynamic task mapping, когда одна задача разворачивается в несколько параллельных task instances

Что важно понимать

XCom хранится в metadata DB Airflow, поэтому передача больших объектов через XCom может резко замедлить scheduler и webserver. Поэтому XCom предназначен для небольших данных: числа, строки, json, id, пути к файлам, метаданные. Не стоит передавать большие DataFrame. Лучше:

1) task1 пишет parquet

2) task2 получает путь через XCom

Общий итог

Мы реализовали очень упрощенную версию следующих объектов airflow, попытались сохранить внутреннюю логику и названия:

  • DAG

  • Operator

  • TaskDecorator

  • XComArg

  • TaskInstance

  • XCom

То есть рассмотрели основные концепции Airflow и TaskFlow API.

А далее — минимальный рабочий пример . В нем добавлен объект LinearTaskRunner, который умеет запускать наш линейный ETL.

from __future__ import annotationsfrom collections.abc import Callablefrom typing import Any_CURRENT_DAG = NoneXCOM_RETURN_KEY = "return_value"class DAG:    def __init__(self, dag_id: str) -> None:        self.dag_id = dag_id        self.task_dict: dict[str, BaseOperator] = {}    def add_task(self, task: BaseOperator) -> None:        # проверка на дубликат task        #if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:        #    raise ValueError(f"Task id {task.task_id!r} already exists in DAG")        self.task_dict[task.task_id] = task    @property    def tasks(self) -> list[BaseOperator]:        return list(self.task_dict.values())    def get_task(self, task_id: str) -> BaseOperator:        return self.task_dict[task_id]    def __enter__(self) -> DAG:        global _CURRENT_DAG        _CURRENT_DAG = self        return self    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:        global _CURRENT_DAG        _CURRENT_DAG = Noneclass XComStore:    def __init__(self) -> None:        self.values: dict[tuple[str, str, str], Any] = {}    def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None:        self.values[(dag_id, task_id, key)] = value    def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any:        return self.values[(dag_id, task_id, key)]class XComArg:    def __init__(self, operator: BaseOperator, key: str = XCOM_RETURN_KEY) -> None:        self.operator = operator        self.key = key# Базовый класс оператораclass BaseOperator:    def __init__(        self,        python_callable: Callable[..., Any],        args: tuple[Any, ...],        kwargs: dict[str, Any],        dag: DAG | None = None,    ) -> None:        self.task_id = python_callable.__name__        self.python_callable = python_callable        self.args = args        self.kwargs = kwargs        self.dag = dag or _CURRENT_DAG        self.upstream_task_ids: set[str] = set()        self.downstream_task_ids: set[str] = set()        self._set_xcomarg_dependencies()        # кладем эту задачу для текущего дага        if self.dag is not None:            self.dag.add_task(self)    def set_upstream(self, other: BaseOperator) -> None:        self.upstream_task_ids.add(other.task_id)        other.downstream_task_ids.add(self.task_id)    def _set_xcomarg_dependencies(self) -> None:        for arg in self.args:            if isinstance(arg, XComArg):                self.set_upstream(arg.operator)        for arg in self.kwargs.values():            if isinstance(arg, XComArg):                self.set_upstream(arg.operator)    def execute(self, context: dict[str, TaskInstance]) -> Any:        resolved_args = [            context["ti"].resolve(arg) if isinstance(arg, XComArg) else arg            for arg in self.args        ]        resolved_kwargs = {            key: context["ti"].resolve(value) if isinstance(value, XComArg) else value            for key, value in self.kwargs.items()        }        return self.python_callable(*resolved_args, **resolved_kwargs)class TaskInstance:    def __init__(self, task: BaseOperator, xcom_store: XComStore) -> None:        self.task = task        self.xcom_store = xcom_store    def xcom_push(self, key: str, value: Any) -> None:        self.xcom_store.push(            dag_id=self.task.dag.dag_id,            task_id=self.task.task_id,            key=key,            value=value,        )    def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any:        return self.xcom_store.pull(            dag_id=self.task.dag.dag_id,            task_id=task_ids,            key=key,        )    def resolve(self, value: Any) -> Any:        if isinstance(value, XComArg):            return self.xcom_pull(task_ids=value.operator.task_id, key=value.key)        return value    def run(self) -> Any:        context = {"ti": self}        result = self.task.execute(context)        self.xcom_push(XCOM_RETURN_KEY, result)        return resultclass LinearTaskRunner:    def __init__(self, dag: DAG, xcom_store: XComStore) -> None:        self.dag = dag        self.xcom_store = xcom_store    def run(self, task: BaseOperator) -> Any:        self._run_task(task)        return self.xcom_store.pull(self.dag.dag_id, task.task_id)    def _run_task(self, task: BaseOperator) -> None:        """ def _run_task(self, task):                if task уже посчитан:                    return                для каждого upstream:                    _run_task(upstream)                запусти текущую задачу        """        if (self.dag.dag_id, task.task_id, XCOM_RETURN_KEY) in self.xcom_store.values:            return        # Рекурсивный запуск upstream-задач        for upstream_task_id in task.upstream_task_ids:            upstream_task = self.dag.get_task(upstream_task_id)            self._run_task(upstream_task)        ti = TaskInstance(task=task, xcom_store=self.xcom_store)        ti.run()class _TaskDecorator:    def __init__(self, function: Callable[..., Any]) -> None:        self.function = function    def __call__(self, *args: Any, **kwargs: Any) -> XComArg:        op = BaseOperator(            python_callable=self.function,            args=args,            kwargs=kwargs,        )        return XComArg(op)class TaskDecoratorCollection:    def __call__(self, function: Callable[..., Any]) -> _TaskDecorator:        return _TaskDecorator(function)task = TaskDecoratorCollection()with DAG("linear_demo") as linear_dag:    @task    def extract():        print("extract")        return 3    @task    def transform(x):        print("transform")        return x + 2    @task    def load(x):        print("load")        return x * 10    result = load(transform(extract()))linear_xcom_store = XComStore()linear_runner = LinearTaskRunner(dag=linear_dag, xcom_store=linear_xcom_store)print(linear_runner.run(result.operator))  # 50print(linear_xcom_store.pull("linear_demo", "extract"))  # 3print(linear_xcom_store.pull("linear_demo", "transform"))  # 5print(linear_xcom_store.pull("linear_demo", "load"))  # 50

ссылка на оригинал статьи https://habr.com/ru/articles/1033750/