Всем привет! В данной статье мы рассмотрим как можно локально развернуть airflow с помощью docker desktop’a и PyCharm’a. Кроме него развернём ещё и следующее: rabbitMQ, postgreSQL, redis и MongoDB.
Первый делом, нужно установить себе на компьютер Docker desktop, PyCharm и PgAdmin4. Теперь нам нужен yaml файл, в котором будет прописано все образы, которые мы хотим поставить. Готовый файл для нашей задачи можете скачать по ссылке. Когда всё необходимое было скачано, можно приступить к развёртыванию airflow.
Создаём проект в PyCharm, и в корневую папку загружаем наш yaml файл. В терминале выполняем следующий код: docker compose up -d. Когда нажмёте enter, то у вас начнётся скачивание необходимых образов в docker desktop и сразу из них запустятся контейнеры, кроме этого, в проекте у вас создастся необходимая структура папок.
Когда всё скачается, у вас docker desktop должен выглядеть примерно так:
На данный момент, у нас уже локально развернулось всё, что я обещал, теперь будем устанавливать необходимые соединения.
Объединение airflow и postgres
Начнём с Postgres; зайдём в PgAdmin и подключимся к нашему серверу
Логин и пароль: airflow, airflow соответственно
После подключения у вас должно появиться в сервере две базы данных:
Для работы с airflow используется airflow. Для будущего написания ETL давайте сразу создадим таблицу:
Теперь займёмся установкой соединения между airflow и postgres. Для того, чтобы перейти в airflow, зайдём в docker desktop и нажмём на порт у airflow-webserver:
Нужно будет ввести логин и пароль: airflow и airflow
После этого, вы окажетесь на основной странице airflow:
Теперь, чтобы наш airflow мог взаимодействовать с postgres, необходимо создать соединение между ними в airflow:
Пароль указываем от нашей базы данных, в нашем случае airflow
Сохраняем соединение и теперь у нас налажена работа airflow и postgres.
Напишем самый простой dag, чтобы проверить, что всё работает. Для начала нужно установить дополнительные библиотеки в PyCharm: apache-airflow и apache-airflow-providers-postgres.
Напишем код:
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook hook = PostgresHook(postgres_conn_id='Postgres') request = "insert into public.data (name, birth_date, age) values ('Anton', '2004.11.14', 19)" def insert(): hook.run(request) with DAG( dag_id="base_dag", default_args={ "owner": "Kirill", } ) as dag: t = PythonOperator( task_id='insert_postgres', python_callable=insert )
Как только мы закончим его писать, в airflow появится наш dag:
Запустим его и посмотрим, что получилось.
Как видим, dag отработал успешно и добавил новую строку в нашу таблицу.
Объединение airflow и rabbitmq
Начнём с настройки rabbitMQ. Для того, чтобы перейти в rabbitMQ, нужно нажать на его порт в docker desktop’е:
Логин и пароль: guest, guest. Перейдём в раздел «Queues» и создадим там новую очередь с названием queue.airflow:
Вернёмся в PyCharm и установим библиотеку airflow-provider-rabbitmq.
Перейдём в airflow в раздел с соединениями и добавим соединение с брокером, в поле с хостом нужно указать имя нашего контейнера с rabbit, то есть, rabbitmq_main:
После всего этого, настройка брокера закончена и мы можем приступать к взаимодействию питона, airflow и брокера. Напишем базовую программу:
Для этого в папке dags создадим папку tasks_broker. В ней создадим sensor.py с следующим кодом:
from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor sensor = RabbitMQSensor( task_id="sensor", queue_name="queue.airflow", rabbitmq_conn_id="RabbitMQ", )
Данный код является таской, которая принимает данные из брокера и передаёт в другую таску.
Также в папке tasks_broker создадим get_data.py с следующим содержанием:
import json from airflow.decorators import task @task(task_id="get_data") def get_data(**kwargs) -> None: message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='sensor')) print("#########################################################################################") print(message) print("#########################################################################################")
После этого, в папке dags создадим python_broker.py:
from airflow import DAG from tasks_broker.sensor import sensor from tasks_broker.get_data import get_data with DAG( dag_id="python_broker", schedule_interval=None, ) as dag: sensor >> get_data()
И у нас в airflow появится новый dag python_broker. Запустим его и он будет ждать до тех пор, пока из брокера не поступит сообщение. Чтобы отправить сообщение из брокера, нужно перейти в раздел «queues» и выбрать необходимую очередь, в нашем случае queue.airflow. Там выбрать «publish message», ввести нужное сообщение и отправить.
Как мы отправим сообщение, наш dag выполнится и мы можем убедиться в том, что всё прошло успешно, зайдя в логи второй таски:
Решение задачи ETL. Объединение airflow, rabbitmq и postgres
Давайте теперь решим задачу ETL, будет три таски:
-
Получаем данные о человеке в формате json из брокера (имя, пол, дату рождения, возраст, город проживания) и передаём его во вторую таску для обработки
-
Вторая таска отвечает за обработку полученных полей и дальнейшую передачу результата в третью таску (будем из пяти полей передавать только три (имя, дату рождения и возраст))
-
Третья таска будет загружать данные в postgres
По сути, все соединения у нас установлены, необходимость лишь заключается в правильно написанном коде. В папке dags создадим папку tasks_broker_postgres. Там создадим три файла extract.py, transformation.py и load.py.
extract.py:
from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor extract = RabbitMQSensor( task_id="extract", queue_name="queue.airflow", rabbitmq_conn_id="RabbitMQ", )
transformation.py:
import json from airflow.decorators import task @task(task_id="transformation") def transformation(**kwargs) -> dict: message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='extract')) new_message = {"name": message["name"], "birth_date": message["birth_date"], "age": message["age"]} return new_message
load.py:
from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook @task(task_id="load") def load(**kwargs) -> None: message = kwargs['task_instance'].xcom_pull(task_ids='transformation') hook = PostgresHook(postgres_conn_id='Postgres') request = ("insert into public.data (name, birth_date, age) values ('" + message["name"] + "','" + message["birth_date"] + "'," + str(message["age"]) + ")") hook.run(request)
Теперь в папке dags создадим python_broker_postgres.py:
from airflow import DAG from tasks_broker_postgres.extract import extract from tasks_broker_postgres.transformation import transformation from tasks_broker_postgres.load import load with DAG( dag_id="python_broker_postgres", schedule_interval=None, ) as dag: extract >> transformation() >> load()
Запустим dag в airflow, зайдём в брокер и отправим сообщение:
По логам смотрим, что все таски прошли успешно:
Зайдём теперь в базу данных:
Как видим, наше сообщение успешно обработалось и добавилось в базу данных.
Заключение
В данной статье мы рассмотрели взаимодействие python, airflow, postgres и rabbitmq.
Если вы не забыли, то в yaml файл входит ещё и MongoDB, работа с ней осуществляется аналогично работе с postgres. Для удобной работы с MongoDB можно скачать эту программу.
До новых публикаций!
ссылка на оригинал статьи https://habr.com/ru/articles/857476/
Добавить комментарий