Apache Airflow долгое время ассоциировался с таким стилем описания workflow:
# объявляем задачи-таскиtask1 = PythonOperator(...)task2 = BashOperator(...)# проставляем зависимости между нимиtask1 >> task2
Это рабочий и до сих пор актуальный подход, но с Airflow 2.0.0 появился TaskFlow API — способ описывать DAG-и через обычные Python функции и декораторы:
@dag(dag_id="linear_demo")def tutorial_dag() @task def extract(): return 42 @task def transform(x): return x * 2 # описываем зависимости и строим Flow y = transform(extract())# создаем даг tutorial_dag()
TaskFlow в Airflow позволяет описывать DAG как обычный Python-код: @dag задает сам workflow/DAG, а @task превращает Python-функции в задачи Airflow. При вызове декорированных функций, например transform(extract()), выполняется не сам расчет, а создаются объекты задач, связи между ними и ссылки на их будущие результаты (через объект XComArg).
То есть TaskFlow — это декларативный DSL(domain-specific language) для построения DAG, где вызовы функций не выполняют вычисления, а описывают граф зависимостей.
Задачи статьи
В этой статье попробуем:
-
Заглянуть внутрь Airflow и понять, как работает TaskFlow API (для версии 3.2.1)
-
На основе этих идей написать собственный микро-фреймворк для закрепления понимания.
-
Сохранить названия и общую логику внутренних объектов Airflow.
-
Понять главный архитектурный принцип: описание DAG ≠ выполнение DAG.
Итак, давайте еще раз рассмотрим основные фазы TaskFlow:
-
Создается DAG через @dag и вызов функции DAG-а.
В этот момент Airflow создает объект DAG, входит в его контекст и начинает исполнять тело функции DAG-а для сборки графа. -
Внутри тела DAG-а @task декорирует Python-функции.
То есть extract, transform становятся не обычными функциями, а объектами-декораторами (_TaskDecorator), которые умеют создавать Airflow-задачи. -
При вызове декорированных функций, например
y = transform(extract())создаются объекты задач (операторы) и зависимости между задачами.
Именно здесь фактически собирается граф DAG. -
Позже scheduler и worker исполняют уже собранный DAG.
Что делает @task
Объект task в Airflow является специальным вызываемым объектом (TaskDecoratorCollection). При декорировании функции он создаёт другой объект _TaskDecorator. см. исходники в [task-sdk\src\airflow\sdk\definitions\decorators\__init__.py]
Упрощённо:
class TaskDecoratorCollection: def __call__(self, function): return _TaskDecorator(function)task = TaskDecoratorCollection()
Напомню:
@taskdef extract():
эквивалентно: extract = task(extract)
А значит будет вызван:
TaskDecoratorCollection.__call__()
который вернёт _TaskDecorator, объявлен в [task-sdk\src\airflow\sdk\bases\decorator.py]
|
Переход от TaskDecoratorCollection к TaskDecorator немного запутан, если проследить — через _getattr__(«python»), достает Python task decorator из provider registry
Далее вызывает task_decorator_factory(…), который и возвращает _TaskDecorator |
Что такое _TaskDecorator
Это внутренний объект, который стоит за @task. Он хранит исходную функцию и умеет превращать её вызов в задачу DAG.
class _TaskDecorator: # храним функцию def __init__(self, function): self.function = function # создается оператор Airflow и оборачивается в XComArg def __call__(self, ...): op = BaseOperator( python_callable = self.function, ... ) return XComArg(op)
Результат
-
создаёт operator
-
регистрирует его в DAG
-
возвращает ссылку на результат
Что такое XComArg
XComArg — это ленивая ссылка на будущий результат задачи, т.е. это не само значение, а декларативная ссылка на результат upstream-задачи, которая будет разрешена только во время выполнения DAG.
В нашем простом случае мы просто обернем оператор в блок init:
class XComArg: def __init__(self, operator): self.operator = operator
Смысл такой: _TaskDecorator создаёт operator, наружу возвращается XComArg(operator) и в логике проверяя аргументы, если видим XComArg, то понимаем что это задача.
То есть:
BaseOperator = задача в DAG
XComArg = ссылка на output этой задачи
Что такое оператор в Airflow
Оператор — это объект задачи в DAG. Идея:
operator = узел графа + правила его выполнения
Оператор описывает:
-
что запускать
-
от чего зависит задача
-
как её выполнять
-
параметры retries / pools / queue / timeout
В нашем учебном примере оператор хранит Python-функцию, а в реальном Airflow это может быть любая логика выполнения, не только Python-функция. Также нужен даг (чтобы зарегистрировать там задачу). В нашем примере это будет просто глобальная переменная _CURRENTDAG.
При создании объекта оператора мы также проставляем зависимости между задачами в текущем даге:
-
upstream_task_ids — Идентификаторы upstream-задач, от которых зависит текущая задача
-
downstream_task_ids — Идентификаторы downstream-задач, которые зависят от текущей задачи
-
dag.add_task(self) — добавляем текущий оператор в текущий даг
# храним текущий даг здесь_CURRENT_DAG = Noneclass BaseOperator: def __init__(self, python_callable, ...): # Идентификатор задачи. # В этом учебном примере это просто имя Python-функции. self.task_id = python_callable.__name__ # Python-функция, которую будет выполнять эта задача. self.python_callable = python_callable # Для простоты берем текущий DAG из глобального контекста. self.dag = _CURRENT_DAG # Идентификаторы upstream-задач, от которых зависит текущая задача. self.upstream_task_ids = set() # Идентификаторы downstream-задач, которые зависят от текущей задачи. self.downstream_task_ids = set() # Ищем XComArg в аргументах задачи и по ним строим зависимости # upstream_task_ids и downstream_task_ids self._set_xcomarg_dependencies() # Добавляем задачу в DAG. self.dag.add_task(self) # Выполнение задачи. def execute(self, context): return self.python_callable()
Реальные наследники BaseOperator
В Airflow это, например:
-
PythonOperator
-
BashOperator
-
Sensor operators
-
SQL operators
Что такое DAG и @dag
DAG — объект, который хранит описание workflow как граф задач. Он отвечает за:
-
dag_id, идентификатор
-
список задач
-
зависимости между задачами
-
контекст with DAG(…) и т.д.
В TaskFlow задается через декоратор @dag, но по сути @dag это удобная обертка над with DAG(...), см. [task-sdk\src\airflow\sdk\definitions\dag.py] в реализации декоратора def dag.
То есть конструкция вида:
@dag(...) def tutorial_dag(): ...
по смыслу близка к:
def tutorial_dag(): with DAG(...) as dag_obj: ... return dag_obj
Для примера я сделал лишь версию с контекстным менеджером, чтобы не усложнять. И напомню, что у нас это будет глобальная переменная, в которую будем писать название текущего дага при входе/выходе из контекста.:
_CURRENTDAG = Noneclass DAG: def __init__(self, dag_id): self.dag_id = dag_id # список задач для этого дага self.task_dict = {} # вызывается при создании оператора def add_task(self, task): self.task_dict[task.task_id] = task @property def tasks(self): return list(self.task_dict.values()) # протокол контекстного менеджера def __enter__(self): global CURRENTDAG _CURRENTDAG = self return self def __exit__(self, exc_type, exc, tb): global CURRENTDAG _CURRENTDAG = None
Почему нужен with DAG(…)
Когда создаётся задача внутри блока:
with DAG("demo"): x = extract()
новая задача автоматически привязывается к текущему DAG.
Выполнение и что такое TaskInstance
Когда DAG уже описан, наступает runtime. Scheduler анализирует DAG и планирует выполнение задач, а worker исполняет конкретные TaskInstance — то есть конкретные запуски конкретных задач.
Идея:
-
BaseOperator — описание задачи
-
TaskInstance — конкретное исполнение этой задачи
Например, TaskInstance можно представить как:
-
task_id = «extract»
-
run_id или логическая дата запуска DAG-а
-
try_number = 2
При выполнении задачи ее результат может автоматически сохраняться через механизм XCom (механизм передачи данных между задачами).
-
в учебном примере результат сохраняется в XComStore
-
в настоящем Airflow результат сохраняется в XCom backend / metadata database
# хранилище сохраненных значенийclass XComStore: def __init__(self) -> None: self.values: dict[tuple[str, str, str], Any] = {} def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None: self.values[(dag_id, task_id, key)] = value def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any: return self.values[(dag_id, task_id, key)]class TaskInstance: def __init__(self, task: BaseOperator, xcom_store: XComStore): self.task = task self.xcom_store = xcom_store def xcom_push(self, key: str, value: Any) -> None: self.xcom_store.push(...) def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any: return self.xcom_store.pull(...) def run(self): # запускаем на исполнение result = self.task.execute(context={}) # сохраняем результат в хранилище self.xcom_store.push(self.task.task_id, result)
Что такое XCom
XCom — механизм передачи данных между задачами. В примере это просто словарь c ключем (dag_id, task_id, key). В настоящем Airflow XCom хранится как записи в metadata database в модели XComModel, на практике это часто PostgreSQL.
Определяется в [airflow-core\src\airflow\models\xcom.py]
То есть по имени дага, таски, ключу можно получить что там сохранили. Например:
@taskdef extract(): return 42
Возвращаемое значение TaskFlow-задачи автоматически сериализуется и сохраняется как XCom под специальным ключом return_value. Следующая задача может получить его:
@taskdef transform(x): return x * 2
TaskFlow API делает это автоматически. В классическом стиле Airflow можно делать вручную: ti.xcom_pull(task_ids="extract"),здесь ti — это экземпляр TaskInstance.
В реальном Airflow обычно недостаточно только (dag_id, task_id, key) — еще важны run_id и map_index
-
run_id — идентификатор конкретного запуска DAG-а
-
map_index нужен для dynamic task mapping, когда одна задача разворачивается в несколько параллельных task instances
Что важно понимать
XCom хранится в metadata DB Airflow, поэтому передача больших объектов через XCom может резко замедлить scheduler и webserver. Поэтому XCom предназначен для небольших данных: числа, строки, json, id, пути к файлам, метаданные. Не стоит передавать большие DataFrame. Лучше:
1) task1 пишет parquet
2) task2 получает путь через XCom
Общий итог
Мы реализовали очень упрощенную версию следующих объектов airflow, попытались сохранить внутреннюю логику и названия:
-
DAG
-
Operator
-
TaskDecorator
-
XComArg
-
TaskInstance
-
XCom
То есть рассмотрели основные концепции Airflow и TaskFlow API.
А далее — минимальный рабочий пример . В нем добавлен объект LinearTaskRunner, который умеет запускать наш линейный ETL.
from __future__ import annotationsfrom collections.abc import Callablefrom typing import Any_CURRENT_DAG = NoneXCOM_RETURN_KEY = "return_value"class DAG: def __init__(self, dag_id: str) -> None: self.dag_id = dag_id self.task_dict: dict[str, BaseOperator] = {} def add_task(self, task: BaseOperator) -> None: # проверка на дубликат task #if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task: # raise ValueError(f"Task id {task.task_id!r} already exists in DAG") self.task_dict[task.task_id] = task @property def tasks(self) -> list[BaseOperator]: return list(self.task_dict.values()) def get_task(self, task_id: str) -> BaseOperator: return self.task_dict[task_id] def __enter__(self) -> DAG: global _CURRENT_DAG _CURRENT_DAG = self return self def __exit__(self, exc_type: object, exc: object, tb: object) -> None: global _CURRENT_DAG _CURRENT_DAG = Noneclass XComStore: def __init__(self) -> None: self.values: dict[tuple[str, str, str], Any] = {} def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None: self.values[(dag_id, task_id, key)] = value def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any: return self.values[(dag_id, task_id, key)]class XComArg: def __init__(self, operator: BaseOperator, key: str = XCOM_RETURN_KEY) -> None: self.operator = operator self.key = key# Базовый класс оператораclass BaseOperator: def __init__( self, python_callable: Callable[..., Any], args: tuple[Any, ...], kwargs: dict[str, Any], dag: DAG | None = None, ) -> None: self.task_id = python_callable.__name__ self.python_callable = python_callable self.args = args self.kwargs = kwargs self.dag = dag or _CURRENT_DAG self.upstream_task_ids: set[str] = set() self.downstream_task_ids: set[str] = set() self._set_xcomarg_dependencies() # кладем эту задачу для текущего дага if self.dag is not None: self.dag.add_task(self) def set_upstream(self, other: BaseOperator) -> None: self.upstream_task_ids.add(other.task_id) other.downstream_task_ids.add(self.task_id) def _set_xcomarg_dependencies(self) -> None: for arg in self.args: if isinstance(arg, XComArg): self.set_upstream(arg.operator) for arg in self.kwargs.values(): if isinstance(arg, XComArg): self.set_upstream(arg.operator) def execute(self, context: dict[str, TaskInstance]) -> Any: resolved_args = [ context["ti"].resolve(arg) if isinstance(arg, XComArg) else arg for arg in self.args ] resolved_kwargs = { key: context["ti"].resolve(value) if isinstance(value, XComArg) else value for key, value in self.kwargs.items() } return self.python_callable(*resolved_args, **resolved_kwargs)class TaskInstance: def __init__(self, task: BaseOperator, xcom_store: XComStore) -> None: self.task = task self.xcom_store = xcom_store def xcom_push(self, key: str, value: Any) -> None: self.xcom_store.push( dag_id=self.task.dag.dag_id, task_id=self.task.task_id, key=key, value=value, ) def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any: return self.xcom_store.pull( dag_id=self.task.dag.dag_id, task_id=task_ids, key=key, ) def resolve(self, value: Any) -> Any: if isinstance(value, XComArg): return self.xcom_pull(task_ids=value.operator.task_id, key=value.key) return value def run(self) -> Any: context = {"ti": self} result = self.task.execute(context) self.xcom_push(XCOM_RETURN_KEY, result) return resultclass LinearTaskRunner: def __init__(self, dag: DAG, xcom_store: XComStore) -> None: self.dag = dag self.xcom_store = xcom_store def run(self, task: BaseOperator) -> Any: self._run_task(task) return self.xcom_store.pull(self.dag.dag_id, task.task_id) def _run_task(self, task: BaseOperator) -> None: """ def _run_task(self, task): if task уже посчитан: return для каждого upstream: _run_task(upstream) запусти текущую задачу """ if (self.dag.dag_id, task.task_id, XCOM_RETURN_KEY) in self.xcom_store.values: return # Рекурсивный запуск upstream-задач for upstream_task_id in task.upstream_task_ids: upstream_task = self.dag.get_task(upstream_task_id) self._run_task(upstream_task) ti = TaskInstance(task=task, xcom_store=self.xcom_store) ti.run()class _TaskDecorator: def __init__(self, function: Callable[..., Any]) -> None: self.function = function def __call__(self, *args: Any, **kwargs: Any) -> XComArg: op = BaseOperator( python_callable=self.function, args=args, kwargs=kwargs, ) return XComArg(op)class TaskDecoratorCollection: def __call__(self, function: Callable[..., Any]) -> _TaskDecorator: return _TaskDecorator(function)task = TaskDecoratorCollection()with DAG("linear_demo") as linear_dag: @task def extract(): print("extract") return 3 @task def transform(x): print("transform") return x + 2 @task def load(x): print("load") return x * 10 result = load(transform(extract()))linear_xcom_store = XComStore()linear_runner = LinearTaskRunner(dag=linear_dag, xcom_store=linear_xcom_store)print(linear_runner.run(result.operator)) # 50print(linear_xcom_store.pull("linear_demo", "extract")) # 3print(linear_xcom_store.pull("linear_demo", "transform")) # 5print(linear_xcom_store.pull("linear_demo", "load")) # 50
ссылка на оригинал статьи https://habr.com/ru/articles/1033750/