Apache Airflow на практике: погружение в инструмент для оркестрации ETL-процессов

от автора

Apache Airflow давно стал стандартом в мире Data Engineering благодаря своей гибкости, масштабируемости и богатой экосистеме. В этой статье мы подробно разберём, что такое Airflow, почему он так популярен, как эффективно использовать его в аналитической архитектуре, а также предоставим максимально подробную инструкцию по развертыванию Airflow.

Что такое Apache Airflow?

Apache Airflow — это платформа с открытым исходным кодом для создания, планирования и мониторинга рабочих процессов (workflow). Основная сила Airflow заключается в возможности организации сложных ETL-процессов в виде управляемых графов задач (DAG — Directed Acyclic Graph).

Основные компоненты Airflow

DAG (Directed Acyclic Graph)

DAG — это структурированный набор задач, связанных между собой четко определенными зависимостями. Основные характеристики DAG:

  • Задачи выполняются согласно строгому порядку, без циклических зависимостей.

  • Каждый DAG описывается с помощью кода Python, что делает процесс разработки и поддержки интуитивно понятным.

  • DAG позволяет ясно визуализировать и контролировать последовательность задач.

Task

Task (задача) является основной рабочей единицей внутри DAG:

  • Может выполнять любой код на Python, SQL-запросы или системные команды.

  • Использует специальные операторы Airflow (например, PythonOperator, BashOperator, PostgresOperator).

  • Позволяет конфигурировать параметры выполнения, такие как время запуска, поведение при ошибках и условия выполнения.

Scheduler

Scheduler (планировщик) отвечает за запуск DAG в соответствии с заданным расписанием:

  • Регулярно проверяет статус всех задач и запускает новые задачи в соответствии с графом.

  • Управляет состоянием задач (запуск, повторная попытка, пропуск, завершение).

  • Обеспечивает корректную обработку расписаний и зависимостей.

Executor

Executor управляет выполнением задач, распределением нагрузки и масштабированием:

  • SequentialExecutor: Запускает задачи последовательно (подходит для разработки и тестирования).

  • LocalExecutor: Запускает задачи параллельно на одном узле.

  • CeleryExecutor: Использует Celery для распределенного выполнения задач на нескольких узлах.

  • KubernetesExecutor: Запускает задачи в виде отдельных подов в Kubernetes, обеспечивая динамическое масштабирование.

Webserver

Webserver предоставляет веб-интерфейс для управления и мониторинга процессов Airflow:

  • Позволяет визуально отслеживать выполнение задач и графов.

  • Поддерживает интерфейс для ручного запуска и перезапуска задач.

  • Содержит встроенные инструменты для просмотра логов и отладки.

Metadata Database

Metadata Database (база метаданных) хранит информацию о всех DAG, задачах и их состоянии:

  • Обычно используется PostgreSQL или MySQL для хранения метаданных.

  • Содержит исторические данные о выполнении задач, позволяя анализировать производительность и ошибки.

  • Обеспечивает сохранение состояния даже после перезапуска Airflow.

Подробная инструкция по развертыванию Apache Airflow

Шаг 1: Подготовка окружения

Используйте Linux-сервер с установленным Docker и Docker Compose.

Установите Docker:

sudo apt update sudo apt install docker.io docker-compose -y sudo systemctl start docker sudo systemctl enable docker

Шаг 2: Создание файла Docker Compose

Создайте директорию проекта:

mkdir airflow cd airflow

Создайте файл docker-compose.yaml:

version: '3' services:   postgres:     image: postgres:13     environment:       POSTGRES_USER: airflow       POSTGRES_PASSWORD: airflow       POSTGRES_DB: airflow     ports:       - "5432:5432"    redis:     image: redis:latest     ports:       - "6379:6379"    webserver:     image: apache/airflow:2.9.1     depends_on:       - postgres       - redis     environment:       AIRFLOW__CORE__EXECUTOR: CeleryExecutor       AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow       AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0       AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow       AIRFLOW__CORE__FERNET_KEY: ''       AIRFLOW__WEBSERVER__SECRET_KEY: ''     ports:       - "8080:8080"     command: webserver    scheduler:     image: apache/airflow:2.9.1     depends_on:       - webserver     environment:       AIRFLOW__CORE__EXECUTOR: CeleryExecutor       AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow       AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0       AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow     command: scheduler    worker:     image: apache/airflow:2.9.1     depends_on:       - scheduler     environment:       AIRFLOW__CORE__EXECUTOR: CeleryExecutor       AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow       AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0       AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow     command: celery worker

Шаг 3: Инициализация базы данных и создание пользователя

