Написание ETL пайплайна при помощи airflow, rabbitmq и postgres

от автора

Всем привет! В данной статье мы рассмотрим как можно локально развернуть airflow с помощью docker desktop’a и PyCharm’a. Кроме него развернём ещё и следующее: rabbitMQ, postgreSQL, redis и MongoDB.

Первый делом, нужно установить себе на компьютер Docker desktop, PyCharm и PgAdmin4. Теперь нам нужен yaml файл, в котором будет прописано все образы, которые мы хотим поставить. Готовый файл для нашей задачи можете скачать по ссылке. Когда всё необходимое было скачано, можно приступить к развёртыванию airflow.

Создаём проект в PyCharm, и в корневую папку загружаем наш yaml файл. В терминале выполняем следующий код: docker compose up -d. Когда нажмёте enter, то у вас начнётся скачивание необходимых образов в docker desktop и сразу из них запустятся контейнеры, кроме этого, в проекте у вас создастся необходимая структура папок.

Когда всё скачается, у вас docker desktop должен выглядеть примерно так:

На данный момент, у нас уже локально развернулось всё, что я обещал, теперь будем устанавливать необходимые соединения.

Объединение airflow и postgres

Начнём с Postgres; зайдём в PgAdmin и подключимся к нашему серверу

Логин и пароль: airflow, airflow соответственно

После подключения у вас должно появиться в сервере две базы данных:

Для работы с airflow используется airflow. Для будущего написания ETL давайте сразу создадим таблицу:

Теперь займёмся установкой соединения между airflow и postgres. Для того, чтобы перейти в airflow, зайдём в docker desktop и нажмём на порт у airflow-webserver:

Нужно будет ввести логин и пароль: airflow и airflow

После этого, вы окажетесь на основной странице airflow:

Теперь, чтобы наш airflow мог взаимодействовать с postgres, необходимо создать соединение между ними в airflow:

Пароль указываем от нашей базы данных, в нашем случае airflow

Сохраняем соединение и теперь у нас налажена работа airflow и postgres.

Напишем самый простой dag, чтобы проверить, что всё работает. Для начала нужно установить дополнительные библиотеки в PyCharm: apache-airflow и apache-airflow-providers-postgres.

Напишем код:

from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook   hook = PostgresHook(postgres_conn_id='Postgres')  request = "insert into public.data (name, birth_date, age) values ('Anton', '2004.11.14', 19)"   def insert():     hook.run(request)   with DAG(     dag_id="base_dag",     default_args={         "owner": "Kirill",     } ) as dag:      t = PythonOperator(         task_id='insert_postgres',         python_callable=insert         )

Как только мы закончим его писать, в airflow появится наш dag:

Запустим его и посмотрим, что получилось.

Как видим, dag отработал успешно и добавил новую строку в нашу таблицу.

Объединение airflow и rabbitmq

Начнём с настройки rabbitMQ. Для того, чтобы перейти в rabbitMQ, нужно нажать на его порт в docker desktop’е:

Логин и пароль: guest, guest. Перейдём в раздел «Queues» и создадим там новую очередь с названием queue.airflow:

Вернёмся в PyCharm и установим библиотеку airflow-provider-rabbitmq.

Перейдём в airflow в раздел с соединениями и добавим соединение с брокером, в поле с хостом нужно указать имя нашего контейнера с rabbit, то есть, rabbitmq_main:

После всего этого, настройка брокера закончена и мы можем приступать к взаимодействию питона, airflow и брокера. Напишем базовую программу:

Для этого в папке dags создадим папку tasks_broker. В ней создадим sensor.py с следующим кодом:

from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor   sensor = RabbitMQSensor(     task_id="sensor",     queue_name="queue.airflow",     rabbitmq_conn_id="RabbitMQ", ) 

Данный код является таской, которая принимает данные из брокера и передаёт в другую таску.

Также в папке tasks_broker создадим get_data.py с следующим содержанием:

import json from airflow.decorators import task   @task(task_id="get_data") def get_data(**kwargs) -> None:     message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='sensor'))      print("#########################################################################################")     print(message)     print("#########################################################################################")

После этого, в папке dags создадим python_broker.py:

from airflow import DAG from tasks_broker.sensor import sensor from tasks_broker.get_data import get_data   with DAG(         dag_id="python_broker",         schedule_interval=None, ) as dag:     sensor >> get_data()

И у нас в airflow появится новый dag python_broker. Запустим его и он будет ждать до тех пор, пока из брокера не поступит сообщение. Чтобы отправить сообщение из брокера, нужно перейти в раздел «queues» и выбрать необходимую очередь, в нашем случае queue.airflow. Там выбрать «publish message», ввести нужное сообщение и отправить.

Как мы отправим сообщение, наш dag выполнится и мы можем убедиться в том, что всё прошло успешно, зайдя в логи второй таски:

Решение задачи ETL. Объединение airflow, rabbitmq и postgres

Давайте теперь решим задачу ETL, будет три таски:

  1. Получаем данные о человеке в формате json из брокера (имя, пол, дату рождения, возраст, город проживания) и передаём его во вторую таску для обработки

  2. Вторая таска отвечает за обработку полученных полей и дальнейшую передачу результата в третью таску (будем из пяти полей передавать только три (имя, дату рождения и возраст))

  3. Третья таска будет загружать данные в postgres

По сути, все соединения у нас установлены, необходимость лишь заключается в правильно написанном коде. В папке dags создадим папку tasks_broker_postgres. Там создадим три файла extract.py, transformation.py и load.py.

extract.py:

from rabbitmq_provider.sensors.rabbitmq import RabbitMQSensor   extract = RabbitMQSensor(     task_id="extract",     queue_name="queue.airflow",     rabbitmq_conn_id="RabbitMQ", ) 

transformation.py:

import json from airflow.decorators import task   @task(task_id="transformation") def transformation(**kwargs) -> dict:     message = json.loads(kwargs['task_instance'].xcom_pull(task_ids='extract'))      new_message = {"name": message["name"],                    "birth_date": message["birth_date"],                    "age": message["age"]}      return new_message 

load.py:

from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook   @task(task_id="load") def load(**kwargs) -> None:     message = kwargs['task_instance'].xcom_pull(task_ids='transformation')      hook = PostgresHook(postgres_conn_id='Postgres')      request = ("insert into public.data (name, birth_date, age) values ('" + message["name"] + "','" +                                                                             message["birth_date"] + "'," +                                                                             str(message["age"]) + ")")      hook.run(request) 

Теперь в папке dags создадим python_broker_postgres.py:

from airflow import DAG from tasks_broker_postgres.extract import extract from tasks_broker_postgres.transformation import transformation from tasks_broker_postgres.load import load   with DAG(         dag_id="python_broker_postgres",         schedule_interval=None, ) as dag:     extract >> transformation() >> load() 

Запустим dag в airflow, зайдём в брокер и отправим сообщение:

По логам смотрим, что все таски прошли успешно:

Зайдём теперь в базу данных:

Как видим, наше сообщение успешно обработалось и добавилось в базу данных.

Заключение

В данной статье мы рассмотрели взаимодействие python, airflow, postgres и rabbitmq.

Если вы не забыли, то в yaml файл входит ещё и MongoDB, работа с ней осуществляется аналогично работе с postgres. Для удобной работы с MongoDB можно скачать эту программу.

До новых публикаций!


ссылка на оригинал статьи https://habr.com/ru/articles/857476/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *