Apache Airflow давно стал стандартом в мире Data Engineering благодаря своей гибкости, масштабируемости и богатой экосистеме. В этой статье мы подробно разберём, что такое Airflow, почему он так популярен, как эффективно использовать его в аналитической архитектуре, а также предоставим максимально подробную инструкцию по развертыванию Airflow.
Что такое Apache Airflow?
Apache Airflow — это платформа с открытым исходным кодом для создания, планирования и мониторинга рабочих процессов (workflow). Основная сила Airflow заключается в возможности организации сложных ETL-процессов в виде управляемых графов задач (DAG — Directed Acyclic Graph).
Основные компоненты Airflow
DAG (Directed Acyclic Graph)
DAG — это структурированный набор задач, связанных между собой четко определенными зависимостями. Основные характеристики DAG:
-
Задачи выполняются согласно строгому порядку, без циклических зависимостей.
-
Каждый DAG описывается с помощью кода Python, что делает процесс разработки и поддержки интуитивно понятным.
-
DAG позволяет ясно визуализировать и контролировать последовательность задач.
Task
Task (задача) является основной рабочей единицей внутри DAG:
-
Может выполнять любой код на Python, SQL-запросы или системные команды.
-
Использует специальные операторы Airflow (например, PythonOperator, BashOperator, PostgresOperator).
-
Позволяет конфигурировать параметры выполнения, такие как время запуска, поведение при ошибках и условия выполнения.
Scheduler
Scheduler (планировщик) отвечает за запуск DAG в соответствии с заданным расписанием:
-
Регулярно проверяет статус всех задач и запускает новые задачи в соответствии с графом.
-
Управляет состоянием задач (запуск, повторная попытка, пропуск, завершение).
-
Обеспечивает корректную обработку расписаний и зависимостей.
Executor
Executor управляет выполнением задач, распределением нагрузки и масштабированием:
-
SequentialExecutor: Запускает задачи последовательно (подходит для разработки и тестирования).
-
LocalExecutor: Запускает задачи параллельно на одном узле.
-
CeleryExecutor: Использует Celery для распределенного выполнения задач на нескольких узлах.
-
KubernetesExecutor: Запускает задачи в виде отдельных подов в Kubernetes, обеспечивая динамическое масштабирование.
Webserver
Webserver предоставляет веб-интерфейс для управления и мониторинга процессов Airflow:
-
Позволяет визуально отслеживать выполнение задач и графов.
-
Поддерживает интерфейс для ручного запуска и перезапуска задач.
-
Содержит встроенные инструменты для просмотра логов и отладки.
Metadata Database
Metadata Database (база метаданных) хранит информацию о всех DAG, задачах и их состоянии:
-
Обычно используется PostgreSQL или MySQL для хранения метаданных.
-
Содержит исторические данные о выполнении задач, позволяя анализировать производительность и ошибки.
-
Обеспечивает сохранение состояния даже после перезапуска Airflow.
Подробная инструкция по развертыванию Apache Airflow
Шаг 1: Подготовка окружения
Используйте Linux-сервер с установленным Docker и Docker Compose.
Установите Docker:
sudo apt update sudo apt install docker.io docker-compose -y sudo systemctl start docker sudo systemctl enable docker
Шаг 2: Создание файла Docker Compose
Создайте директорию проекта:
mkdir airflow cd airflow
Создайте файл docker-compose.yaml:
version: '3' services: postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow ports: - "5432:5432" redis: image: redis:latest ports: - "6379:6379" webserver: image: apache/airflow:2.9.1 depends_on: - postgres - redis environment: AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0 AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: '' AIRFLOW__WEBSERVER__SECRET_KEY: '' ports: - "8080:8080" command: webserver scheduler: image: apache/airflow:2.9.1 depends_on: - webserver environment: AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0 AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow command: scheduler worker: image: apache/airflow:2.9.1 depends_on: - scheduler environment: AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0 AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow command: celery worker
Шаг 3: Инициализация базы данных и создание пользователя
Запустите контейнеры:
sudo docker-compose up airflow-init
Шаг 4: Запуск Airflow
Запустите Airflow в режиме демона:
sudo docker-compose up -d
Шаг 5: Доступ к Airflow Web UI
Перейдите по адресу: http://localhost:8080
Стандартные учетные данные:
-
логин:
airflow -
пароль:
airflow
Создание первого DAG
Создание DAG начинается с определения задач и их последовательности. Рассмотрим этот процесс максимально подробно:
Шаг 1: Импорт необходимых библиотек
from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime def extract(): print("Extracting data...") def transform(): print("Transforming data...") def load(): print("Loading data into warehouse...") with DAG('etl_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag: extract_task = PythonOperator(task_id='extract', python_callable=extract) transform_task = PythonOperator(task_id='transform', python_callable=transform) load_task = PythonOperator(task_id='load', python_callable=load) extract_task >> transform_task >> load_task
Шаг 2: Определение функций задач
Каждая задача представляет собой отдельную функцию:
def extract(): print("Извлечение данных из источника...") def transform(): print("Преобразование и очистка данных...") def load(): print("Загрузка данных в хранилище...")
Шаг 3: Настройка аргументов по умолчанию
Установите аргументы, которые будут использоваться по умолчанию:
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5), }
Шаг 4: Создание DAG
Создайте DAG с настройкой расписания выполнения:
with DAG( 'etl_pipeline', default_args=default_args, description='Простой ETL-пайплайн', schedule_interval='@daily', catchup=False ) as dag:
Шаг 5: Создание задач внутри DAG
Определите каждую задачу с помощью операторов Airflow:
extract_task = PythonOperator( task_id='extract', python_callable=extract ) transform_task = PythonOperator( task_id='transform', python_callable=transform ) load_task = PythonOperator( task_id='load', python_callable=load )
Шаг 6: Определение порядка выполнения задач
Установите последовательность выполнения задач:
extract_task >> transform_task >> load_task
Таким образом, задачи будут выполнены последовательно: сначала extract_task, затем transform_task, и в конце load_task.
Советы по организации Airflow-проектов
-
Используйте шаблоны и переменные для переиспользования DAG’ов: создавайте переменные Airflow для параметров, используйте Jinja-шаблоны в SQL-запросах и Bash-командах. Это сделает ваши DAG’и универсальными и легко настраиваемыми.
-
Настройте мониторинг и логирование: включите сбор логов задач, настройте метрики выполнения DAG’ов и интегрируйте систему алертов (например, Slack, email), чтобы оперативно реагировать на проблемы.
-
Оптимизируйте DAG’и для параллельного выполнения: избегайте последовательных длинных цепочек задач, создавайте небольшие независимые задачи, которые можно запускать параллельно для ускорения выполнения.
Типичные ошибки и как их избежать
-
Разбивайте задачи на мелкие и параллельные: избегайте больших монолитных задач, так как при их падении придется запускать весь процесс заново. Разделяйте задачи на меньшие и независимые компоненты.
-
Внедряйте систему уведомлений и мониторинга: при возникновении ошибок вы должны немедленно получать уведомления, чтобы быстро реагировать и исправлять проблему.
-
Используйте секретные хранилища вместо хранения данных в коде: не храните пароли, ключи API и другие конфиденциальные данные в открытом виде. Используйте переменные Airflow, секретные менеджеры AWS или HashiCorp Vault.
Как масштабировать Airflow
-
Используйте CeleryExecutor для распределенного выполнения: CeleryExecutor позволяет выполнять задачи на нескольких узлах одновременно, значительно улучшая производительность и надежность вашей инфраструктуры.
-
Рассмотрите KubernetesExecutor для управления задачами в Kubernetes: KubernetesExecutor идеально подходит для автоматического масштабирования задач. Он запускает каждую задачу как отдельный под в кластере Kubernetes, позволяя легко управлять ресурсами и обеспечивать высокую доступность.
Заключение
Apache Airflow — мощный инструмент для оркестрации и автоматизации рабочих процессов, существенно облегчающий жизнь инженерам данных. Правильная настройка и использование Airflow позволяют построить надежные и прозрачные ETL-процессы, обеспечивающие качественную аналитику и принятие решений на основе данных.
ссылка на оригинал статьи https://habr.com/ru/articles/918580/
Добавить комментарий