Запустите контейнеры:

sudo docker-compose up airflow-init

Шаг 4: Запуск Airflow

Запустите Airflow в режиме демона:

sudo docker-compose up -d

Шаг 5: Доступ к Airflow Web UI

Перейдите по адресу: http://localhost:8080

Стандартные учетные данные:

  • логин: airflow

  • пароль: airflow

Создание первого DAG

Создание DAG начинается с определения задач и их последовательности. Рассмотрим этот процесс максимально подробно:

Шаг 1: Импорт необходимых библиотек

from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime  def extract():     print("Extracting data...")  def transform():     print("Transforming data...")  def load():     print("Loading data into warehouse...")  with DAG('etl_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:     extract_task = PythonOperator(task_id='extract', python_callable=extract)     transform_task = PythonOperator(task_id='transform', python_callable=transform)     load_task = PythonOperator(task_id='load', python_callable=load)      extract_task >> transform_task >> load_task

Шаг 2: Определение функций задач

Каждая задача представляет собой отдельную функцию:

def extract():     print("Извлечение данных из источника...")  def transform():     print("Преобразование и очистка данных...")  def load():     print("Загрузка данных в хранилище...")

Шаг 3: Настройка аргументов по умолчанию

Установите аргументы, которые будут использоваться по умолчанию:

default_args = {     'owner': 'airflow',     'depends_on_past': False,     'start_date': datetime(2023, 1, 1),     'retries': 1,     'retry_delay': timedelta(minutes=5), }

Шаг 4: Создание DAG

Создайте DAG с настройкой расписания выполнения:

with DAG(     'etl_pipeline',     default_args=default_args,     description='Простой ETL-пайплайн',     schedule_interval='@daily',     catchup=False ) as dag:

Шаг 5: Создание задач внутри DAG

Определите каждую задачу с помощью операторов Airflow:

    extract_task = PythonOperator(         task_id='extract',         python_callable=extract     )      transform_task = PythonOperator(         task_id='transform',         python_callable=transform     )      load_task = PythonOperator(         task_id='load',         python_callable=load     )

Шаг 6: Определение порядка выполнения задач

Установите последовательность выполнения задач:

    extract_task >> transform_task >> load_task

Таким образом, задачи будут выполнены последовательно: сначала extract_task, затем transform_task, и в конце load_task.

Советы по организации Airflow-проектов

  • Используйте шаблоны и переменные для переиспользования DAG’ов: создавайте переменные Airflow для параметров, используйте Jinja-шаблоны в SQL-запросах и Bash-командах. Это сделает ваши DAG’и универсальными и легко настраиваемыми.

  • Настройте мониторинг и логирование: включите сбор логов задач, настройте метрики выполнения DAG’ов и интегрируйте систему алертов (например, Slack, email), чтобы оперативно реагировать на проблемы.

  • Оптимизируйте DAG’и для параллельного выполнения: избегайте последовательных длинных цепочек задач, создавайте небольшие независимые задачи, которые можно запускать параллельно для ускорения выполнения.

Типичные ошибки и как их избежать

  • Разбивайте задачи на мелкие и параллельные: избегайте больших монолитных задач, так как при их падении придется запускать весь процесс заново. Разделяйте задачи на меньшие и независимые компоненты.

  • Внедряйте систему уведомлений и мониторинга: при возникновении ошибок вы должны немедленно получать уведомления, чтобы быстро реагировать и исправлять проблему.

  • Используйте секретные хранилища вместо хранения данных в коде: не храните пароли, ключи API и другие конфиденциальные данные в открытом виде. Используйте переменные Airflow, секретные менеджеры AWS или HashiCorp Vault.

Как масштабировать Airflow

  • Используйте CeleryExecutor для распределенного выполнения: CeleryExecutor позволяет выполнять задачи на нескольких узлах одновременно, значительно улучшая производительность и надежность вашей инфраструктуры.

  • Рассмотрите KubernetesExecutor для управления задачами в Kubernetes: KubernetesExecutor идеально подходит для автоматического масштабирования задач. Он запускает каждую задачу как отдельный под в кластере Kubernetes, позволяя легко управлять ресурсами и обеспечивать высокую доступность.

Заключение

Apache Airflow — мощный инструмент для оркестрации и автоматизации рабочих процессов, существенно облегчающий жизнь инженерам данных. Правильная настройка и использование Airflow позволяют построить надежные и прозрачные ETL-процессы, обеспечивающие качественную аналитику и принятие решений на основе данных.


ссылка на оригинал статьи https://habr.com/ru/articles/918580/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *